ReentrantLock 分析

JUC:Java.util.concurrent 是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。

lock

​ 定义了释放锁和获得锁的抽象方法,在JDK1.5出现,可以解决synchronized在某些场景下的短板,Lock(可重入,可中断,可公平)比synchronized(可重入,不可中断,不可公平)更加灵活。

ReentrantLock

​ 重入锁:当前线程调用lock获取锁之后,如果方法里面再次调用lock,那么不阻塞,增加重入次数就可以了,这样避免线程死锁的发生。

ReentrantReadWriteLock:读写锁,阻塞写,不阻塞读,能够比排它锁提供更好的并发量和吞吐量。

    static HashMap<String, Object> cacheMap = new HashMap<>();
    static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    static Lock read = rwl.readLock();
    static Lock write = rwl.writeLock();

    /**
     *  读读共享,读写互斥
     */
    public static Object get(String key){
        read.lock();
        try {
            return cacheMap.get(key);
        } finally {
            read.unlock();
        }
    }

    public static void put(String key, Object value){
        write.lock();
        try {
            cacheMap.put(key, value);
        } finally {
            write.unlock();
        }
    }

ReentrantLock 实现原理

​ AQS:全称AbstractQueuedSynchronizer,是一个同步工具也是 Lock 用来实现线程同步的核心组件。

​ AQS内部是一个双向链表,里面由竞争锁失败的节点(Node)组成,每个节点都有指向前驱节点和后继节点的两个指针,其中head和tail节点分别代表队列的首和尾。

​ 竞争锁失败和释放锁:

​ 1,竞争锁失败会将当前线程封装成node,然后他的前驱指向原来的队尾,原来的队尾next指向它,通过cas将tail指向新的尾部节点。

​ 2,释放锁会唤醒head指向的节点,该节点前驱指针指向null,head指向该节点的下一个节点。

ReentrantLock 源码分析加锁

我们的源码分析以非公平锁为例

​ 公平锁:严格按照先进先出的规则获取锁。

​ 非公平锁:新进来的线程总要先争抢一下锁,不管当前队列上是否存在线程等待。

时序图

cas:以乐观所得方式做比较替换。

Node节点的状态

状态 描述
CANCELLED 值为1,由于同步队列中等待的线程等待超时或者是被中断,需要从同步队列中取消等待,节点进入该状态不在变换。
SIGNAL 值为-1,后继节点处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal之后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取。
PROPAGATE 值为-3,表示下一次共享式同步状态的获取将会无条件的传播下去。
INITIAL 值为0,初始状态。

1,NonfairSync.lock()

        final void lock() {
            /**
             *      首先进来争抢锁,cas成功表示获得锁,会调用unsafe.compareAndSwapInt,aqs里面
             *  有个属性state
             *      0:代表无锁状态,
             *      大于0:表示已经有线程获得了锁,由于因为ReentrantLock允许重入,所以同一个线程
             *  多次获得同步锁的时候,state会递增,而释放锁的时候,同样state会递减,直到state=0
             *  才允许其它线程获取锁
             *
             */
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1); //cas失败,说明此时state已经不为0,调用acquire(1)走竞争锁逻辑  进入(2)
        }

2,AQS.acquire(int )

        public final void acquire(int arg) {
            /**
             * 1,通过tryAcquire尝试获取独占锁,成功返回true,失败返回false
             * 2,失败会通过addWaiter方法将当前线程封装成Node,添加到AQS的队列尾部
             * 3,acquireQueued 将Node作为参数,通过自旋去尝试获取锁
             */
            if (!tryAcquire(arg) &&
                    acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
                selfInterrupt();
        }

3,NonfairSync.tryAcquire(int )

        //尝试获取锁,成功为true,失败false
		protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

4,Sync.nonfairTryAcquire(int )

        /**
         * 1,获取当前锁的状态,
         * 2,如果state=0,表示无锁状态,通过cas更新state状态的值获得锁
         * 3,当前线程是属于重入,则增加重入次数
         */
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //同一个线程获取锁,增加重入次数
            else if (current == getExclusiveOwnerThread()) {    
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

5,AQS.addWaiter(Node )

        /**
         * 1,将当前线程封装为Node,
         * 2,判断队尾节点是否为空,不为空通过cas将node替换为tail节点
         * 3,为空将节点通过自旋的方式加入AQS队尾
         */
        private AbstractQueuedSynchronizer.Node addWaiter(AbstractQueuedSynchronizer.Node mode) {
            AbstractQueuedSynchronizer.Node node = new AbstractQueuedSynchronizer.Node(Thread.currentThread(), mode);
            AbstractQueuedSynchronizer.Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }

6,AQS.acquireQueued(Node, int)

​ 通过addWaiter将线程添加到AQS队列后,接着会将该节点传入acquireQueued方法去竞争锁

        /**
         * 1,获取当前线程的前驱节点,
         * 2,如果是head节点,那么它就有资格获取锁,调用tryAcquire去争抢锁并且移出原来的head节点,head指向当前节点
         * 3,否则,根据waitstatus去挂起线程
         * 4,通过cancelAcquire取消获得锁的操作
         */
        final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final AbstractQueuedSynchronizer.Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }

LockSupport:Java6引入的一个类,提供了基本的线程同步原语,park和unpark,unpark就像提供许可,park就像等待许可,但是这个许可是一次性的,不可叠加的,既调用unpark多次结果是一样的只够park消费一次。如果线程park阻塞了,这时候调用interrupt()方法会发起中断,然后在调用unpark方法,但是不抛出异常,只调用unpark方法只是会唤起park并不会有中断信号。

ReentrantLock 源码分析释放锁

1,AQS.release(int )

    public final boolean release(int arg) {
        if (tryRelease(arg)) {		//如果释放锁成功
            Node h = head;
            //如果head节点不为空,则唤醒后续节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

2,Reentrant.tryRelease(int )

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

3,AQS.unparkSuccessor(Node )

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;		////获得 head 节点的状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);	// 设置 head 节点状态为 0

        Node s = node.next;	//得到 head 节点的下一个节点
        //如果下一个节点为 null 或者 status>0 表示 cancelled 状态
        if (s == null || s.waitStatus > 0) {
            s = null;
            //通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)	//next 节点不为空,直接唤醒这个线程即可
            LockSupport.unpark(s.thread);
    }
原文地址:https://www.cnblogs.com/gaojf/p/12761242.html