condition学习小结

参考 https://www.cnblogs.com/go2sea/p/5630355.html
建议先了解AQS相关知识,可以参考 https://www.cnblogs.com/lllliuxiaoxia/p/15769401.html

condition创建

Condition在JUC框架下提供了传统Java监视器风格的wait、notify和notifyAll相似的功能。
Condition必须被绑定到一个独占锁上使用。ReentrantLock中获取Condition的方法为:

public Condition newCondition() {
	return sync.newCondition();
}

查看AQS的newCondition()方法,实际调用:

final ConditionObject newCondition() {
	return new ConditionObject();
}

直接初始化并返回了一个AQS提供的ConditionObject对象。因此,Condition实际上是AQS框架的内容。ConditionObject通过维护firstWaiter和lastWaiter来维护Condition等待队列。
通过signal操作将Condition等待队列中的线程移到Sync锁等待队列

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

await解析

firstWaiter和lastWaiter分别是Condition队列的头结点和尾节点。
Condition在调用await方法之前,必须先获取锁,注意,这个锁必须是一个独占锁。
我们先来看一下await中用到的几个方法:

  • addConditionWaiter
    此方法在Condition队列中添加一个等待线程:
private Node addConditionWaiter() {
	Node t = lastWaiter;
	// If lastWaiter is cancelled, clean out.
	if (t != null && t.waitStatus != Node.CONDITION) {
		unlinkCancelledWaiters();
		t = lastWaiter;
	}
	Node node = new Node(Thread.currentThread(), Node.CONDITION);
	if (t == null)
		firstWaiter = node;
	else
		t.nextWaiter = node;
	lastWaiter = node;
	return node;
}

方法先检查一下队列尾节点状态是否还是Condition(如果被signal或者中断,waitStatus会被修改为0或者CANCELLED)。如果尾节点被取消或者中断,调用unlinkCancelledWaiters方法删除Condition队列中被cancel的节点,相当于对等待队列进行一次清扫。然后将当前线程封装在一个Node中,添加到Condition队列的尾部。这里由于我们在操纵Condition队列的时候已经获取了一个独占锁,因此不会发生竞争。
值得注意的是,Condition队列与Sync队列(锁等待队列)有几点不同:①Condition队列是一个单向链表,而Sync队列是一个双向链表;②Sync队列在初始化的时候,会在队列头部添加一个空的dummy节点,它不持有任何线程,而Condition队列初始化时,头结点就开始持有等待线程了。
我们有必要在这里提一下Node对象中的nextWaiter成员、SHARED成员和EXCLUSIVE成员:

/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

Node nextWaiter;

nextWaiter在共享模式下,被设置为SHARED,SHARED为一个final的空节点,用来表示当前模式是共享模式;默认情况下nextWaiter是null,EXCLUSIVE成员是一个final的null,因此默认模式是独占模式。在Condition队列中nextWaiter被用来指向队列里的下一个等待线程。在一个线程从Condition队列中被移除之后,nextWaiter被设置为空(EXCLUSIVE)。这再次表明:Condition必须被绑定在一个独占锁上使用。

  • unlinkCancelledWaiters
private void unlinkCancelledWaiters() {
	Node t = firstWaiter;
	Node trail = null;
	while (t != null) {
		Node next = t.nextWaiter;
		if (t.waitStatus != Node.CONDITION) {
			t.nextWaiter = null;
			if (trail == null)
				firstWaiter = next;
			else
				trail.nextWaiter = next;
			if (next == null)
				lastWaiter = trail;
		}
		else
			trail = t;
		t = next;
	}
}

unlinkCancelledWaiters方法很简单,从头到尾遍历Condition队列,移除状态不为CONDITION的节点(被cancel或被中断的节点)。由于这里我们在操纵Condition队列的时候已经获取了所绑定的独占锁,因此不用担心竞争的发生。

  • fullyRelease
    我们再来看一下fullyRelease方法,这个方法用来释放锁:
final int fullyRelease(Node node) {
	boolean failed = true;
	try {
		int savedState = getState();
		if (release(savedState)) {
			failed = false;
			return savedState;
		} else {
			throw new IllegalMonitorStateException();
		}
	} finally {
		if (failed)
			node.waitStatus = Node.CANCELLED;
	}
}

方法首先获取了state的值,这个值表示可锁被“重入”深度,并调用release释放全部的重入获取,如果成功,返回这个深度,如果失败,要将当前线程的waitStatus设置为CANCELLED。

  • isOnSyncQueue
    我们再来看一下isOnSyncQueue方法,这个方法返节点是否在Sync队列中等待锁:
final boolean isOnSyncQueue(Node node) {
	if (node.waitStatus == Node.CONDITION || node.prev == null)
		return false;
	if (node.next != null) // If has successor, it must be on queue
		return true;
	/*
	 * node.prev can be non-null, but not yet on queue because
	 * the CAS to place it on queue can fail. So we have to
	 * traverse from tail to make sure it actually made it.  It
	 * will always be near the tail in calls to this method, and
	 * unless the CAS failed (which is unlikely), it will be
	 * there, so we hardly ever traverse much.
	 */
	return findNodeFromTail(node);
}

