Intrinsic VS explicity
1. 不一定保证公平 1. 提供公平和非公平的选择
2. 无 2. 提供超时的功能
3. 无 3. 提供对中断的响应
4. 无 4. 提供try... 的方法
5. 保证获取锁和释放锁的一致性 5. 需要自己保证
6. 只有一个conditionQueue 6. 可以有多个conditonQueue
7. 使用灵活
锁: 一个带有内部状态的类
自定义同步协议需要注意的点:
- 获取锁的时候, 状态如何变化
- 释放锁的时候, 状态如何变化, 已经状态变化后是都对别的等待方有影响
AQS的作用
- 提供对获取锁成功或失败后, 线程的管理, 比如是否进行阻塞
- 当释放锁后, 提供对等待的线程的通知功能
- 提供给用户对状态进行自定义处理的接口
AQS的使用
- AQS依赖于一个FIFO队列和一个int表示的状态. 具体的实现类可以自己去保存别的状态, 但是只有这个int表示的状态是require和release的, 子类自己的状态是做辅助作用的.
2. 想要使用AQS来作为一个同步器的基类,需要实现下面这几个方法.
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
(注意, 一定要保证线程安全)
3. 可以通过这些方法来对状态进行监控和修改
getState
setState
compareAndSetState
源码分析
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
/**addWaiter()将新node加到队列的末尾, 如果队列没有初始化的话,进行初始化操作(如果head节点是null的话,创建一个新的node当head节点)
acquireQueued() 如果当前节点是head的下一个节点,进行获取state操作,成功后不进行阻塞. 否则,在链表中向前寻找,直到找到一个没有被取消的节点,把当前节点作为找到的节点的下一个节点,然后把当前节点阻塞.
cancelAcquire() 把当前节点的状态改为取消状态,然后向前找,直到找到一个没有被取消的节点,如果这个节点不是头节点,而且没有被取消,则把当前节点的下一个节点作为找到的这个节点的子节点. 否则,找到当前节点的下一个没有被取消的节点,然后把它唤醒.
**/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 找到下一个没有被取消的节点,然后把他唤醒.
unparkSuccessor(h);
return true;
}
return false;
}
Release操作不会去操作队列,只会去唤醒后边的没有取消的节点.而且release后会把当前节点的状态改为0.
Head节点可能是初始化的那个new node()节点, 也有可能是获取状态成功的那个节点.
所以,即使头节点的状态是0, 后续来的节点也会把头节点的状态改为Signal, 除非后面已经没有等待的节点了.
就算中间的节点可能因为某种因素导致链条没有接上,也会从tail开始搜索, 来保证如果有是一定可以找到的. 就算因为竞争的问题, 导致新加入的节点没有接收到前面节点的通知,但是,这种情况下, 前面的节点,除了头节点,都已经被取消了, 所以在enq()操作的时候, 会直接去tryAcquire,也不会导致整个操作有问题. 还有一个保障措施是只有当前面的节点是signal的时候, 才会安心的去block,而在reslease这边,会把这个状态改成0,然后再去唤醒后边的几点.
需要注意的是, 如果我们的release操作和acquire操作中,对state的状态改动不一致,会导致死锁.
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
// 和非中断的操作的区别是, 当当前节点被唤醒的时候, 如果被中断过,会抛出中断异常
doAcquireInterruptibly(arg);
}
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
// 和正常的区别是, 在shouldpark后, 会判断时间的长短,短的话, 会进行自旋. 长的话, 会调用parkNanos()进行阻塞
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 和正常的区别是, 在成功获取状态后,在setHead后,如果tryAcquire返回的值大于0,而且下一个节点也是shared的节点,且当前节点的状态是signal. 则唤醒下一个节点. 如果当前节点的状态是0,则改为PROPAGATE
doAcquireShared(arg);
}
/**
* Release action for shared mode -- signal successor and ensure
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
和release的区别是, 如果head的状态是0 , 会改为PROPAGATE
private void doReleaseShared() {
/*
Condition
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with
* saved state as argument, throwing
* IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1如果最后一个节点被取消了, 就把整个链表中取消的节点都删掉
// 2 把这个节点加到链表的最后, 初始状态是condition
Node node = addConditionWaiter();
// 调用release操作, 把当前获取锁的node release掉(注意, 并没有保证conditionNode 和 当前持有锁的线程的node的对应)
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断这个node是否在SyncQueue 中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 被唤醒或被中断
if ((interruptMode =
// 如果当前线程没有被中断过,break, 但是中断后可能会很诡异
否则判断是否在signal之前取消了,如果是, 把这个node转到SyncQueue中, 状态从condition转到0,
如果状态不是condition, 一直等到这个节点被放到syncQueue中,然后返回false
checkInterruptWhileWaiting(node)) != 0)
break;
}
// 获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
// 看下是否被中断过, 然后对中断做响应.
reportInterruptAfterWait(interruptMode);
}
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
// 所以使用condition必须实现isHeldExclusively 这个方法
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
// 1. 把节点的状态从condition转到0, 如果失败, 返回false
2. 把这个节点插入到syncQueue中, 把这个节点的上一个节点置为signal. 如果上个节点是取消状态或者置为signal的操作失败,把这个节点的线程唤醒. 返回true. 这时这个节点已经在syncqueue中了, 整个流程可以正常跑通
// 结论: 将这个node的线程作为一个正常的获取锁的线程, 把他放到syncQueue中, 等待被唤醒, 如果没有头节点, 或错过了头节点的唤醒, 就直接唤醒,
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}