Java多线程:Condition条件

Condition的实现分析

Condition是同步器AbstractQueuedSynchronized的内部类,因为Condition的操作需要获取相关的锁,所以作为同步器(Lock)的内部类比较合理。Object类中的wait()、notify()、notifyAll()方法实现了线程之间的通信,而Condition类中的await()、signal()、signalAll()方法也实现了相似的功能。每个Condition对象都包含着一个队列(等待队列),该队列是Condition对象实现等待/通知功能的关键。

等待队列:

之前我们介绍AQS的时候说过,AQS的同步排队用了一个隐式的双向队列,同步队列的每个节点是一个AbstractQueuedSynchronizer.Node实例。

Node的主要字段有:

  1. waitStatus:等待状态,所有的状态见下面的表格。
  2. prev:前驱节点
  3. next:后继节点
  4. thread:当前节点代表的线程
  5. nextWaiter:Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;在作为等待队列节点使用时,nextWaiter保存后继节点。
状态 值   含义
  CANCELLED    1 当前节点因为超时或中断被取消同步状态获取,该节点进入该状态不会再变化
SIGNAL -1 标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,需要通知后继节点继续运行。每个节点在阻塞前,需要标记其前驱节点的状态为SIGNAL。
CONDITION -2 标识当前节点是作为等待队列节点使用的。
PROPAGATE -3  
0 0 初始状态

Condition实现等待的时候内部也有一个等待队列,等待队列是一个隐式的单向队列,等待队列中的每一个节点也是一个AbstractQueuedSynchronizer.Node实例。每个Condition对象中保存了firstWaiter和lastWaiter作为队列首节点和尾节点,每个节点使用Node.nextWaiter保存下一个节点的引用,因此等待队列是一个单向队列。每当一个线程调用Condition.await()方法,那么该线程会释放锁,构造成一个Node节点加入到等待队列的队尾。

等待队列是一个FIFO的队列,队列的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了await()方法,该线程就会释放锁、构造成节点进入等待队列并进入等待状态。

这里的节点定义也就是AbstractQueuedSynchronizer.Node的定义。

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法时,将会以当前线程构造节点,并将节点从尾部加入等待队列。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock(同步器)拥有一个同步队列和多个等待队列。

注意:等待队列是单向队列而同步队列是双向队列。

我们先看一下Condition类图

 

等待(await):

以 ReentrantLock 类为例,创建一个Condition对象步骤如下:

1 Lock lock = new ReentrantLock();
2 Condition condition1 = lock.newCondition();

 其实最终创建的是 AQS 中的内部类 ConditionObject 类

1 public Condition newCondition() {
2     return sync.newCondition();
3 }
4 
5 final ConditionObject newCondition() {
6     return new ConditionObject();
7 }

而 await() 方法正是 ConditionObject 类中的方法,源码如下:

 1 public final void await() throws InterruptedException {
 2     // 如果线程被中断过,则直接抛出异常并清除中断位
 3     if (Thread.interrupted())
 4         throw new InterruptedException();
 5     // 添加至等待队列中
 6     Node node = addConditionWaiter();
 7     // 释放同步状态,释放锁
 8     long savedState = fullyRelease(node);
 9     int interruptMode = 0;
10     // 判断当前节点是否在同步队列当中
11     while (!isOnSyncQueue(node)) {
12         LockSupport.park(this);
13         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
14             break;
15     }
16     // 节点加入到同步队列中,等待前面的节点执行,记录当前节点代表的线程是否中断
17     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
18         interruptMode = REINTERRUPT;
19     if (node.nextWaiter != null) // clean up if cancelled
20         unlinkCancelledWaiters();
21     if (interruptMode != 0)
22         reportInterruptAfterWait(interruptMode);
23 }

 说明:

1. 调用await()会判断当前线程是否在调用前被中断过,如果中断过则直接抛出异常

