多线程并发工具类03-CyclicBarrier

CyclicBarrier一般也是用于对多个线程任务进行同步执行。是多线程并发开发中重要的一个工具类;

分析一下源码

初始化

    //用于控制栅栏进入的锁
    private final ReentrantLock lock = new ReentrantLock();
    //条件锁
    private final Condition trip = lock.newCondition();
   //
    private final int parties;
    //当任务被捕获时,执行的任务
    private final Runnable barrierCommand;
    //
    private Generation generation = new Generation();

    //等待的数量
    private int count;

await方法

  • 在所有线程任务都到达之前,线程任务都是阻塞状态

  • 当线程任务中出现,中断或者定时任务超时,就会唤醒所有任务执行。

  • 所有线程任务都到达后会唤醒所有任务。

  • 可以在初始化的时候传入基础线程任务,一般是首先执行传入线程任务的。

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

/**
 * 1.如果当前线程中断了,打破栅栏;
 * 2.如果所有任务到齐,则执行传入的任务(如果有),并唤醒所有的任务,并返回0;
 * 3.否则,存放入条件队列,直到超时或者打破了栅栏,则抛出异常。如果新生栅栏,则返回当前线程index;
 */
private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;//获取屏障

            if (g.broken)//屏障属性设置为true则抛出异常
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();//唤醒条件队列中的所有任务,将屏障状态设置为true
                throw new InterruptedException();
            }
            /**
             * 1.任务数量减1,判断剩余任务数量是否为0;
             * 2.如果任务为0,则,判断初始化时是否传入任务,先执行传入任务;
             * 3.标记运行状态;并生成新栅栏同时唤醒条件队列中的所有任务;返回0;
             * 4.如果在运行传入任务或唤醒条件队列任务时出现异常,则打破栅栏;    
             */
            int index = --count;
            if (index == 0) {  // 当数量为0时
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;//运行状态,设置为true表示已经运行了传入的任务;
                    nextGeneration();
                    return 0;//并返回0
                } finally {
                    if (!ranAction)//如果任务没执行,则修改屏障状态,唤醒所有条件队列等待任务;
                        breakBarrier();
                }
            }
            /**
             * 如果超时,或者栅栏被打破,或者中断异常,或者有新的栅栏生成会跳出循环。
             * 过程:
             * 1.如果没超时,那么就放入到条件队列中(除非当前线程中断,并且栅栏没有变更,打破栅栏抛出异常)
             * 2.判断栅栏是不是已经中断了,如果中断了则抛出异常;
             * 3.如果生成新的栅栏,说明所有任务已经到位,并且已经唤醒了任务,返回任务数量。
             * 4.如果超时,则打破栅栏,抛出超时异常。
             */
            for (;;) {
                try {
                    if (!timed)//没有超时,则放入条件队列等待唤起
                        trip.await();
                    else if (nanos > 0L)//有等待超时时间,则设置超时时间,超时自动退出
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {//如果中断了,判断屏障状态是否变化,如果没变化,则中断屏障。并抛出异常
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        //中断当前线程
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)//根据状态判断是否抛出异常
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {//超时,则抛出异常
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

/**
 * 打破栅栏:核心是唤醒条件队列中的消息
 */ 
 private void breakBarrier() {
        generation.broken = true;//更改栅栏状态
        count = parties;//将count值复原
        trip.signalAll();//唤醒条件队列中的消息
    }

/**
 * 生成新的栅栏:唤醒所有条件队列的任务,生成新的栅栏
 */
private void nextGeneration() {
    trip.signalAll();
    count = parties;
    generation = new Generation();
}

reset方法

  • 重置当前栅栏
    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            breakBarrier();   // 打破当前的栅栏
            nextGeneration(); // 生成新的栅栏
        } finally {
            lock.unlock();
        }
    }

示例代码

  • 求和操作
class Solver {
    int result = 0;
    final ConcurrentHashMap<String,Object> data = new ConcurrentHashMap<>();
    final CyclicBarrier barrier;

    class Worker implements Runnable {
        int myRow;
        boolean flag = false;
        Worker(int row) { myRow = row; }
        public void run() {
            while (!done()) {
                processRow(myRow);
                try {
                    barrier.await();
                } catch (InterruptedException ex) {
                    return;
                } catch (BrokenBarrierException ex) {
                    return;
                }
            }
        }

        private void processRow(int myRow) {
            data.put(Thread.currentThread().getName(),myRow);
            flag = true;
        }
        private boolean done() {
            return flag;
        }
    }



    public Solver(int N) throws InterruptedException {
        Runnable barrierAction =
                new Runnable() { public void run() { mergeRows(); }};
        barrier = new CyclicBarrier(N, barrierAction);

        List<Thread> threads = new ArrayList<Thread>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }

        // wait until done
//        for (Thread thread : threads)
//            thread.join();
    }

    private void mergeRows() {
        this.data.entrySet().stream().forEach(entry ->{
            this.result += (Integer)this.data.get(entry.getKey());
        });
        System.out.println("result:"+result);
    }

    public static void main(String[] args) throws InterruptedException {
        Solver solver = new Solver(5);

    }
    
}

总结

1.CyclicBarrier和CountDownLatch的区别?

  • 内部结构不同:

    • CyclickBarrier内部是通过ReentrantLock的条件队列进行管理线程任务的。

    • CountDownLatch内部是通过静态子类继承AQS,实现共享锁state来进行管理任务数量的。

  • 原理不同:

    • CyclickBarrier是所有任务都到达后才执行任务;

    • CountDownLatch是只在最后判断状态是不是全部执行完成,stat == 0;

  • 线程数量:

    • CyclickBarrier根据线程数进行控制的

    • CountDownLatch是通过调用countDown方法,state减1,和线程个数无关

  • 应用:

    • CyclickBarrier可以根据基于子线程进行处理其他线程的结果,处理比较复杂的业务。并且可以通过reset方法重新执行方法。

    • CountDownLoatch则必须在主线程才能处理,一般用于任务执行初始化数据

原文地址:https://www.cnblogs.com/perferect/p/13711280.html