CountDownLatch源码解析

  一、CountDownLatch介绍

      CountDownLatch是在jdk1.5被引入的,它主要是通过一个计数器来实现的,当在初始化该类的构造函数时,会事先传入一个状态值,之后在执行await方法后,

    在这个状态值为0之前,当前线程(指的是调用await的线程)会一直等待。它内部使用了AQS来实现的,且是共享锁,具体怎么实现,待会看看它的实现原理。

      它的应用场景:

      一般在于在执行当前线程之前,要完成n个线程的任务,才能执行当前线程。这种场景适合用countdownLatch。

   二、源码解析

      先来看看该类的构造,如下图

      

      如图,红色框选中的是该类的一个内部类,该内部类实现了抽象类AQS,具体锁的获取和释放是由该内部类实现的。

     由上图知countdownLatch只有一个构造函数,    

1    public CountDownLatch(int count) {
2         if (count < 0) throw new IllegalArgumentException("count < 0");
3         this.sync = new Sync(count);
4     }

    很明显,它有一个参数,这个参数,被用在哪里呢,请看下面

1      Sync(int count) {
2             setState(count);
3         }

     这个参数最终用在了状态值上,由此可猜测,这个状态值决定这锁什么时候释放。

      1、内部类Sync

        

   private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);//设置状态值的大小
        }

        int getCount() {
            return getState();//获取状态值
        }
    //当状态值为0才返回1,否则返回-1,也用来判断线程是否拥有该锁,值大于0,不拥有,小于0,则拥有
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
     //对状态值进行操作,每一次成功,则状态值-1,
     //也知道只有状态值为1,然后再执行该方法,才会返回true,否则其它情况全是返回false
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) {//这个无限循环是为了保证在进行有其他线程也在操作状态值,导致失败之后就不操作了 int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc))//对状态值递减,若有其他线程也在操作,则可通过for的无限循环来保证一定能递减成功 return nextc == 0; } } }

  该类重写了AQS的tryAcquireShared(int)和tryReleaseShared(int)两个方法, 

  下面来看看这个CountDownLatch类常用的方法

     2、await()方法

 1   public void await() throws InterruptedException {
 2         sync.acquireSharedInterruptibly(1);
 3     }
 4 
 5 
 6     public final void acquireSharedInterruptibly(int arg)
 7             throws InterruptedException {
 8         if (Thread.interrupted())
 9             throw new InterruptedException();
10         if (tryAcquireShared(arg) < 0)
11             doAcquireSharedInterruptibly(arg);
12     }

  在调用await方法时,再用sync去调用AQS的内部方法acquireSharedInterruptibly(因为sync类没重写该方法),会先判断当前线程是否被中断(中断一般是由外部条件引起的),若中断直接抛出异常,否则,获取通过tryAcquireShared方法来判断当前线程是否拥有该共享锁,当值小于0,则拥有,大于0,则不拥有,继续下一步,若有锁,则再执行doAcquireSharedInterruptibly方法,

  

 1   private void doAcquireSharedInterruptibly(int arg)
 2         throws InterruptedException {
 3         final Node node = addWaiter(Node.SHARED);//对当前线程进行一个包装,同时也初始化了等待队列,即head->node->...->tail
 4         boolean failed = true;
 5         try {
 6             for (;;) {
 7                 final Node p = node.predecessor();//获取该node节点的前一个节点,一般首次调用时,该前一个节点就是head节点。
 8                 if (p == head) {
 9                     int r = tryAcquireShared(arg);//再次获取锁的状态,
10                     if (r >= 0) {//若状态值为0,则进入
11                         setHeadAndPropagate(node, r);
12                         p.next = null; // help GC
13                         failed = false;
14                         return;
15                     }
16                 }
 //shouldParakAfterFailedAcquire方法主要是针对node节点的状态进行操作,若为signal,则挂起,若为0或PROPAGATE,则转换成signal,为cancelled,则放弃,寻找前一个不是该状态值的节点
17 if (shouldParkAfterFailedAcquire(p, node) && 18 parkAndCheckInterrupt())//挂起线程 19 throw new InterruptedException(); 20 } 21 } finally { 22 if (failed)//若failed为true,一般是出现了异常,或者线程被中断 23 cancelAcquire(node); 24 } 25 }

    从上述分析来看,只有当状态值为0的时候,才会调用setHeadAndPropagate(node,int)方法,否则会无限等待,当前线程也会被挂起,该方法源码如下

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; 
        setHead(node);//将node节点设置为头结点,对比前面的doAcquireSharedInterruptibly方法,也就是头结点的下一个节点,且该节点的状态为shared
        //对propagate值,头结点和状态,进行判断
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;//获取node节点的下一个节点
         //对node节点的下一个节点进行判断,是否为null,和状态值是否为shared
            if (s == null || s.isShared())
          //该方法作用为了释放当前锁,即线程阻塞
                doReleaseShared();
        }
    }

    上面说的是执行await方法后,发生的一系列操作,也知道了只有当状态值为0,才会使线程通行,下面来看一看怎么使状态值为0的。

  3、countDown方法

    在调用tryReleaseShared方法,每调用一次,state值就会减一,但除了某个时刻当state值减一后恰好为0,才会返回true,否则返回false,为0时刻,也表明锁被其它线程给释放了。

 1 public void countDown() {
 2         sync.releaseShared(1);
 3     }
 4 
 5 public final boolean releaseShared(int arg) {
 6      //尝试获取锁的状态
 7         if (tryReleaseShared(arg)) {
 8             doReleaseShared();//此时,状态值已经为0,执行doReleasseShared方法,
 9             return true;
10         }
11         return false;
12     }

    也许会有人有疑问,说,为什么在执行await方法后的一些类操作中,也执行了doReleaseShared方法,这岂不是要释放两次?

   其实不然,主要是怕doAcquireSharedInterruptibly方法执行后,由于某种原因,当前线程为挂起(即阻塞了),不再执行了,这时只有通过releaseShared方法来唤醒线程,下面看看doReleaseShared方法的实现

  

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//若头结点状态为signal,则进入,头结点初始化时的状态值为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//若交换失败,则终止此步操作,继续下一轮循环
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);//这是释放锁的关键
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // 若head节点被改变了,则继续循环,否则,跳出循环
                break;
        }
    }

  unparkSuccessor(node)分析如下,该方法作用是为了释放node节点的后一个节点中的线程,在这里,node节点就是head节点