2. addConditionWaiter() 作用是构建一个新的代表当前线程的节点加入到等待队列的队尾并返回此节点,其源码如下:

 1 private Node addConditionWaiter() {
 2     Node t = lastWaiter;
 3     // 如果队尾节点取消了,将它清除,取它前驱做为新队尾
 4     if (t != null && t.waitStatus != Node.CONDITION) {
 5         unlinkCancelledWaiters();
 6         t = lastWaiter;
 7     }
 8     // 创建代表当前线程的节点,状态设为 CONDITION,链接到等待队列尾部
 9     Node node = new Node(Thread.currentThread(), Node.CONDITION);
10     if (t == null)
11         firstWaiter = node;
12     else
13         t.nextWaiter = node;
14     lastWaiter = node;
15     return node;
16 }

3. fullyRelease() 方法功能如同他名字,就是释放当前线程的独占锁,而且不管重入数为多少,都把 state 释放为0。

我们知道unlock() 方法其实底层调用的就是 release(1),所以 release() 方法就是释放锁的方法,具体源码分析见 AQS源码分析

 1 final int fullyRelease(Node node) {
 2     boolean failed = true;
 3     try {
 4         // 获取锁的状态,即姑且可以认为是重入数
 5         int savedState = getState();
 6         // 释放当前的独占锁,并重置锁的状态为0
 7         if (release(savedState)) {
 8             failed = false;
 9             return savedState;
10         } else {
11             throw new IllegalMonitorStateException();
12         }
13     } finally {
14         // 释放失败则将节点状态设置为CANCELLED
15         if (failed)
16             node.waitStatus = Node.CANCELLED;
17     }
18 }

4. isOnSyncQueue() 方法作用是判断当前节点是否在同步队列当中,如果没有则进入到自旋当中,调用 park() 方法将当前线程阻塞住。

 1 final boolean isOnSyncQueue(Node node) {
 2     //如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队列上。
 3     //之所以可以用node.prev来判断,是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
 4     if (node.waitStatus == Node.CONDITION || node.prev == null)
 5         return false;
 6     //如果node.next不为null,则一定在同步队列上,因为node.next是在节点加入同步队列后设置的
 7     if (node.next != null) // If has successor, it must be on queue
 8         return true;
 9     return findNodeFromTail(node); //前面的两个判断没有返回的话,就从同步队列队尾遍历一个一个看是不是存在当前节点。
10 }
11 
12 private boolean findNodeFromTail(Node node) {
13     Node t = tail;
14     for (;;) {
15         if (t == node)
16             return true;
17         if (t == null)
18             return false;
19         t = t.prev;
20     }
21 } 

5. 首先我们需要知道,LockSupport.park() 方法是会响应中断的。在线程阻塞的时候如果发生了中断,线程会脱离阻塞状态,但因为词语句处于自旋语句中,所以如果当前线程还是没有在同步队列当中会接着阻塞,interruptMode 是用来判断中断类型的。

1 // 判断是否发生过中断,如果在 signalled 前中断,即调用sigal()前,则返回 THROW_IE,
2 // 如果在 signalled 之后中断则返回 REINTERRUPT,没有中断返回 0
3 private int checkInterruptWhileWaiting(Node node) {
4     return Thread.interrupted() ?
5         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
6         0;
7 }

6. 当退出上面自旋说明当前节点已经存在于同步队列中了,但是当前节点不一定处于队首。acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行阻塞,直到唤醒并重新获取锁了才返回。

7. 根据 interruptMode 来确定对的处理中断,如果为 THROW_IE 即 signal 前中断会抛出 InterruptedException,如果为 REINTERRUPT 即 signal 后中断则会通过 selfInterrupt() 将中断状态被设置成true(因为调用 acquireQueued() 会调用 interrupted() 清除中断状态)。具体代码详见AQS源码分析

 1 // 判断是否发生过中断,如果在 signalled 前中断,即调用sigal()前,则返回 THROW_IE,
 2 // 如果在 signalled 之后中断则返回 REINTERRUPT,没有中断返回 0
 3 private int checkInterruptWhileWaiting(Node node) {
 4     return Thread.interrupted() ?
 5         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
 6         0;
 7 }
 8 
 9 private void reportInterruptAfterWait(int interruptMode)