node从Condition队列移除的第一步,就是设置waitStatus为其他值,因此是否等于Node.CONDITON可以作为判断标志,如果等于,说明还在Condition队列中,即不再Sync队列里。在node被放入Sync队列时,第一步就是设置node的prev为当前获取到的尾节点,所以如果发现node的prev为null的话,可以确定node尚未被加入Sync队列。
相似的,node被放入Sync队列的最后一步是设置node的next,如果发现node的next不为null,说明已经完成了放入Sync队列的过程,因此可以返回true。
当我们执行完两个if而仍未返回时,node的prev一定不为null,next一定为null,这个时候可以认为node正处于放入Sync队列的执行CAS操作执行过程中。而这个CAS操作有可能失败,因此我们再给node一次机会,调用findNodeFromTail来检测:

  • findNodeFromTail
private boolean findNodeFromTail(Node node) {
	Node t = tail;
	for (;;) {
		if (t == node)
			return true;
		if (t == null)
			return false;
		t = t.prev;
	}
}

findNodeFromTail方法从尾部遍历Sync队列,如果检查node是否在队列中,如果还不在,此时node也许在CAS自旋中,在不久的将来可能会进到Sync队列里。但我们已经等不了了,直接放回false。

  • checkInterruptWhileWaiting
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 */
private int checkInterruptWhileWaiting(Node node) {
	return Thread.interrupted() ?
		(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
		0;
}

此方法在线程从park中醒来后调用,它的返回值有三种:0代表在park过程中没有发生中断;THORW_IE(1)代表发生了中断,且在后续我们需要抛出中断异常;REINTERRUPT(-1)表示发生了中断,但在后续我们不抛出中断异常,而是“补上”这次中断。当没有发生中断时,我们返回0即可,当中断发生时,返回THROW_IE or REINTERRUPT由transferAfterCancelledWait方法判断:

  • transferAfterCancelledWait
final boolean transferAfterCancelledWait(Node node) {
	if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
		enq(node);
		return true;
	}
	/*
	 * If we lost out to a signal(), then we can't proceed
	 * until it finishes its enq().  Cancelling during an
	 * incomplete transfer is both rare and transient, so just
	 * spin.
	 */
	while (!isOnSyncQueue(node))
		Thread.yield();
	return false;
}

transferAfterCancelledWait方法并不在ConditionObject中定义,而是由AQS提供。这个方法根据是否中断发生时,是否有signal操作来“掺和”来返回结果。方法调用CAS操作将node的waitStatus从CONDITION设置为0,如果成功,说明当中断发生时,说明没有signal发生(signal的第一步是将node的waitStatus设置为0),在调用enq将线程放入Sync队列后直接返回true,表示中断先于signal发生,即中断在await等待过程中发生,根据await的语义,在遇到中断时需要抛出中断异常,返回true告诉上层方法返回THROW_IT,后续会根据这个返回值做抛出中断异常的处理。此时的变化流程为(中断->interrupted检测->CAS)

如果CAS操作失败,是否说明中断后于signal发生呢?只能说这时候我们不能确定中断和signal到底谁先发生,只是在我们做CAS操作的时候,他们俩已经都发生了(中断->interrupted检测->signal->CAS,或者signal->中断->interrupted检测->CAS都有可能),这时候我们无法判断到底顺序是怎样,这里的处理是不管怎样都返回false告诉上层方法返回REINTERRUPT,当做是signal先发生(线程被signal唤醒)来处理,后续根据这个返回值做“补上”中断的处理。在返回false之前,我们要先做一下等待,直到当前线程被成功放入Sync锁等待队列。因为这里认为是先signal后中断,所以把node放到Sync同步队列上。

因此,我们可以这样总结:transferAfterCancelledWait的返回值表示了线程是否因为中断从park中唤醒。

  • await
public final void await() throws InterruptedException {
	if (Thread.interrupted())
		throw new InterruptedException();
	Node node = addConditionWaiter();
	int savedState = fullyRelease(node);
	int interruptMode = 0;
	while (!isOnSyncQueue(node)) {
		LockSupport.park(this);
		if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
			break;
	}
	if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
		interruptMode = REINTERRUPT;
	if (node.nextWaiter != null) // clean up if cancelled
		unlinkCancelledWaiters();
	if (interruptMode != 0)
		reportInterruptAfterWait(interruptMode);
}

await方法是及时响应中断的。它首先检查了一下中断标志。然后调用addConditionWaiter将当前线程放入Condition队列的尾,并顺手清理了一下队列里的无用节点。紧接着调用fullyRelease方法释放当前线程持有的锁。然后是一个while循环,这个循环会循环检测线程的状态,直到线程被signal或者中断唤醒且被放入Sync锁等待队列。如果中断发生的话,还需要调用checkInterruptWhileWaiting方法,根据中断发生的时机确定后去处理这次中断的方式,如果发生中断,退出while循环。

