Using Curator (7): Distributed Barrier

Distributed Barrier##

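A barrier solves a thread synchronization problem: participants block at a common point and continue only once a release condition is met.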

Barrier in the JDK###

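    import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class BarrierDemo {
        // Two parties: the barrier trips each time two threads are waiting on it.
        static CyclicBarrier barrier = new CyclicBarrier(2);

        public static void main(String[] args) {
            ExecutorService es = Executors.newFixedThreadPool(2);
            for (int i = 0; i < 5; i++) {
                es.submit(new R("t" + i));
            }
            es.shutdown();
        }

        static class R implements Runnable {
            String name;

            public R(String name) {
                this.name = name;
            }

            public void run() {
                System.out.println(this.name + " is ready");
                try {
                    // Block until the second party reaches the barrier.
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.println(this.name + " run");
            }
        }
    }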
    
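Output (five tasks with a two-party barrier leave t4 without a partner, so it waits forever and never prints "run"):

    t0 is ready
    t1 is ready
    t1 run
    t0 run
    t2 is ready
    t3 is ready
    t3 run
    t2 run
    t4 is ready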
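
The hanging t4 can be avoided by giving `await` a timeout. The following is only a minimal sketch of that idea; the class `BarrierTimeoutDemo`, the method `countPassed`, and the 500 ms timeout are names and values made up for illustration and are not part of the original example.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not from the original post.
class BarrierTimeoutDemo {

    // Runs `tasks` tasks on a fixed pool and returns how many got past the barrier.
    static int countPassed(int tasks, int parties, int poolSize, long timeoutMs) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger passed = new AtomicInteger();
        ExecutorService es = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < tasks; i++) {
            es.submit(() -> {
                try {
                    // Give up if the other parties do not arrive in time.
                    barrier.await(timeoutMs, TimeUnit.MILLISECONDS);
                    passed.incrementAndGet();
                } catch (TimeoutException | BrokenBarrierException e) {
                    // No partner arrived: this task stops waiting instead of hanging.
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        es.shutdown();
        try {
            es.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return passed.get();
    }

    public static void main(String[] args) {
        // Same shape as the example above: 5 tasks, 2 parties, pool of 2 threads.
        System.out.println("passed: " + countPassed(5, 2, 2, 500));
    }
}
```

With the same five tasks, the unpaired task gives up after the timeout, so the pool can shut down and the program exits.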

Distributed Barrier (released by the main thread)###

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " is ready ");
            // Each thread uses its own client to simulate a separate process.
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("ip:port")
                    .sessionTimeoutMs(2000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("test")
                    .build();
            client.start();
            DistributedBarrier barrier = new DistributedBarrier(client, "/distributed_barrier");
            try {
                // Create the barrier node, then block until it is removed.
                barrier.setBarrier();
                barrier.waitOnBarrier();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " run  ");
        }).start();
    }
    Thread.sleep(5000);
    // cc is a CuratorFramework client started by the main thread (its creation is not shown here).
    DistributedBarrier barrier = new DistributedBarrier(cc, "/distributed_barrier");
    // Removing the barrier node releases every waiting thread.
    barrier.removeBarrier();

Output:

    Thread-1 is ready
    Thread-2 is ready
    Thread-3 is ready
    Thread-4 is ready
    Thread-5 is ready
    Thread-3 run
    Thread-5 run
    Thread-1 run
    Thread-2 run
    Thread-4 run
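
The "wait until someone else removes the barrier" pattern can be tried without a ZooKeeper server by using a single-JVM analogy. The sketch below uses a JDK `CountDownLatch` as the gate; it is not how Curator implements `DistributedBarrier`, and the class `LatchGateDemo` and its `run` method are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-JVM analogy of a barrier that the main thread releases.
class LatchGateDemo {

    // Returns {released before the gate opened, released after the gate opened}.
    static int[] run(int workers) {
        CountDownLatch gate = new CountDownLatch(1); // plays the role of the barrier node
        AtomicInteger released = new AtomicInteger();
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    gate.await(); // analogous to waitOnBarrier()
                    released.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        int before;
        try {
            Thread.sleep(200); // give the workers time to reach the gate
            before = released.get(); // nobody can have passed yet
            gate.countDown(); // analogous to removeBarrier()
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            before = -1;
        }
        return new int[] {before, released.get()};
    }

    public static void main(String[] args) {
        int[] r = run(5);
        System.out.println("before release: " + r[0] + ", after release: " + r[1]);
    }
}
```

As in the Curator version, the workers do not coordinate with each other; they all depend on one external release signal.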

Distributed Barrier (triggered by the number of waiting threads; enter together and leave together)###

    for (int i = 0; i < 5; i++) {
        new Thread(() -> {
            System.out.println(Thread.currentThread().getName() + " is ready ");
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString("ip:port")
                    .sessionTimeoutMs(2000)
                    .connectionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .namespace("test")
                    .build();
            client.start();
            DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, "/distributed_barrier", 5);
            try {
                // enter() blocks until all 5 participants have arrived; they then proceed together.
                barrier.enter();
                Thread.sleep(3000);
                // leave() also blocks until all 5 participants are ready to leave; they then exit together.
                barrier.leave();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " run  ");
        }).start();
    }

Output:

    Thread-2 is ready
    Thread-1 is ready
    Thread-4 is ready
    Thread-3 is ready
    Thread-5 is ready
    Thread-5 run
    Thread-3 run
    Thread-4 run
    Thread-2 run
    Thread-1 run

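
The enter-together and leave-together behaviour can also be illustrated inside one JVM. This is a rough analogy only, assuming a reusable `CyclicBarrier` stands in for both phases; it says nothing about how Curator implements `DistributedDoubleBarrier`, and `DoubleBarrierDemo` with its `run` method is a hypothetical name.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-JVM analogy of a double barrier.
class DoubleBarrierDemo {

    // Returns true if every thread saw all parties arrived on entry and all finished on exit.
    static boolean run(int parties) {
        CyclicBarrier barrier = new CyclicBarrier(parties);
        AtomicInteger arrived = new AtomicInteger();
        AtomicInteger finished = new AtomicInteger();
        AtomicBoolean ok = new AtomicBoolean(true);
        Thread[] threads = new Thread[parties];
        for (int i = 0; i < parties; i++) {
            threads[i] = new Thread(() -> {
                try {
                    arrived.incrementAndGet();
                    barrier.await(); // analogous to enter(): wait for everyone to arrive
                    if (arrived.get() != parties) {
                        ok.set(false);
                    }
                    finished.incrementAndGet(); // the "work" between enter and leave
                    barrier.await(); // analogous to leave(): wait for everyone to finish
                    if (finished.get() != parties) {
                        ok.set(false);
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    ok.set(false);
                }
            });
            threads[i].start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println("all phases synchronized: " + run(5));
    }
}
```

A `CyclicBarrier` resets after each trip, so the same object can guard both the entry and the exit phase.
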
Original article: https://www.cnblogs.com/june777/p/11881670.html