AQS-等待队列

  AQS的原理在于,每当有新的线程请求资源时,该线程会进入一个等待队列(Waiter Queue),只有当持有锁的线程释放资源后,该线程才能持有资源。该等待队列的实现方式是双向链表,线程会被包裹在链表节点Node中。Node即队列的节点对象,它封装了各种等待状态(典型的状态机模式),前驱和后继节点信息,以及它对应的线程。

  AQS定义两种资源共享方式:Exclusive(独占,在特定时间内,只有一个线程能够执行,如ReentrantLock)和share(共享,多个线程可以同时执行,如ReadLock、Semaphore、CountDownLatch),可见不同的实现方式征用共享资源的方式不同,由此,自定义同步器在实现时要根据需求来实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经实现好了。

Node节点

 1 //标记节点为共享模式
 2 static final Node SHARED = new Node();
 3 //标记节点为独占模式
 4 static final Node EXCLUSIVE = null;
 5 //等待状态
 6 volatile int waitStatus;
 7 //前驱结点
 8 volatile Node prev;
 9 //后继节点
10 volatile Node next;
11 //线程
12 volatile Thread thread;

自定义同步器时主要需要实现以下几种方法: 

  • isHeldExclusively():该线程是否正在独占资源,只有用到condition时才需要去使用它。
  • tryAcquire(int):独占方式,尝试获取资源,返回boolean。
  • tryRelease(int):独占方式,尝试释放资源,返回boolean。
  • tryAcquireShared(int):共享方式,尝试获取资源,返回int。
  • tryReleaseShared(int):共享方式,尝试释放资源,返回boolean。

等待队列节点对象Node有四种不同的状态:

  • CANCELLED(1)已取消
  • SIGNAL(-1)竞争获胜需要唤醒
  • CONDITION(-2)在condition队列中等待
  • PROPAGATE(-3)后续节点传播唤醒操作,共享模式下使用

acquire方法执行流程

1 public final void acquire(int arg) {
2     if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
3         selfInterrupt();
4     }
5 }

1)      尝试获取锁tryAcquire,返回值表示当前线程是否获取锁。 

2)      如果获取成功,那么说明当前对象已经持有锁,执行中断操作,中断操作会解除线程阻塞。

3)      如果获取失败,那么把当前线程封装为Waiter节点,等待队列没有节点时初始化队列,有则使用compareAndSetTail()添加进Waiter队列尾端。

4)      acquiredQueue自旋获取资源,并且返回Waiter节点持有的线程的应当具备的中断状态。

