java 并发(五)---AbstractQueuedSynchronizer(3)

       文章代码分析和部分图片来自参考文章

问题 :

  •     CountDownLatch  和 CyclicBarrier 的区别

认识 CountDownLatch

         分析这个类,首先了解一下它所可以实现的效果,然后再顺着这个源码的思路思考是不是和它实现的效果一样。下面的代码和图片可以说明 CountDownLatch (下文简称CDL)的工作过程。

       5

  1 public class CountDownLatchDemo {
  2 
  3     public static void main(String[] args) {
  4 
  5         CountDownLatch latch = new CountDownLatch(2);
  6 
  7         Thread t1 = new Thread(new Runnable() {
  8             @Override
  9             public void run() {
 10                 try {
 11                     Thread.sleep(5000);
 12                 } catch (InterruptedException ignore) {
 13                 }
 14                 // 休息 5 秒后(模拟线程工作了 5 秒),调用 countDown()
 15                 latch.countDown();
 16             }
 17         }, "t1");
 18 
 19         Thread t2 = new Thread(new Runnable() {
 20             @Override
 21             public void run() {
 22                 try {
 23                     Thread.sleep(10000);
 24                 } catch (InterruptedException ignore) {
 25                 }
 26                 // 休息 10 秒后(模拟线程工作了 10 秒),调用 countDown()
 27                 latch.countDown();
 28             }
 29         }, "t2");
 30 
 31         t1.start();
 32         t2.start();
 33 
 34         Thread t3 = new Thread(new Runnable() {
 35             @Override
 36             public void run() {
 37                 try {
 38                     // 阻塞,等待 state 减为 0
 39                     latch.await();
 40                     System.out.println("线程 t3 从 await 中返回了");
 41                 } catch (InterruptedException e) {
 42                     System.out.println("线程 t3 await 被中断");
 43                     Thread.currentThread().interrupt();
 44                 }
 45             }
 46         }, "t3");
 47         Thread t4 = new Thread(new Runnable() {
 48             @Override
 49             public void run() {
 50                 try {
 51                     // 阻塞,等待 state 减为 0
 52                     latch.await();
 53                     System.out.println("线程 t4 从 await 中返回了");
 54                 } catch (InterruptedException e) {
 55                     System.out.println("线程 t4 await 被中断");
 56                     Thread.currentThread().interrupt();
 57                 }
 58             }
 59         }, "t4");
 60 
 61         t3.start();
 62         t4.start();
 63     }
 64 }

         我们知道这里面实际需要分析的方法就是 await 和 countDown 方法。

源码分析 CountDownLatch

CDL

        从方法中我们就可以知道CDL中的实现AQS的共享模式获取锁。我们以上面的Demo 来做源码分析

         下文源码分析来自   一行一行源码分析清楚 AbstractQueuedSynchronizer (三)

  1 public void await() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public final void acquireSharedInterruptibly(int arg)
  5         throws InterruptedException {
  6     // 这也是老套路了,我在第二篇的中断那一节说过了
  7     if (Thread.interrupted())
  8         throw new InterruptedException();
  9     // t3 和 t4 调用 await 的时候,state 都大于 0。
 10     // 也就是说,这个 if 返回 true,然后往里看
 11     if (tryAcquireShared(arg) < 0)
 12         doAcquireSharedInterruptibly(arg);
 13 }
 14 // 只有当 state == 0 的时候,这个方法才会返回 1
 15 protected int tryAcquireShared(int acquires) {
 16     return (getState() == 0) ? 1 : -1;
 17 }
  1 private void doAcquireSharedInterruptibly(int arg)
  2     throws InterruptedException {
  3     // 1. 入队
  4     final Node node = addWaiter(Node.SHARED);
  5     boolean failed = true;
  6     try {
  7         for (;;) {
  8             final Node p = node.predecessor();
  9             if (p == head) {
 10                 // 同上,只要 state 不等于 0,那么这个方法返回 -1
 11                 int r = tryAcquireShared(arg);
 12                 if (r >= 0) {
 13                     setHeadAndPropagate(node, r);
 14                     p.next = null; // help GC
 15                     failed = false;
 16                     return;
 17                 }
 18             }
 19             // 2
 20             if (shouldParkAfterFailedAcquire(p, node) &&
 21                 parkAndCheckInterrupt())
 22                 throw new InterruptedException();
 23         }
 24     } finally {
 25         if (failed)
 26             cancelAcquire(node);
 27     }
 28 }

           图一是两个线程假如到阻塞队列中的情景,图二是CountDown 方法执行的过程。

