从ReentrantLock实例分析AbstractQueuedSynchronizer和ConditionObject

1.实例:3个线程交替打印1,2,3一定次数

代码如下:

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestReentrantLock {

    private static final int times = 12;
    private static int count = 0;

    public static void main(String[] args) {
        // 实现1,2,3交替打印times次
        Lock lock = new ReentrantLock();
        Condition condition1 = lock.newCondition();
        Condition condition2 = lock.newCondition();
        Condition condition3 = lock.newCondition();

        Thread print1 = new Thread(() -> {
            lock.lock();
            for (int i = 0; i < times; i++) {
                try {
                    while (count % 3 != 0)
                        condition3.await();
                    print(1);
                    count++;
                    condition1.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            lock.unlock();
        });

        Thread print2 = new Thread(() -> {
            lock.lock();
            for (int i = 0; i < times; i++) {
                try {
                    while (count % 3 != 1)
                        condition1.await();
                    print(2);
                    count++;
                    condition2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            lock.unlock();
        });

        Thread print3 = new Thread(() -> {
            lock.lock();
            for (int i = 0; i < times; i++) {
                try {
                    while (count % 3 != 2)
                        condition2.await();
                    print(3);
                    count++;
                    condition3.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            lock.unlock();
        });

        print1.start();
        print2.start();
        print3.start();
    }

    private static void print(int num) {
        System.out.print(num);
    }
}

运行结果:

 2.源码分析

首先3个线程启动后会执行lock方法,这个方法底层是AQS实现的。

ReentrantLock默认非公平锁,所以lock方法会首先尝试通过CAS直接获取锁,如果获取失败执行acquire(1)函数。

这里只有一个线程会获取成功,假设是线程2,那么此时state=1,exclusiveOwnerThread=Thread-1,队列的头尾指针都为null。

然后针对线程2,会进入循环开始准备打印,但此时由于1还没有打印,所以会执行condition1.await()这行;

具体进入源码,此时会执行addConditionWaiter()这个函数,将创建一个Node(thread-1,waitStatus=CONDITION)并加入condition1的队列中并返回这个节点。

然后会执行fullyRelease(node),具体是调用release(savedState)彻底释放锁,所以这里saveState是state的值。

            这里release会首先调用tryRelease方法尝试释放,这个方法是由ReentrantLock中的Sync类实现的。

                          这里会将state置为0,exclusiveOwnerThread置为null,返回true。

           然后当前锁队列为空,release函数返回true。

这时fullyRelease返回savedState即1。

然后会进入循环判断node是否在锁队列中,这里显然不在,所以进入循环。

然后会执行LockSupport.park(this)挂起当前线程。

这个时候lock对象state=0,exclusiveOwnerThread=null,队列也为空。

condition1队列如下图,condition2,condition3都为空。

假设现在到thread1执行,首先通过CAS获取锁,值state=1,exclusiveOwnerThread=thread-0。

然后不进入循环,直接打印1,将count加1,然后执行condition1.signal()。

这里会执行doSignal方法,将condition1队列的那个节点作为参数。

  在doSignal方法中,会将节点从队列移除,此时condition1队列为空。

     然后会执行transferForSignal方法,这里会调用enq方法将节点加入锁队列,即sync队列。

                   调用结束后sync队列如下图所示,然后transferForSignal方法返回true。

        

然后doSingal方法结束,signal方法也结束。

此时锁还是线程1持有,3个条件队列都为空,然后进入下一次循环,此时count=1,需要执行condition3.await()。

首先创建节点加入condition3的队列,然后调用fullyRelease方法释放锁。fullyRelease会调用release方法释放锁,

  这里release在执行完tryRelease后,与上一次不同的是这次sync队列不为空,会执行unparkSuccessor函数,传入参数是head。

    这个函数会选择第一个合适的节点进行唤醒,这里就是唤醒了线程2。

  唤醒后整个fullyRelease方法结束。后面由于线程1不在sync队列,会被挂起。

这个时候lock对象state=0,exclusiveOwnerThread=null,sync队列如下图。

 然后conditon3队列中存着线程1节点,condition1,condition2都为空。

线程1被挂起,线程2被唤醒。

如果这时线程3被执行,那么通过CAS得到了锁,lock对象state=1,exclusiveOwnerThread=thread-2。

那么这时对于线程2,在执行acquireQueued方法时调用tryRequire就会失败,然后会去调用shouldParkAfterFailedAcquire方法,

方法参数p为head,node为线程2对应节点。

这个方法将head节点的waitStatus置为SIGNAL,返回false。

这时会进行新的一次循环,这次在调用shouldParkAfterFailedAcquire时会返回true,这时会执行方法parkAndCheckInterrupt()。

这个方法会挂起当前线程,即线程2。

对于线程3这时由于count=1,所以执行condition2.await()方法。

首先创建节点加入condition2队列,然后调用fullyRelease释放锁,通过release函数调用tryRelease成功后,lock对象state=0,exclusiveOwnerThread=null。

然后会执行unparkSuccessor函数,重新唤醒线程2。再然后线程3由于不在sync队列,所以被挂起。

然后线程2这时执行acquireQueued方法,成功获取到了锁,将从sync队列中删去这个节点,此时lock对象state=1,exclusiveOwnerThread=thread-1。

condition1队列为空,condition2,condition3队列分别存储线程1和线程3对应的节点。

线程2获取到锁后,await方法执行结束。此时count=1跳出循环,然后打印2,count加1,调用condition2.signal()。

这里过程与之前线程1执行condition1.signal()类似,将线程3对应节点加入sync队列,并从condition2队列中移除。

然后是不满足循环条件,执行condition1.await()函数,首先将创建节点加入condition1队列,然后fullyRelease释放锁,再通过unparkSuccessor唤醒线程3。

最后由于线程2不在sync队列,被挂起。

这时线程3与线程2类似,先通过acquireQueued拿到锁并将节点从sync队列移除,condition2.await方法执行结束,然后跳出循环,打印3,然后count++,再执行

condition3.signal方法将线程1节点从condition3队列删除,加入sync队列。

然后再次循环不满足条件,执行condition2.await方法,将线程3节点加入condition2队列,然后通过tryRelease释放锁,通过unparkSuccessor唤醒线程1,自身由于不在

sync队列被挂起。

后续过程与以上类似,不再分析。

通过分析可以看出,在调用condition.signal()时,只是将condition队列上的第一个节点移到了sync队列,并不释放锁;

condition.await()会将当前线程的节点加入条件队列,然后释放锁,释放后如果有线程在sync队列就进行唤醒第一个合适的。之后会因为不在sync队列而被挂起。

原文地址:https://www.cnblogs.com/csdeblog/p/11442449.html