退出while循环后,我们调用acquireQueued方法来获取锁,注意,acquireQueued方法的返回值表示在等待获取锁的过程中是否发生中断,如果发生中断 且 原来没有需要做抛出处理的中断发生时,我们将后续处理方式设置为REINTERRUPT(如果原来在await状态有中断发生,即interrruptMode==THROW_IE,依然保持THROW_IE)。

如果是应为中断从park中唤醒(interruptMode==THROT_IE),当前线程仍在Condition队列中,但waitStatus已经变成0了,这里在调用unlinkCancelledWaiters做一次清理。

最后,根据interruptMode的值,调用reportInterruptAfterWait做出相应处理:

private void reportInterruptAfterWait(int interruptMode)
	throws InterruptedException {
	if (interruptMode == THROW_IE)
		throw new InterruptedException();
	else if (interruptMode == REINTERRUPT)
		selfInterrupt();
}

如果interruptMod==0,donothing,如果是THROW_IE,说明在await状态下发生中断,抛出中断异常,如果是REINTERRUPT,说明是signal“掺和”了中断,我们无法分辨具体的先后顺序,于是统一按照先signal再中断来处理,即成功获取锁之后要调用selfInterrupt“补上”这次中断。

signal

signal 唤醒Condition队列的头节点持有的线程

public final void signal() {
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	Node first = firstWaiter;
	if (first != null)
		doSignal(first);
}

调用signal之前也需要获取锁,因此signal方法首先检测了一下当前线程是否获取了独占锁。然后调用doSignal唤醒队列中第一个等待线程。注意,这里的“唤醒”意思是将线程从Condition队列移到Sync队列,表示已经完成Condition的等待,具有了去竞争锁的资格。至此,我们可以发现,由于await会直接把线程放入Condition等待队列的尾部,因此Condition是公平的,即按照入列的顺序来signal。

private void doSignal(Node first) {
	do {
		if ( (firstWaiter = first.nextWaiter) == null)
			lastWaiter = null;
		first.nextWaiter = null;
	} while (!transferForSignal(first) &&
			 (first = firstWaiter) != null);
}

doSignal方法先将first节点从队列中摘下,然后调用transferForSignal去改变first节点的waitStatus(所谓唤醒线程),这个方法有可能失败,因为等待线程可能已经到时或者被中断,因此while循环这个操作直到成功唤醒或队列为空。我们来看下transferForSignal方法:

final boolean transferForSignal(Node node) {
	/*
	 * If cannot change waitStatus, the node has been cancelled.
	 */
	if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
		return false;

	/*
	 * Splice onto queue and try to set waitStatus of predecessor to
	 * indicate that thread is (probably) waiting. If cancelled or
	 * attempt to set waitStatus fails, wake up to resync (in which
	 * case the waitStatus can be transiently and harmlessly wrong).
	 */
	Node p = enq(node);
	int ws = p.waitStatus;
	if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
		LockSupport.unpark(node.thread);
	return true;
}

这个方法并不在ConditionObject中定义,而是由AQS提供。方法首先调用CAS操作修改node的waitStatus,如果失败,表示线程已经放弃等待(到时或被中断),直接返回false。如果成功,调用enq方法将它放入Sync锁等待队列,返回值p是node在Sync队列中的前驱节点。检测一下前驱p的waitStatus,如果waitStatus为CANCELLED或者无法将前驱p的waitStatus设为SIGNAL,则直接唤醒node持有的线程,即unpark,否则doSignal方法循环退出,node等待前驱p唤醒。这里必须搞清楚,node线程是在哪里park的,显然,他还在await方法的那个while循环里。unpark之后,node线程将会从while循环中退出,然后去调用acquireQueued方法,这个方法是一个自旋,弄得线程会在自旋过程中清除已经为CANCELLED状态的前驱,然后注册前驱节点的waitStatus为SIGNAL。

至此,signal方法已经完成了所有该做的,“唤醒”的线程已经成功加入Sync队列并已经参与锁的竞争了,返回true。

signalAll

signalAll 唤醒Condition队列的所有等待线程

public final void signalAll() {
	if (!isHeldExclusively())
		throw new IllegalMonitorStateException();
	Node first = firstWaiter;
	if (first != null)
		doSignalAll(first);
}

signalAll方法同样先检测是否持有独占锁,然后对奥用doSignalAll方法:

private void doSignalAll(Node first) {
	lastWaiter = firstWaiter = null;
	do {
		Node next = first.nextWaiter;
		first.nextWaiter = null;
		transferForSignal(first);
		first = next;
	} while (first != null);
}

doSignalAll方法循环调用transferForSignal方法“唤醒”队列的头结点,直到队列为空。

总结:ConditionObject由AQS提供,它实现了类似wiat、notify和notifyAll类似的功能。Condition必须与一个独占锁绑定使用,在await或signal之前必须现持有独占锁。Condition队列是一个单向链表,他是公平的,按照先进先出的顺序从队列中被“唤醒”,所谓唤醒指的是完成Condition对象上的等待,被移到Sync锁等待队列中,有参与竞争锁的资格(Sync队列有公平&非公平两种模式,注意区别)。

原文地址:https://www.cnblogs.com/lllliuxiaoxia/p/15772582.html