4

图一

1

        图二(例子中数量不为10,此处仅为说明执行过程)

  1 public void countDown() {
  2     sync.releaseShared(1);
  3 }
  4 public final boolean releaseShared(int arg) {
  5     // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true
  6     // 否则只是简单的 state = state - 1 那么 countDown 方法就结束了
  7     if (tryReleaseShared(arg)) {
  8         // 唤醒 await 的线程
  9         doReleaseShared();
 10         return true;
 11     }
 12     return false;
 13 }
 14 // 这个方法很简单,用自旋的方法实现 state 减 1
 15 protected boolean tryReleaseShared(int releases) {
 16     for (;;) {
 17         int c = getState();
 18         if (c == 0)
 19             return false;
 20         int nextc = c-1;
 21         if (compareAndSetState(c, nextc))
 22             return nextc == 0;
 23     }
 24 }
 25 countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:
 26 
 27 // 调用这个方法的时候,state == 0
 28 // 这个方法先不要看所有的代码,按照思路往下到我写注释的地方,其他的之后还会仔细分析
 29 private void doReleaseShared() {
 30     for (;;) {
 31         Node h = head;
 32         if (h != null && h != tail) {
 33             int ws = h.waitStatus;
 34             // t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了
 35             if (ws == Node.SIGNAL) {
 36                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 37                     continue;            // loop to recheck cases
 38                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
 39                 // 在这里,也就是唤醒 t3
 40                 unparkSuccessor(h);
 41             }
 42             else if (ws == 0 &&
 43                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todo
 44                 continue;                // loop on failed CAS
 45         }
 46         if (h == head)                   // loop if head changed
 47             break;
 48     }
 49 }

         接下来就是唤醒了哦!

  1 private void doAcquireSharedInterruptibly(int arg)
  2     throws InterruptedException {
  3     final Node node = addWaiter(Node.SHARED);
  4     boolean failed = true;
  5     try {
  6         for (;;) {
  7             final Node p = node.predecessor();
  8             if (p == head) {
  9                 int r = tryAcquireShared(arg);
 10                 if (r >= 0) {
 11                     setHeadAndPropagate(node, r); // 2. 这里是下一步
 12                     p.next = null; // help GC
 13                     failed = false;
 14                     return;
 15                 }
 16             }
 17             if (shouldParkAfterFailedAcquire(p, node) &&
 18                 // 1. 唤醒后这个方法返回
 19                 parkAndCheckInterrupt())
 20                 throw new InterruptedException();
 21         }
 22     } finally {
 23         if (failed)
 24             cancelAcquire(node);
 25     }
 26 }
  1 private void setHeadAndPropagate(Node node, int propagate) {
  2     Node h = head; // Record old head for check below
  3     setHead(node);
  4 
  5     // 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4
  6     // 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了
  7     if (propagate > 0 || h == null || h.waitStatus < 0 ||
  8         (h = head) == null || h.waitStatus < 0) {
  9         Node s = node.next;
 10         if (s == null || s.isShared())
 11             // 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了
 12             doReleaseShared();
 13     }
 14 }
  1 // 调用这个方法的时候,state == 0
  2 private void doReleaseShared() {
  3     for (;;) {
  4         Node h = head;
  5         // 1. h == null: 说明阻塞队列为空
  6         // 2. h == tail: 说明头结点可能是刚刚初始化的头节点,
  7         //   或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了
  8         // 所以这两种情况不需要进行唤醒后继节点
  9         if (h != null && h != tail) {
 10             int ws = h.waitStatus;
 11             // t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了
 12             if (ws == Node.SIGNAL) {
 13                 // 这里 CAS 失败的场景请看下面的解读
 14                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
 15                     continue;            // loop to recheck cases
 16                 // 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点
 17                 // 在这里,也就是唤醒 t4
 18                 unparkSuccessor(h);
 19             }
 20             else if (ws == 0 &&
 21                      // 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1
 22                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
 23                 continue;                // loop on failed CAS
 24         }
 25         // 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环
 26         // 否则,就是 head 没变,那么退出循环,
 27         // 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的
 28         if (h == head)                   // loop if head changed
 29             break;
 30     }
 31 }

          这里方法要注意了哦,t3 作为唤醒者,走到了 doReleaseShared 执行唤醒的操作,t4作为被唤醒者,也会走到 doReleaseShared ,这个方法,OK ,但是 t3的那个线程继续走到 28行的时候,有可能由于此时head 是被T4修改了(看唤醒后的流程),所以 h==head 返回 false ,那么循环就会继续,继续的话,就有可能两线程在 14行相遇,导致了 CAS 失败。失败了没关系,需要知道的是,总会有一个获得了锁,然后接着唤醒后面的线程,即是说会有一定几率发生多个线程唤醒后面锁的情况,但是为什么要这样做呢?个人理解 :

  • 方法复用
  • 提升吞吐量

CyclicBarrier

       CyclicBarrier 基于 Condition 来实现,CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。CyclicBarrier 是基于 AQS 的 ConditionObject 和 ReentranLock 实现的。

cyclicbarrier-2

         cyclicbarrier-3

      下面代码分析来自参考文章,建议可以去阅读一下。

  1 public class CyclicBarrier {
  2     // 我们说了,CyclicBarrier 是可以重复使用的,我们把每次从开始使用到穿过栅栏当做"一代"
  3     private static class Generation {
  4         boolean broken = false;
  5     }
  6 
  7     /** The lock for guarding barrier entry */
  8     private final ReentrantLock lock = new ReentrantLock();
  9     // CyclicBarrier 是基于 Condition 的
 10     // Condition 是“条件”的意思,CyclicBarrier 的等待线程通过 barrier 的“条件”是大家都到了栅栏上
 11     private final Condition trip = lock.newCondition();
 12 
 13     // 参与的线程数
 14     private final int parties;
 15 
 16     // 如果设置了这个,代表越过栅栏之前,要执行相应的操作
 17     private final Runnable barrierCommand;
 18 
 19     // 当前所处的“代”
 20     private Generation generation = new Generation();
 21 
 22     // 还没有到栅栏的线程数,这个值初始为 parties,然后递减
 23     // 还没有到栅栏的线程数 = parties - 已经到栅栏的数量
 24     private int count;
 25 
 26     public CyclicBarrier(int parties, Runnable barrierAction) {
 27         if (parties <= 0) throw new IllegalArgumentException();
 28         this.parties = parties;
 29         this.count = parties;
 30         this.barrierCommand = barrierAction;
 31     }
 32 
 33     public CyclicBarrier(int parties) {
 34         this(parties, null);
 35     }

首先,先看怎么开启新的一代:

  1 // 开启新的一代,当最后一个线程到达栅栏上的时候,调用这个方法来唤醒其他线程,同时初始化“下一代”
  2 private void nextGeneration() {
  3     // 首先,需要唤醒所有的在栅栏上等待的线程
  4     trip.signalAll();
  5     // 更新 count 的值
  6     count = parties;
  7     // 重新生成“新一代”
  8     generation = new Generation();
  9 }
 10 

看看怎么打破一个栅栏:

  1 private void breakBarrier() {
  2     // 设置状态 broken 为 true
  3     generation.broken = true;
  4     // 重置 count 为初始值 parties
  5     count = parties;
  6     // 唤醒所有已经在等待的线程
  7     trip.signalAll();
  8 }
  9 

这两个方法之后用得到,现在开始分析最重要的等待通过栅栏方法 await 方法:

  1 // 不带超时机制
  2 public int await() throws InterruptedException, BrokenBarrierException {
  3     try {
  4         return dowait(false, 0L);
  5     } catch (TimeoutException toe) {
  6         throw new Error(toe); // cannot happen
  7     }
  8 }
  9 // 带超时机制,如果超时抛出 TimeoutException 异常
 10 public int await(long timeout, TimeUnit unit)
 11     throws InterruptedException,
 12            BrokenBarrierException,
 13            TimeoutException {
 14     return dowait(true, unit.toNanos(timeout));
 15 }
 16 

继续往里看:

  1 private int dowait(boolean timed, long nanos)
  2         throws InterruptedException, BrokenBarrierException,
  3                TimeoutException {
  4     final ReentrantLock lock = this.lock;
  5     // 先要获取到锁,然后在 finally 中要记得释放锁
  6     // 如果记得 Condition 部分的话,我们知道 condition 的 await 会释放锁,signal 的时候需要重新获取锁
  7     lock.lock();
  8     try {
  9         final Generation g = generation;
 10         // 检查栅栏是否被打破,如果被打破,抛出 BrokenBarrierException 异常
 11         if (g.broken)
 12             throw new BrokenBarrierException();
 13         // 检查中断状态,如果中断了,抛出 InterruptedException 异常
 14         if (Thread.interrupted()) {
 15             breakBarrier();
 16             throw new InterruptedException();
 17         }
 18         // index 是这个 await 方法的返回值
 19         // 注意到这里,这个是从 count 递减后得到的值
 20         int index = --count;
 21 
 22         // 如果等于 0,说明所有的线程都到栅栏上了,准备通过
 23         if (index == 0) {  // tripped
 24             boolean ranAction = false;
 25             try {
 26                 // 如果在初始化的时候,指定了通过栅栏前需要执行的操作,在这里会得到执行
 27                 final Runnable command = barrierCommand;
 28                 if (command != null)
 29                     command.run();
 30                 // 如果 ranAction 为 true,说明执行 command.run() 的时候,没有发生异常退出的情况
 31                 ranAction = true;
 32                 // 唤醒等待的线程,然后开启新的一代
 33                 nextGeneration();
 34                 return 0;
 35             } finally {
 36                 if (!ranAction)
 37                     // 进到这里,说明执行指定操作的时候,发生了异常,那么需要打破栅栏
 38                     // 之前我们说了,打破栅栏意味着唤醒所有等待的线程,设置 broken 为 true,重置 count 为 parties
 39                     breakBarrier();
 40             }
 41         }
 42 
 43         // loop until tripped, broken, interrupted, or timed out
 44         // 如果是最后一个线程调用 await,那么上面就返回了
 45         // 下面的操作是给那些不是最后一个到达栅栏的线程执行的
 46         for (;;) {
 47             try {
 48                 // 如果带有超时机制,调用带超时的 Condition 的 await 方法等待,直到最后一个线程调用 await
 49                 if (!timed)
 50                     trip.await();
 51                 else if (nanos > 0L)
 52                     nanos = trip.awaitNanos(nanos);
 53             } catch (InterruptedException ie) {
 54                 // 如果到这里,说明等待的线程在 await(是 Condition 的 await)的时候被中断
 55                 if (g == generation && ! g.broken) {
 56                     // 打破栅栏
 57                     breakBarrier();
 58                     // 打破栅栏后,重新抛出这个 InterruptedException 异常给外层调用的方法
 59                     throw ie;
 60                 } else {
 61                     // 到这里,说明 g != generation, 说明新的一代已经产生,即最后一个线程 await 执行完成,
 62                     // 那么此时没有必要再抛出 InterruptedException 异常,记录下来这个中断信息即可
 63                     // 或者是栅栏已经被打破了,那么也不应该抛出 InterruptedException 异常,
 64                     // 而是之后抛出 BrokenBarrierException 异常
 65                     Thread.currentThread().interrupt();
 66                 }
 67             }
 68 
 69               // 唤醒后,检查栅栏是否是“破的”
 70             if (g.broken)
 71                 throw new BrokenBarrierException();
 72 
 73             // 这个 for 循环除了异常,就是要从这里退出了
 74             // 我们要清楚,最后一个线程在执行完指定任务(如果有的话),会调用 nextGeneration 来开启一个新的代
 75             // 然后释放掉锁,其他线程从 Condition 的 await 方法中得到锁并返回,然后到这里的时候,其实就会满足 g != generation 的
 76             // 那什么时候不满足呢?barrierCommand 执行过程中抛出了异常,那么会执行打破栅栏操作,
 77             // 设置 broken 为true,然后唤醒这些线程。这些线程会从上面的 if (g.broken) 这个分支抛 BrokenBarrierException 异常返回
 78             // 当然,还有最后一种可能,那就是 await 超时,此种情况不会从上面的 if 分支异常返回,也不会从这里返回,会执行后面的代码
 79             if (g != generation)
 80                 return index;
 81 
 82             // 如果醒来发现超时了,打破栅栏,抛出异常
 83             if (timed && nanos <= 0L) {
 84                 breakBarrier();
 85                 throw new TimeoutException();
 86             }
 87         }
 88     } finally {
 89         lock.unlock();
 90     }
 91 }
 92 

       唤醒线程,最后调用的是 Condition 的像 signal 的逻辑,向sync queue 里插进元素。  

       下面开始收尾工作。

       首先,我们看看怎么得到有多少个线程到了栅栏上,处于等待状态:

  1 public int getNumberWaiting() {
  2     final ReentrantLock lock = this.lock;
  3     lock.lock();
  4     try {
  5         return parties - count;
  6     } finally {
  7         lock.unlock();
  8     }
  9 }
 10 

        判断一个栅栏是否被打破了,这个很简单,直接看 broken 的值即可:

  1 public boolean isBroken() {
  2     final ReentrantLock lock = this.lock;
  3     lock.lock();
  4     try {
  5         return generation.broken;
  6     } finally {
  7         lock.unlock();
  8     }
  9 }
 10 

        前面我们在说 await 的时候也几乎说清楚了,什么时候栅栏会被打破,总结如下:

  1. 中断,我们说了,如果某个等待的线程发生了中断,那么会打破栅栏,同时抛出 InterruptedException 异常;
  2. 超时,打破栅栏,同时抛出 TimeoutException 异常;
  3. 指定执行的操作抛出了异常,这个我们前面也说过。

       最后,我们来看看怎么重置一个栅栏:

  1 public void reset() {
  2     final ReentrantLock lock = this.lock;
  3     lock.lock();
  4     try {
  5         breakBarrier();   // break the current generation
  6         nextGeneration(); // start a new generation
  7     } finally {
  8         lock.unlock();
  9     }
 10 }
 11 

        我们设想一下,如果初始化时,指定了线程 parties = 4,前面有 3 个线程调用了 await 等待,在第 4 个线程调用 await 之前,我们调用 reset 方法,那么会发生什么?

        首先,打破栅栏,那意味着所有等待的线程(3个等待的线程)会唤醒,await 方法会通过抛出 BrokenBarrierException 异常返回。然后开启新的一代,重置了 count 和 generation,相当于一切归零了。

Semaphore

        有了 CountDownLatch 的基础后,分析 Semaphore 会简单很多。Semaphore 是什么呢?它类似一个资源池(读者可以类比线程池),每个线程需要调用 acquire() 方法获取资源,然后才能执行,执行完后,需要 release 资源,让给其他的线程用。

        大概大家也可以猜到,Semaphore 其实也是 AQS 中共享锁的使用,因为每个线程共享一个池嘛。

        套路解读:创建 Semaphore 实例的时候,需要一个参数 permits,这个基本上可以确定是设置给 AQS 的 state 的,然后每个线程调用 acquire 的时候,执行 state = state - 1,release 的时候执行 state = state + 1,当然,acquire 的时候,如果 state = 0,说明没有资源了,需要等待其他线程 release。

构造方法:

  1 public Semaphore(int permits) {
  2     sync = new NonfairSync(permits);
  3 }
  4 
  5 public Semaphore(int permits, boolean fair) {
  6     sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  7 }
  8 

         这里和 ReentrantLock 类似,用了公平策略和非公平策略。

看 acquire 方法:

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }
 15 

          这几个方法也是老套路了,大家基本都懂了吧,这边多了两个可以传参的 acquire 方法,不过大家也都懂的吧,如果我们需要一次获取超过一个的资源,会用得着这个的。

          我们接下来看不抛出 InterruptedException 异常的 acquireUninterruptibly() 方法吧:

  1 public void acquireUninterruptibly() {
  2     sync.acquireShared(1);
  3 }
  4 public final void acquireShared(int arg) {
  5     if (tryAcquireShared(arg) < 0)
  6         doAcquireShared(arg);
  7 }
  8 

          前面说了,Semaphore 分公平策略和非公平策略,我们对比一下两个 tryAcquireShared 方法:

  1 // 公平策略:
  2 protected int tryAcquireShared(int acquires) {
  3     for (;;) {
  4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
  5         if (hasQueuedPredecessors())
  6             return -1;
  7         int available = getState();
  8         int remaining = available - acquires;
  9         if (remaining < 0 ||
 10             compareAndSetState(available, remaining))
 11             return remaining;
 12     }
 13 }
 14 // 非公平策略:
 15 protected int tryAcquireShared(int acquires) {
 16     return nonfairTryAcquireShared(acquires);
 17 }
 18 final int nonfairTryAcquireShared(int acquires) {
 19     for (;;) {
 20         int available = getState();
 21         int remaining = available - acquires;
 22         if (remaining < 0 ||
 23             compareAndSetState(available, remaining))
 24             return remaining;
 25     }
 26 }
 27 

         也是老套路了,所以从源码分析角度的话,我们其实不太需要关心是不是公平策略还是非公平策略,它们的区别往往就那么一两行。

         我们再回到 acquireShared 方法,

  1 public final void acquireShared(int arg) {
  2     if (tryAcquireShared(arg) < 0)
  3         doAcquireShared(arg);
  4 }
  5 

         由于 tryAcquireShared(arg) 返回小于 0 的时候,说明 state 已经小于 0 了(没资源了),此时 acquire 不能立马拿到资源,需要进入到阻塞队列等待,虽然贴了很多代码,不在乎多这点了:

  1 private void doAcquireShared(int arg) {
  2     final Node node = addWaiter(Node.SHARED);
  3     boolean failed = true;
  4     try {
  5         boolean interrupted = false;
  6         for (;;) {
  7             final Node p = node.predecessor();
  8             if (p == head) {
  9                 int r = tryAcquireShared(arg);
 10                 if (r >= 0) {
 11                     setHeadAndPropagate(node, r);
 12                     p.next = null; // help GC
 13                     if (interrupted)
 14                         selfInterrupt();
 15                     failed = false;
 16                     return;
 17                 }
 18             }
 19             if (shouldParkAfterFailedAcquire(p, node) &&
 20                 parkAndCheckInterrupt())
 21                 interrupted = true;
 22         }
 23     } finally {
 24         if (failed)
 25             cancelAcquire(node);
 26     }
 27 }
 28 

          这个方法我就不介绍了,线程挂起后等待有资源被 release 出来。接下来,我们就要看 release 的方法了:

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }
 15 

           tryReleaseShared 方法总是会返回 true,然后是 doReleaseShared,这个也是我们熟悉的方法了,我就贴下代码,不分析了,这个方法用于唤醒所有的等待线程:

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }
 15 

             Semphore 的源码里面也是有分公平锁和非公平锁的两种方式的使用,看一下获取锁的方法。默认实现的非公平锁。

  1 public void acquire() throws InterruptedException {
  2     sync.acquireSharedInterruptibly(1);
  3 }
  4 public void acquireUninterruptibly() {
  5     sync.acquireShared(1);
  6 }
  7 public void acquire(int permits) throws InterruptedException {
  8     if (permits < 0) throw new IllegalArgumentException();
  9     sync.acquireSharedInterruptibly(permits);
 10 }
 11 public void acquireUninterruptibly(int permits) {
 12     if (permits < 0) throw new IllegalArgumentException();
 13     sync.acquireShared(permits);
 14 }

         思想就是循环CAS 一直到设置成功或是 remaining <0 退出。

  1 // 公平策略:
  2 protected int tryAcquireShared(int acquires) {
  3     for (;;) {
  4         // 区别就在于是不是会先判断是否有线程在排队,然后才进行 CAS 减操作
  5         if (hasQueuedPredecessors())
  6             return -1;
  7         int available = getState();
  8         int remaining = available - acquires;
  9         if (remaining < 0 ||
 10             compareAndSetState(available, remaining))
 11             return remaining;
 12     }
 13 }
 14 // 非公平策略:
 15 protected int tryAcquireShared(int acquires) {
 16     return nonfairTryAcquireShared(acquires);
 17 }
 18 final int nonfairTryAcquireShared(int acquires) {
 19     for (;;) {
 20         int available = getState();
 21         int remaining = available - acquires;
 22         if (remaining < 0 ||
 23             compareAndSetState(available, remaining))
 24             return remaining;
 25     }
 26 }

        获取锁的操作完成后要是获取不到锁的话,那么就会进入下面这个方法,很熟悉了,要是头结点就尝试获取锁,获取不到就进入阻塞等待。

  1 private void doAcquireShared(int arg) {
  2     final Node node = addWaiter(Node.SHARED);
  3     boolean failed = true;
  4     try {
  5         boolean interrupted = false;
  6         for (;;) {
  7             final Node p = node.predecessor();
  8             if (p == head) {
  9                 int r = tryAcquireShared(arg);
 10                 if (r >= 0) {
 11                     setHeadAndPropagate(node, r);
 12                     p.next = null; // help GC
 13                     if (interrupted)
 14                         selfInterrupt();
 15                     failed = false;
 16                     return;
 17                 }
 18             }
 19             if (shouldParkAfterFailedAcquire(p, node) &&
 20                 parkAndCheckInterrupt())
 21                 interrupted = true;
 22         }
 23     } finally {
 24         if (failed)
 25             cancelAcquire(node);
 26     }
 27 }

 

总结

CountDownLatch 
- 内部实现是使用一个继承了 AQS 的类,获取锁的方式是共享锁,阻塞的线程放在AQS的 sync queue内
- 初始化时,传入的栅栏数量时AQS的 status ,也就注定了栅栏数量不能修改

CyclicBarrier
- 内部实现是一个ReentrantLock 和一个 ConditionObject , 可以多次复用栅栏。

两者的区别在于 :
1.内部实现
2.栅栏数量可否修改,可否复用的

参考资料

原文地址:https://www.cnblogs.com/Benjious/p/10161640.html