AQS(AbstractQueuedSynchronizer)解析

AQS概念

AQS是用于构建锁和同步器的框架。ReentrantLock、信号量、闭锁、FutureTask等等都是基于AQS构建的。其内部封装了对等待线程队列维护,同时开放接口自定义共享资源的获取和释放。

AQS负责管理同步器类中状态的整数信息,通过getState,setState和compareAndSetState等方法操作状态。

支持共享和独占两种获取资源的方式。如果是独占获取,需要实现 tryAcquire 、 tryRelease 和 isHeldExclusively 方法。如果是共享获取,实现 tryAcquireShared 和 tryReleaseShared 等方法。

 boolean tryAcquire(int arg) :独占方式。获取资源成功后返回true,否则返回false

 boolean tryRelease(int arg) :独占方式。释放资源成功后返回true,否则返回false

 int tryAcquireShared(int arg) :共享方式。获取资源失败返回负数,获取成功则返回剩余资源数量,0表示无剩余资源

 boolean tryReleaseShared(int arg) :共享方式。释放资源,允许唤醒后续节点返回true,否则返回false

AQS源码

1 重要的内部类

 1 static final class Node {
 2     //共享模式
 3     static final Node SHARED = new Node();
 4     //独占模式
 5     static final Node EXCLUSIVE = null;
 6 
 7     //表示当前节点已经取消调度,如超时和中断,是终结状态
 8     static final int CANCELLED =  1;
 9     //表示当前节点的下一个节点等待当前节点唤醒,当节点入队时会将前一个节点置为该状态
10     static final int SIGNAL    = -1;
11     //表示当前节点等待在Condition上
12     static final int CONDITION = -2;
13     //共享模式下,前继节点会唤醒之后的一个及多个相邻节点
14     static final int PROPAGATE = -3;
15 
16     //当前节点等待资源的状态
17     volatile int waitStatus;
18     //前一个节点
19     volatile Node prev;
20     //后一个节点
21     volatile Node next;
22     //当前节点代表的线程
23     volatile Thread thread;
24     Node nextWaiter;
25 
26     final boolean isShared() {
27         return nextWaiter == SHARED;
28     }
29 
30     Node() {}
31 
32     Node(Thread thread, Node mode) {
33         this.nextWaiter = mode;
34         this.thread = thread;
35     }
36 
37     Node(Thread thread, int waitStatus) {
38         this.waitStatus = waitStatus;
39         this.thread = thread;
40     }
41 }

  Node节点代表了在队列中等待资源的线程,头节点除外,初始状态为0。