5)      根据返回结果来确定是否需要执行线程中断操作。

 1 private Node addWaiter(Node mode) {
 2     //封装当前节点为node节点
 3     Node node = new Node(Thread.currentThread(), mode);
 4     Node pred = tail;
 5     if(pred != null) {
 6         //将node节点的前驱节点设置为tail
 7         node.prev = pred;
 8         //多线程环境下,tail可能已经被其它线程修改了,这里校验pred是否依然是为节点
 9         //如果是,那么将node设置为尾结点,原尾结点的后继节点设置为node,返回node
10         if(compareAndSelfTail(pred, node)) {
11             pred.next = node;
12             return node;
13         }
14     }
15     //执行到这里,说明tail为null,或者tail已经发生了变动
16     enq(node);
17     return node;
18 }    
 1 private Node enq(final Node node) {
 2     //下面这个死循环用于把node节点插入到队尾,由于多线程环境下,tail节点可能
 3     //随时变动,必须不停的尝试,让下面两个操作不会被其它线程干涉。
 4     //1,node.prev必须为当前尾结点
 5     //2,node设置为新的尾结点
 6     for(;;) {
 7         Node t = tail;
 8         //tail为空,也说明head为空,此时初始化队列
 9         if(t == null) {
10             //CAS方式初始化队头
11             if(compareAndSetHead(new Node()))
12                 tail = head;
13         } else {
14             //设置node.prev为当前尾结点
15             node.prev = t;
16            //多线程环境下,此时尾结点可能已经被其它访问修改了,需要CAS来进行比较
17             //如果t依然是尾结点,那么node设置为尾结点、
18             if(compareAndSetTail(t, node)) {
19                 t.next = node;
20                 return t;
21             }
22         }
23     }
24 }

  acquiredQueued(Node)方法会接收addWaiter封装好的Node对象,该方法的本质在于以自旋的方式获取资源,即自旋锁。它做了两件事,如果指定节点的前驱节点时头结点,那么再次尝试获取锁,反之,尝试阻塞当前线程。自旋不能构成死循环,否则会浪费大量CPU资源,在AQS中如果p==head&&tryAcquire(arg)条件不足时不会一直循环下去。通常,在p==head之前,必然会有一个线程得到锁,此时tryAcquire()通过,循环结束。如果发生了极端情况,那么node.predecessor()也会在node==head的情况下抛出空指针异常,循环结束。shouldParkAfterFailedAcquire(p,node)检测前驱节点的等待状态,需要阻塞则调用partAndCheckInterrupt()方法会阻塞当前线程,该循环也不会无限制的消耗资源。

 1 final boolean acquireQueued(final Node, int arg) {
 2     boolean failed = true;
 3     try {
 4         boolean interrupted = false;
 5         for(;;) {
 6             //找到node的前驱节点,如果node已经为head,那么会抛出空指针异常
 7             //空指针异常说明整个等待队列都没有能够获取锁的线程。
 8             final Node p = node.predecessor();
 9             //前驱节点为头结点时,当前线程尝试获取锁
10             //如果获取成功,那么node会成为新的头结点,这个过程会清空node的线程信息。
11             if(p == head && tryAcquire(arg)) {
12                 setHead(node);
13                p.next = null;
14                 failed = false;
15                 return interrupted;
16             }
17             //当前线程不能获取锁,则说明该节点需要阻塞
18             //shouldParkAfterFailedAcquire()用于检查和设置节点阻塞状态
19             //如果为通过检查,那么说明没有阻塞,parkAndCheckInterrupt()用于阻塞当前线程。
20             if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
21                interrupted = true;
22             }
23             finally {
24                 if(failed) cancelAcquire(node);
25             }
26     }
27 }                    
 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     //前驱节点的等待状态
 3     int ws = pred.waitStatus;
 4     if(ws == Node.SIGNAL) {
 5         //SIGNAL表示前驱节点需要被唤醒,此时node是一定可以安全阻塞的,所以返回true
 6         return true;
 7     }
 8     if(ws > 0) {
 9         //大于0的等待状态只有CANCELLED,从队列里移除所有前置的CANCELLED节点。
10         do {
11             node.prev = pred = pred.prev;
12         } while (pred.waitStatus > 0);
13         pred.next = node;
14     } else {
15         //运行到这里,说明前驱节点处于0、CONDITION或者PROPAGATE状态下
16         //此时该节点需要被置为SIGNAL状态,等待被唤醒。
17         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
18     }
19 }
1 private final boolean parkAndCheckInterrupt() {
2     //LockSupport.park()用于阻塞当前线程
3     LockSupport.park(this);
4     return Thread.interruupted();
5 }

  由此可以得出结论,当一个新的线程节点入队之后,会检查它的前驱节点,只要有一个节点的状态是SIGNAL,就表示当前节点之前的节点正在被等待唤醒,那么当前线程就需要被阻塞,以等待RentrantLock.unlock()唤醒之前的线程。 

 

  在过程2中,node1刚刚入队,没有争抢到锁,此时head状态为初始化的0状态,于是调用了compareAndSetWaitStatus(pred,ws,Node.SIGNAL),这个方法会把head的状态改为SIGNAL。

  在过程3中,acquired()方法里的for循环会在执行一次,此时,node1的前驱节点依然是head,如果它依然没有竞争锁,那么由于head的waitStatus属性的值为SIGNAL,这会导致shouldParkAfterFailedAcquire()方法返回true,当前线程(node1持有的线程)被阻塞,代码不在继续往下执行。这样就达到了让等待队列里的线程阻塞的目的,由此可以类推更多线程入队的过程。由此可以类推更多线程入队的过程:

 

       SIGNAL状态由release()方法进行修改,这个方法首先调用tryRelease()方法尝试释放锁,它返回的是锁是否处于可用状态,如果锁可用,那么该方法也不负责中断等待线程的阻塞,它仅仅把锁的线程持有者设为null;然后,如果成功的释放锁,那么判断队头状态,队头为空则说明队列没有等待线程,不再做其它操作,反之再判断队头的状态waitStatus,只要它不为0,就说明等待队列中有被阻塞的节点。

 1 public final boolean release(int arg) {
 2     if(tryRelease(arg)) {
 3         Node h = head;
 4         if(h != null && h.waitStatus != 0) {
 5             unparkSuccessor(h);
 6         }
 7         return true;
 8     }
 9     return false;
10 }
 1 private void unparkSuccessor(Node node) {
 2     int ws = node.waitStatus;
 3     //小于0的状态waitStatus只有SIGNAL和CONDITION
 4     if(ws < 0) {
 5         compareAndSetWaitStatus(node, ws, 0);
 6     }
 7     Node s = node.next;
 8     //前驱查找需要唤醒的节点
 9     if(s == null || s.waitStatus > 0) {
10        s = null;
11         for(Node t = tail; t != null && t != node; t = t.prev) {
12             if(t.waitStatus <= 0) s = t;
13         }
14     }
15     if(s != null) {
16         LockSupport.unpark(s.thread);
17     }
18 }

       unparkSuccessor()负责确保中断正确的线程阻塞。在ReentrantLock.unlock()的调用过程中,unparkSuccessor(Node node)的形参node始终为head节点,这个方法执行的主要操作为:

  • 首先把head节点的waitStatus设置为0,表示队列里没有需要中断阻塞的线程。
  • 然后确定需要被唤醒的节点,该节点是队列中第一个waitStatus小于等于0的节点。
  • 最后,调用LockSupport.unlock()方法中断指定线程的阻塞状态。

 

       需要注意的是,node1对应的线程此时已经中断了阻塞,它会开始继续执行AQS的AacquireQueued()方法中for循环的代码final Node p = node.predecessor();显然node1的前驱节点head由于锁已经被释放,队列变化为

 

       这部分代码比较巧妙,可以注意到,在释放的过程中,代码里并没有改变head的waitStatus为SIGNAL,而是直接使用node1替代了原先的head。换言之,原本需要修改head/node2的前驱和后置,并且把head的waitStatus修改为SIGNAL,使用当前的代码,只需要释放node1的持有线程,然后移除head节点,这样可以更快的到达队列规整的目的。

AQS如何阻塞线程和中断阻塞

       在acquired()方法中,当前线程尝试获取锁,如果没有获得,那么会把线程加入等待队列中,加入到队列的线程会被阻塞。

       线程阻塞有三种常见的实现方式:Object.wait()、Thread.join()、或者Thread.sleep()。

       中断阻塞则通过Thread.interrupt()方法来实现,这个方法会发出一个中断信号量从而导致线程抛出中断异常InterruptedException,已达到结束阻塞的目的。需要注意的是Interrupt不会中断用户循环体造成阻塞,它仅仅是抛出信号量,具体处理方式还是由用户处理。Thread.isInterrupted可以得到中断状态。

       对于wait、sleep、join等会造成线程阻塞的方法,由于它们都会抛出Interrupted Exception,处理方式如下

1 try {
2     Thread.currentThread().sleep(500);
3 } catch (InterruptedException e) {
4     //中断后抛出异常,在异常捕获里可以对中断定制处理
5 }

        对循环体处理方式如下表示:

1 //使用Thread.isInterrupted方法获取中断信号量
2 while(!Thread.currentThread().isInterrupted && 用户自定义条件) {
3 }
原文地址:https://www.cnblogs.com/guanghe/p/13462079.html