CyclicBarrier回环屏障深度解析

1. 前沿

        从上一节的CountDownLatch的学习,我们发现其只能使用一次,当state递减为0后,就没有用了,需要重新新建一个计数器。那么我们有没有可以复用的计数器呢?当然,JUC包给我们提供了CyclicBarrier回环屏障来实现计数器的复用。

2. 概念讲解

  何为回环:当所有等待线程执行完成后,重置CyclicBarrier的状态,使得它能被复用。

  何为屏障:线程调用await方法后就会被阻塞,这个阻塞点就叫屏障点,等所有线程都调用await方法后,线程们就会突破屏障,继续往下运行。

3. 案例

4. 源码分析

  • 构造函数
// 一个入参的构造函数
public CyclicBarrier(int parties) {
        // 调用还是两个入参的构造函数
        this(parties, null);
    }

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        // 将入参计数器值赋给count和线程数parties 
        this.parties = parties;
        this.count = parties;
        // 计数器count=0时,执行下面barrierCommand的功能
        this.barrierCommand = barrierAction;
    }
  • await() 
public int await() throws InterruptedException, BrokenBarrierException {
        try {
// 实际调用的dowait方法
return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
     // 回环屏障底层是用ReentrantLock独占锁实现,其底层还是基于AQS实现
final ReentrantLock lock = this.lock; lock.lock(); try { final Generation g = generation;        // 如果回环屏障是broken=true状态,抛出异常。因为dowait之前broken一定是false的 if (g.broken) throw new BrokenBarrierException(); // 如果线程interrupted了,那么打破屏障,代码往下执行 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); }         // 计数器递减1 int index = --count;
// 当计数器为0时,打破屏障
if (index == 0) { // tripped boolean ranAction = false; try {
// 我们代码中调用两个参数的构造函数时,传入的一个任务
final Runnable command = barrierCommand;
            // 如果任务不为空,那么在回环屏障打破的时候,执行我们自定义的任务。  
if (command != null) command.run(); ranAction = true; nextGeneration(); return 0; } finally {
// 如果try方法出现异常,没有正常将ranAction设置为true,那么finally里强行打破屏障
if (!ranAction) breakBarrier(); } }        // 以下是设置了超时时间的dowait方法实现,当等待时间超过了超时时间时,屏障等待也会结束 for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // We're about to finish waiting even if we had not // been interrupted, so this interrupt is deemed to // "belong" to subsequent execution. Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
// 打破屏障的方法
private
void breakBarrier() {
     // 设置打破状态为true generation.broken
= true;
     // 将线程数重新赋给计数器count,这是保证屏障复用的关键。 count
= parties;
     // 唤醒其他所有因为调用await方法而发生阻塞的线程 trip.signalAll(); }

5. 总结

  当我们创建回环屏障对象时,传入的计数器值M,前M-1个线程调用await方法时,获得独占锁,串行话执行dowait方法,都将count递减1,并且将M-1个线程加入到trip的条件队列中去。当最后一个线程执行到await方法时,最终将count置为了0,同时唤醒trip条件队列中所有被阻塞的线程,使得所有的M个线程继续往下执行。

原文地址:https://www.cnblogs.com/zjting/p/12829616.html