2 acquire方法

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }

  独占访问资源的顶层入口。

  1.tryAcquire方法由实现类自己实现,定义独占访问资源的逻辑。修改state的值代表占用了资源。

  2.addWaiter方法将当前线程加入到等待队列

  3.acquireQueued将不停尝试tryAcquire方法直到成功,期间不响应中断。如果被通知中断则在成功后返回true

  4.如果重试期间被中断过,在获得资源后才会通过selfInterrupt中断自己

  一上来就会尝试获取锁,而不管等待队列中或许还有其他线程在等待,这里体现了非公平性。

  2.1 addWaiter

 1 private Node addWaiter(Node mode) {
 2     Node node = new Node(Thread.currentThread(), mode);
 3     //tail记录了队列的尾节点
 4     Node pred = tail;
 5     if (pred != null) {
 6         //将新节点的前指针指向旧的尾节点
 7         node.prev = pred;
 8         /**
 9          * 这个方法内部:unsafe.compareAndSwapObject(this, tailOffset, pred, node);
10          * tailOffset是tail属性在对象内存中的偏移量,根据偏移量找到tail存储的对象,比较和pred是否相同,相同则用node替换tail现有的值
11          */
12         if (compareAndSetTail(pred, node)) {
13             //将旧的尾节点后指针指向新节点
14             pred.next = node;
15             return node;
16         }
17     }
18     //失败或者首次插入调用enq
19     enq(node);
20     return node;
21 }

   

  2.2 enq

 1 private Node enq(final Node node) {
 2     //CAS+自旋
 3     for (; ; ) {
 4         Node t = tail;
 5         if (t == null) {
 6             /**
 7              * 第一个放入队列
 8              * 这个方法内部:unsafe.compareAndSwapObject(this, headOffset, null, update);
 9              * 将新节点设置到head属性
10              */
11             if (compareAndSetHead(new Node()))
12                 tail = head;
13         } else {
14             node.prev = t;
15             if (compareAndSetTail(t, node)) {
16                 t.next = node;
17                 return t;
18             }
19         }
20     }
21 }

   注意标红处,头节点是一个默认占位节点表示已经占有资源的节点。后续节点占有资源之后,会将自己设置为头节点,并释放内部封装的线程。

  2.3 acquireQueued

 1 final boolean acquireQueued(final Node node, int arg) {
 2     boolean failed = true;
 3     try {
 4         //标记轮询过程中是否遭遇中断
 5         boolean interrupted = false;
 6         for (;;) {
 7             //前指针指向的节点
 8             final Node p = node.predecessor();
 9             //前指针是头节点,则开始尝试获取资源
10             if (p == head && tryAcquire(arg)) {
11                 //获取成功,将自己设置为头节点,并将自己的前向指针和封装的线程置为空。头节点必然是正在占有资源的节点
12                 setHead(node);
13                 //将前节点后指针置空,方便GC回收
14                 p.next = null;
15                 failed = false;
16                 return interrupted;
17             }
18             /**
19              * shouldParkAfterFailedAcquire:将前置节点的状态设置为SIGNAL,以便自己陷入等待后,前置节点释放资源后会将自己唤醒。设置完毕返回true,这个方法会删除无效节点
20              * parkAndCheckInterrupt:进入wait状态,被唤醒后返回中断状态
21              */
22             if (shouldParkAfterFailedAcquire(p, node) &&
23                     parkAndCheckInterrupt())
24                 //到了这里说明是被中断唤醒的,设置中断标识
25                 interrupted = true;
26         }
27     } finally {
28         if (failed)
29             cancelAcquire(node);
30     }
31 }

  如果在队列中处于第2名,会尝试竞争资源,成功后自己变为头节点。否则通知还在等待的前节点,让他有幸获得资源,使用完释放后通知自己,自己会在行23处等待。

  2.4 shouldParkAfterFailedAcquire

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     int ws = pred.waitStatus;
 3     if (ws == Node.SIGNAL)
 4         //如果前置节点的状态已经是SIGNAL(代表了前置节点释放完资源后,会唤醒自己),可以安心进入等待状态
 5         return true;
 6     if (ws > 0) {
 7         //如果前置节点状态>0,说明它已经咸鱼了,这个时候需要跳过这个节点
 8         //下面的循环不停前推,直到找到一个还在努力的少年
 9         do {
10             node.prev = pred = pred.prev;
11         } while (pred.waitStatus > 0);
12         //这里完成新的前节点的相互关联,中间的咸鱼链因为不再被引用,会被GC回收。
13         pred.next = node;
14     } else {
15         //前置节点是正常的节点,会将它的状态设置为SIGNAL,CAS失败会回到外层继续循环
16         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
17     }
18     return false;
19 }

  2.5 parkAndCheckInterrupt

1 private final boolean parkAndCheckInterrupt() {
2     //进入wait状态,跟竞争内置锁失败进入阻塞状态不同
3     LockSupport.park(this);
4     //被唤醒了,可能是unPark或者被中断,返回自己的中断状态
5     return Thread.interrupted();
6 }

