CountDownLatch 和 CyclicBarrier

一、CountDownLatch

  其实要完成这种某个线程等待其他线程结果才能开始任务的业务,直接在需要准备的线程中join()依赖的线程就能完成要求,但是在博客的上一篇《三个线程顺序输出》中也说到过,join的线程返回,必须是子线程已经结束。而CountDownLatch提供了更灵活的方案,可在子线程完成好其他线程依赖的工作后调用countDown()方法主动减少计数,同时继续做线程间业务无依赖的其他工作。

  这里CountDownLatch只提供了一个构造方法,参数为大于0整数:

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

  具体演示如下:

public class CountDownLatchTest {

    private static CountDownLatch counter = new CountDownLatch(2);

    public static void main(String[] args) throws InterruptedException {
        Thread threadA = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("子线程:" + Thread.currentThread().getName() + " start working!");
                try {
                    Thread.sleep(2000);
                    counter.countDown();
                    System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread_A");
        Thread threadB = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("子线程:" + Thread.currentThread().getName() + " start working!");
                try {
                    Thread.sleep(1000);
                    counter.countDown();
                    System.out.println("子线程:" + Thread.currentThread().getName() + " working finish!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "thread_B");
        threadA.start();
        threadB.start();
        counter.await();
        System.out.println("A、B准备就绪!");
        // do something C...
    }
}

  结果:

  可以看到在counter.countDown()被调用两次后,主线程便继续执行,同时子线程A、B仍可继续完成各自的工作。需要注意的是,这里初始化的计数值为2,调用2次正好返回,如果初始值为3,而实际只调用2次,那么程序就会在await()处无限等待。因此await()也提供了带等待时间的使用方法:

public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

  例如把测试程序中的await()改成如下:

counter.await(1000,TimeUnit.MILLISECONDS);

  可以看到结果中,await()在没有等到计数归0,就回到主线程继续执行:

  另外CountDownLatch也提供了getCount()方法随时获取当前剩余计数。

  

二、CyclicBarrier

  译作同步屏障,提供了2种初始化方法,第一种为简单int参数:

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

  即挂起当前线程,直到所有设定的线程都到达Barrier状态,再一起执行后续任务。

    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

  第二种方法可以设定一个任务,在程序都到达设定的barrier后,再执行barrierAction,截取部分源码如下:

        if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }    

  可以看到在index为0时,如果command不为null,会执行run()方法。注意run()方法并不是start(),也就是并没有再起一个线程,而是在最后到达barrier的线程中继续执行barrierCommand,再返回。

  这里直接演示第二种初始化下的使用,代码如下:

public class CyclicBarrierTest {

    private static int N = 3;

    private static CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {

        @Override
        public void run() {
            // TODO Auto-generated method stub
            System.out.println("barrierAction:" + Thread.currentThread().getName() + " 开始工作");
            try {
                Thread.sleep(9000);
                System.out.println("barrierAction:" + Thread.currentThread().getName() + " 结束工作");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });

    static class Writer extends Thread {

        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier barr) {
            this.cyclicBarrier = barr;
        }

        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + "开始写入数据到文件...");
            long sleepTime = (long) (Math.random() * 1000 + 1000);
            try {
                Thread.sleep(sleepTime);
                System.out
                        .println("线程" + Thread.currentThread().getName() + "用时" + sleepTime + "ms写入数据完毕,等待其他线程完成写入...");
                cyclicBarrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("所有线程写入完毕,继续处理其他任务...");
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < N; i++) {
            new Writer(barrier).start();
        }

    }
}

  运行结果:

         

  特意多运行了几次,从结果中可以看出:

    1、在barrier处等待的线程数到达设定值(这里为3)前,当前线程会在cyclicBarrier.await()处挂起;

    2、如果设定了barrierAction任务,当最后到达barrier处的线程在执行await()方法时,会执行barrierAction任务,图中打出了每个任务的执行时间,可以看到每次都是执行任务时间最长,即最后到达barrier处的线程,会继续执行barrierAction方法;

    3、虽然到达了设定的await()线程数,但是必须在最后一个到达的线程执行完barrierAction方法,等await()返回后,所有线程才会继续执行后续的代码,示例中barrierActio()方法故意设定了较长的等待时间,但是程序依然在等待await()返回。

  另外,为了避免await()无休止的等待,JDK同样提供了带等待时间的await方法,观察源码片段:

if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
}

  这里要特别注意的时当某个线程抛出TimeoutException异常后会调用breakBarrier()方法把broken值设置为true: 

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

  之所以这里提到这个,是因为在dowait()方法中,如果broken为真,函数将直接返回,抛出BrokenBarrierException,造成结果:

    1、如果使用了带barrierCommand方法的构造函数,这里barrierCommand将不会执行;

    2、如果多个线程设置的await()时间不一样,例如A设置的等待时间较短,假设1秒后超时返回,那么B、C即使设置更长的等待时间,他们也不会在await()处等待,而是在运行到await()后立刻抛出BrokenBarrierException。

/**
     * Main barrier code, covering the various policies.
     */
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

  使用带等待参数的await(),运行上文中的代码,故意使等待超时,结果如下:

  另外CyclicBarrier是可以重用的,这里直接给出@海子博客中的例子:

public class Test {
    public static void main(String[] args) {
        int N = 2;
        CyclicBarrier barrier = new CyclicBarrier(N);

        for (int i = 0; i < N; i++) {
            new Writer(barrier).start();
        }

        try {
            Thread.sleep(25000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("CyclicBarrier重用");

        for (int i = 0; i < N; i++) {
            new Writer(barrier).start();
        }
    }

    static class Writer extends Thread {
        private CyclicBarrier cyclicBarrier;

        public Writer(CyclicBarrier cyclicBarrier) {
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
            try {
                Thread.sleep(5000); // 以睡眠来模拟写入数据操作
                System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");

                cyclicBarrier.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "所有线程写入完毕,继续处理其他任务...");
        }
    }
}

  运行效果如图:

  

三、CountDownLatch 和 CyclicBarrier区别

  1、从使用上可以发现二者侧重点不同,在CountDownLatch使用时其实是存在主线程和子线程的概念,子线程在准备好主线程需要的资源后,主线程结束等待,继续剩下的工作;而在CyclicBarrier使用中,并不存在主和次的说法,更像是一组线程在互相等待后,然后在同一时间,继续后面的操作,可以类比于现实中跑步比赛的场景,一组运动员在各自准备好起跑动作后,由裁判员发令后,统一起跑(额,这里想不到合适的业务场景,尴想了一个强行解释,或许,在优化资源利用时,设置一定的请求数,攒满了后再一起请求?有实际的应用场景欢迎补充)。

  2、等待超时CountDownLatch会直接返回,继续后续工作,CyclicBarrier首先会抛TimeoutException,同时如果barrier要等的线程数大于1,其他线程不会按设定的等待时间等待,而是抛出BrokenBarrierException后直接返回,所以使用CyclicBarrier要注意异常处理的逻辑。

  3、使用细节差异如下表:

      

   4、最后要提的是CountDownLatch 和 CyclicBarrier虽然基本都是在多线程中使用,但是同一个线程多次调用countDown()和await()其实也会使计数改变。

参考资料:

  1、http://www.importnew.com/21889.html

  2、https://blog.csdn.net/tolcf/article/details/50925145

原文地址:https://www.cnblogs.com/lyInfo/p/9131103.html