10     throws InterruptedException {
11     if (interruptMode == THROW_IE)
12         throw new InterruptedException();
13     else if (interruptMode == REINTERRUPT)
14         selfInterrupt(); // 恢复中断状态
15 }

小结:

调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。

从队列的角度来看,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过Condition.signal()方法唤醒,而是对等待线程进行中断,则抛出InterruptedException。

通知(signal):

signal() 的作用与 notify() 作用类似,用于唤醒处于 await() 等待状态下的线程。

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,将等待队列中的节点全部移动到同步队列中,并唤醒每个节点的线程。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException(); // 如果同步状态不是被当前线程独占,直接抛异常。
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); // 通知等待队列队首的节点
}

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

所以,从上面的源码可以看出,Condition 只能配合独占类同步组件使用

1 private void doSignal(Node first) {
2     do {
3         if ( (firstWaiter = first.nextWaiter) == null)
4             lastWaiter = null;
5         first.nextWaiter = null;
6         //transferForSignal方法尝试唤醒等待队列队首节点,如果唤醒失败,则继续尝试唤醒后继节点。
7     } while (!transferForSignal(first) &&
8             (first = firstWaiter) != null);
9 }
 1 final boolean transferForSignal(Node node) {
 2     //如果当前节点状态为CONDITION,则将状态改为0准备加入同步队列;如果当前状态不为CONDITION,
 3     //说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点
 4     if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
 5         return false;
 6 
 7     Node p = enq(node);  //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点
 8     int ws = p.waitStatus;
 9     //如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,
10     //线程被唤醒后会执行acquireQueued方法,该方法会从当前节点往前遍历找到最近的状态为SIGNAL的节点排在它后面并再次park线程,
11     //如果当前设置前驱节点状态为SIGNAL成功,那么就不需要马上唤醒线程了,
12     //当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。(调用unlock()方法会在底层掉unparkSuccessor()方法将后继节点唤醒)
13     //即使立刻唤醒了线程也会在acquireQueued方法处阻塞住,因为当前线程并没有获得锁,前面线程调用unlock之后才会获得锁
14     if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
15         LockSupport.unpark(node.thread);
16     return true;
17 }

小结

总的来说,Condition的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

  1. 构造一个新的等待队列节点加入到等待队列队尾
  2. 释放锁,也就是将它的同步队列节点从同步队列队首移除
  3. 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal())或被中断
  4. 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。

当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:

从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:

  1. 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
  2. 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4.

总结

  最后我们再来看一下整个 await-signal 执行流程总结。我们知道AQS自己维护的队列是当前等待资源的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:

1. 线程1调用reentrantLock.lock时,线程被加入到AQS的同步队列中。

2. 线程1调用await方法被调用时,该线程从AQS中移除,对应操作是锁的释放。

3. 接着马上被加入到Condition的等待队列中,阻塞住等待 signal 信号。

4. 线程2,因为线程1释放锁的关系,被唤醒,并判断可以获取锁,于是线程2获取锁,并被加入到AQS的同步队列中。

5.  线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。  注意,这个时候,线程并没有被唤醒。

6. signal方法执行完毕,线程2调用reentrantLock.unLock()方法,释放锁。这个时候因为AQS中只有线程1,于是,AQS释放锁后按从头到尾的顺序唤醒线程时,线程1被唤醒,于是线程1回复执行。

7. 直到释放所整个过程执行完毕。

可以看到,整个协作过程是靠结点在AQS的同步队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的同步队列中来实现的唤醒操作。

参考资料:

Java显式锁学习总结之六:Condition源码分析

 

原文地址:https://www.cnblogs.com/2015110615L/p/6782128.html