3 release方法

 1 public final boolean release(int arg) {
 2     //释放资源
 3     if (tryRelease(arg)) {
 4         Node h = head;
 5         if (h != null && h.waitStatus != 0)
 6             //唤醒后续的节点
 7             unparkSuccessor(h);
 8         return true;
 9     }
10     return false;
11 }

  1.tryRelease释放资源,即将state的值还原

  2.唤醒队列中正在等待的节点,是最靠前的节点,也就是等待最久的节点,这部分体现了公平性。

  3.1 unparkSuccessor

 1 private void unparkSuccessor(Node node) {
 2     int ws = node.waitStatus;
 3     if (ws < 0)
 4         //将节点的等待状态设置为初始状态
 5         compareAndSetWaitStatus(node, ws, 0);
 6 
 7     Node s = node.next;
 8     if (s == null || s.waitStatus > 0) {
 9         //状态大于0的节点是无效节点,它已经放弃竞争了
10         s = null;
11         //从尾节点开始反向遍历,寻找最靠前的等待节点。为什么不从前向后?
12         for (Node t = tail; t != null && t != node; t = t.prev)
13             //如果当前节点是有效节点,记录下来
14             if (t.waitStatus <= 0)
15                 s = t;
16     }
17     if (s != null)
18         //使用unpark唤醒
19         LockSupport.unpark(s.thread);
20 }

  这里唤醒的是离head最近的有效节点,虽然其前置节点不一定就是头节点,但是经过2.4的shouldParkAfterFailedAcquire方法删除无效节点之后,其前置节点一定是头节点。

  回答标红的位置,直观上讲肯定是从前向后更快,但是看新节点进入等待队列的逻辑,2.1和2.2,逻辑是差不多的,都是先将新节点的前向指针关联到尾节点,然后CAS尝试设置将尾节点设置成新节点,再将旧的尾节点后向指针设置到新节点,关键在最后一步,在并发场景下,发生时间片切换,当前线程被挂起了。从前向后遍历的话,到了这里就无法找到下一个节点,主要还是两个节点设置前后关联这个操作不是原子的。

4 acquireShared方法

1 public final void acquireShared(int arg) {
2     if (tryAcquireShared(arg) < 0)
3         //尝试获取资源,如果失败则进入等待队列
4         doAcquireShared(arg);
5 }

  4.1 doAcquireShared

 1 private void doAcquireShared(int arg) {
 2     //在队列尾添加新节点
 3     final Node node = addWaiter(Node.SHARED);
 4     boolean failed = true;
 5     try {
 6         boolean interrupted = false;
 7         //循环尝试获取资源
 8         for (;;) {
 9             final Node p = node.predecessor();
10             //正处在第二节点
11             if (p == head) {
12                 //尝试获取资源
13                 int r = tryAcquireShared(arg);
14                 //获取成功
15                 if (r >= 0) {
16                     //将自己设置为头结点,并且可能的情况下唤醒后续节点
17                     setHeadAndPropagate(node, r);
18                     p.next = null;
19                     if (interrupted)
20                         //如果等待过程中被中断,这里恢复中断
21                         selfInterrupt();
22                     failed = false;
23                     return;
24                 }
25             }
26             //等待获取资源
27             if (shouldParkAfterFailedAcquire(p, node) &&
28                     parkAndCheckInterrupt())
29                 interrupted = true;
30         }
31     } finally {
32         if (failed)
33             cancelAcquire(node);
34     }
35 }

  注意代码11行的这个判断条件,必须是第二个节点才会尝试获取资源。获取资源有两种途径,一个是tryAcquireShared直接获取,不需要设置头节点。一种是doAcquireShared,在等待队列中被唤醒并获取资源,这种情况需要设置自己是头节点。

  这里有一个问题,同时获取资源的线程有多个,那么在等待队列中被唤醒的节点也会有多个,一种特殊的情况,头节点尚未释放资源,第二个节点被其余线程唤醒了,那么走到13行,将自己设置成新的头节点,并将自己的前置指针置空。这种情况是否会出问题?不会有问题,之前的头节点代表的线程使用完毕后依然通过releaseShared释放资源

  4.2 setHeadAndPropagate

 1 private void setHeadAndPropagate(Node node, int propagate) {
 2     Node h = head;
 3     //将当前节点设置为头结点,获取了资源的节点都是头节点
 4     setHead(node);
 5     //如果还有剩余资源 后面这部分判断条件没有看懂,请参考:https://blog.csdn.net/anlian523/article/details/106319294/
 6     if (propagate > 0 || h == null
 7             || h.waitStatus < 0
 8             || (h = head) == null 
 9             || h.waitStatus < 0) {
10         Node s = node.next;
11         if (s == null || s.isShared())
12             //
13             doReleaseShared();
14     }
15 }

  上面行7是判断的原头节点,头节点的等待状态可能小于0,需要看下面5.1的解释,也就是同时有多个线程释放了资源并执行了 doReleaseShared 唤醒队列中等待的节点。行8又重新赋值了一下,因为这中间可能其他节点将头节点替换了,需要再次判断下。

  跟独占方式不同的是,自己获得资源之后,如果发现还有资源剩余,是会继续唤醒后续节点的,不用等到释放资源的时候唤醒。

  获取资源的方式有两处,一处是 acquireShared 每个线程第一次会尝试获取,第二处是 doAcquireShared 在等待队列中被唤醒后尝试获取。