private void unparkSuccessor(Node node) {
      
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

       
        Node s = node.next;
      //下一个节点为null或状态值为cancelled,
if (s == null || s.waitStatus > 0) { s = null;
        //由后往前搜索,节点状态值小于或等于0的节点(即状态值不是cancelled值),搜索到的结果一定是最靠近node节点的,且状态值<=0.
        //至于为什么不从前往后搜索,原因不太清楚!!!
for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//释放 }

  三、总结 

    countdownLatch在初始化构造函数时,会先将参数设置为状态state值,之后执行await方法后,会进行这一系列的步骤
    1、将shared和当前线程包装成一个node节点,(在第一次调用还会初始化等待队列)在队列中,有这样的队列 head->node,其中node就是被包装成share的节点

    2、之后在doAcquireSharedInterruptibly方法中,执行了shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法, 若顺利,则head节点的状态值会变为signal,并且当前线程会通过执行park方法进行挂起。

      
    3、在方法tryReleaseShared中,会一直操作state值,使之减1,一直到state的值,减为0时,在这之前,当前线程一直会被阻塞。当为0时,会执行doReleaseShared方法 对当前线程执行unparkSuccessor方法,进行放行。


  以上就是我对countdownLatch类的理解,若有不足之处,还望指正!

 

原文地址:https://www.cnblogs.com/qm-article/p/8320177.html