- 场景引入
日常开发中,有个需求,要求主线程开启多个线程去并行执行任务,并且主线程需要等待所有的子线程执行完成后进行汇总。我们很容易找到 jion()方法来实现这个功能
缺点:由于工作中,我们不会直接创建线程,一般都是交给线程池处理,那么我们就没法对线程池里的线程调用join()方法了。
2.需求升级
主线程不必等所有的子线程全部执行完成就可以进行汇总,并且子线程还能执行其他的无关的功能。查阅JUC包,我们发现CountDownLatch、CyclicBarrier、Semaphore均可以实现这个功能。
今天我们主要学习一下CountDownLatch
3. join 和 CountDownLatch比较
- 调用一个子线程的join()方法后,该线程(主线程)会一直被阻塞,直到子线程运行完成。而CountDownLatch使用计数器,在调用countDown()方法递减计数,在计数器为0时,唤醒阻塞的线程,而不必等到子线程执行完成。
- 在使用线程池执行任务时,join()方法不能使用,CountDownLatch的使用更加灵活。
4. 源码走读
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 底层是AQS,构造函数的入参传递给AQS的state属性 this.sync = new CountDownLatch.Sync(count); }
// 调用该方法,计数器的值递减,当计数器的值为0则唤醒所有因调用该方法而阻塞的线程, //否则什么也不做 public void countDown() { // 委托sync调用AQS的方法 sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 调用sync实现的tryReleaseShared方法 if (tryReleaseShared(arg)) { // AQS的释放资源方法,唤醒由于调用await方法而阻塞的线程,此时阻塞线程往下执行,但是不影响子线程调用countDown方法后面的代码的继续执行 doReleaseShared(); return true; } return false; } protected boolean tryReleaseShared(int releases) { // 循环进行cas,直到当前线程成功完成计数器减一并更新到state for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 判断state是否为0 if (tryAcquireShared(arg) < 0) // 如果不为0,则将当前线程加入到AQS的队列等待 doAcquireSharedInterruptibly(arg); } // 单纯的判断AQS的state是否为0 protected int tryAcquireShared(int acquires) {do return (getState() == 0) ? 1 : -1; }
5. 总结
CountDownLatch基于AQS实现(CountDownLatch静态内部类Sync)。使用AQS的状态值state来存放计数器的值。首先在初始化的时候,设置状态值(计数器的值),当多个线程调用countDown()方法时,实际上是对AQS进行原子性递减。当有线程调用await()方法时后,该线程会被放入AQS的阻塞队列等待计数器为0再返回。其他线程调用 countDown 方法让计数器值递减1,当计数器的值变为0时,当前线程还要调用AQS的 doReleaseShared 方法来激活由于调用 await 方法而阻塞的线程。