5 releaseShared方法

1 public final boolean releaseShared(int arg) {
2     //尝试释放资源
3     if (tryReleaseShared(arg)) {
4         //唤醒其他节点
5         doReleaseShared();
6         return true;
7     }
8     return false;
9 }

  5.1 doReleaseShared

 1 private void doReleaseShared() {
 2 
 3     for (; ; ) {
 4         Node h = head;
 5         if (h != null && h != tail) {
 6             int ws = h.waitStatus;
 7             //有节点让自己释放时通知
 8             if (ws == Node.SIGNAL) {
 9                 /**
10                  * 先将头节点的状态还原。
11                  * 当访问头节点时,头节点状态为-1,代表头节点正占用资源,后面有节点正等待唤醒
12                  * 若头节点状态为0,代表头节点已经释放了资源,正准备唤醒节点 或者 已经唤醒了节点,但后续节点尚未将自己设置为新的头节点
13                  */
14                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
15                     continue;
16                 //唤醒下一个最靠前的等待节点
17                 unparkSuccessor(h);
18             }
19             //到这里,说明有两条或以上的线程准备释放资源
20             //1. 头节点已经释放过资源了,此时它的状态是0,并且唤醒的那个节点还未替换头节点
21             //2. 有其他线程通过releaseShared方法释放了资源,尝试唤醒其他节点
22             //出现这种情况会将头节点状态设置为-3
23             else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
24                 continue;
25         }
26         //如果头节点发生变化,说明已经成功唤醒了,退出循环
27         if (h == head)
28             break;
29     }
30 }

  在并发场景下,doReleaseShared可能被同时执行多次,第一次执行的线程,将head的状态由-1设置为0,并唤醒新的节点,在新节点尚未替换head时,第二次执行的线程,又将head的状态从0设置为-3,注意只会设置两次,第三次的话会进入自旋等待!这里说明一下共享模式下头节点状态的含义:

  1. 头节点状态为-1,说明还没有开始唤醒其他节点,可能是没有资源剩余了,也可能是代码还没执行到下面

  2. 头节点状态为0,正在唤醒 或 已经唤醒下一个节点但新节点还没替换头节点

  3. 头节点状态为-3,在2的基础上,又有一个线程释放了资源,此时需要额外唤醒一个节点,这个唤醒操作是由2产生的新节点来执行的,请看4.2#6代码。新节点替换了头节点之后,发现原头节点的状态小于0,即使此时可能 propagate == 0,自己也应该继续唤醒另一个节点了。  

  doReleaseShared的入口有两处,一处是 releaseShared 每个线程用完之后释放并唤醒其余节点,第二处是 setHeadAndPropagate 当有节点被唤醒并获得资源后,若资源还有余,会唤醒其余节点。

    

关于公平锁和非公平锁

  AQS获取资源有两种方式:一种是执行 acquire / acquireShared 方法,不论等待队列是否有线程,都会通过 tryAcquire / tryAcquireShared 方法直接尝试获取。第二种是在等待队列中,按照先入先出的顺序唤醒。

  是否是公平锁交给实现类规定,AQS只定义了等待队列中获取资源是公平的,如果实现类在 tryAcquire / tryAcquireShared 中规定必须在等待队列为空的情况下才能获取资源,那么就是公平锁,否则是非公平锁。

关于LockSupport

  https://blog.csdn.net/tangtong1/article/details/102829724?utm_medium=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control&dist_request_id=&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-2%7Edefault%7EBlogCommendFromMachineLearnPai2%7Edefault-1.control

  

参考:https://www.cnblogs.com/waterystone/p/4920797.html

人生就像蒲公英,看似自由,其实身不由己。
原文地址:https://www.cnblogs.com/walker993/p/14619231.html