J.U.C体系进阶(四):juc-sync 同步器框架

Java - J.U.C体系进阶

作者:Kerwin

邮箱:806857264@qq.com

说到做到,就是我的忍道!

juc-sync 同步器框架

同步器名称 作用
CountDownLatch 倒数计数器,构造时设定计数值,当计数值归零后,所有阻塞线程恢复执行;其内部实现了AQS框架
CyclicBarrier 循环栅栏,构造时设定等待线程数,当所有线程都到达栅栏后,栅栏放行;其内部通过ReentrantLock和Condition实现同步
Semaphore 信号量,类似于“令牌”,用于控制共享资源的访问数量;其内部实现了AQS框架
Exchanger 交换器,类似于双向栅栏,用于线程之间的配对和数据交换;其内部根据并发情况有“单槽交换”和“多槽交换”之分
Phaser 多阶段栅栏,相当于CyclicBarrier的升级版,可用于分阶段任务的并发控制执行;其内部比较复杂,支持树形结构,以减少并发带来的竞争

CountDownLatch

注意:CountDownLatch和CyclicBarrier非常相似,且CyclicBarrier是可以重用的,根据具体的场景不同,代码结构不同,其实两者之间可以相互转化,详见CyclicBarrier模块,下文是CountDownLatch-Demo

// 用法比较简单,直接上代码即可
// 1.CountDownLatch的同一对象传递
// 2.构造参数的默认值需要指定
// 3.线程完成的countDown()->会使默认值减一
// 4.主线程awiw()等待,所有线程都countDown之后,主线程执行
// 应用场景:比如五个子线程文件输出导出数据,主线程等所有子线程都完成之后开始压缩操作,上传文件
          
public class TestCountDownLatch {
    
	/***
	 * 关键点:面向对象的方式->参数传递,把CountDownLatch进行传递,使其共用同一个参数
	 * @param args
	 */
    public static void main(String[] args) {
    	CountDownLatch latch = new CountDownLatch(5);

        ExecutorService executorService = Executors.newCachedThreadPool();
        MyWoker m1 = new MyWoker("work1", latch);
        MyWoker m2 = new MyWoker("work2", latch);
        MyWoker m3 = new MyWoker("work3", latch);
        MyWoker m4 = new MyWoker("work4", latch);
        MyWoker m5 = new MyWoker("work5", latch);
        Boss boss = new Boss("boos", latch);
        executorService.submit(m1);
        executorService.submit(m2);
        executorService.submit(m3);
        executorService.submit(m4);
        executorService.submit(m5);
        executorService.submit(boss);
        
        executorService.shutdown();
    }
}

class MyWoker implements Callable<String> {
	
	private String name;
	private CountDownLatch latch;
	
	public MyWoker (String name, CountDownLatch latch) {
		this.name = name;
		this.latch = latch;
	}
	
	@Override
	public String call() throws Exception {
		System.out.println(name + " 工人开始工作");
		int time = (int)(Math.random() * 100) * 50;
		Thread.sleep(time);
		System.out.println(name + " 工人已经完成任务!");
		latch.countDown();
		return "successful";
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public CountDownLatch getLatch() {
		return latch;
	}

	public void setLatch(CountDownLatch latch) {
		this.latch = latch;
	}
}

class Boss implements Callable<String> {
	
	private String name;
	private CountDownLatch latch;
	
	public Boss (String name, CountDownLatch latch) {
		this.name = name;
		this.latch = latch;
	}
	
	@Override
	public String call() throws Exception {
		System.out.println("老板准备就绪,等工人都完成了就来视察~");
		latch.await();
		System.out.println("老板来了,快跑啊~");
		return "successful";
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public CountDownLatch getLatch() {
		return latch;
	}

	public void setLatch(CountDownLatch latch) {
		this.latch = latch;
	}
}

CyclicBarrier

CyclicBarrier是一个同步辅助类,它允许一组线程相互等待直到所有线程都到达一个公共的屏障点

一句话概述就是:“人满发车”

重点理解:

CountDownLatch主要用于主线程阻塞,等待子线程执行完毕后,主线程执行,例如报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传

CyclicBarrier侧重点是人满发车,比如LOL,需要等待是个用户都加载好了之后,再开启主线程执行工作,值得注意的是,这是一般意义的CyclicBarrier

但是,CyclicBarrier提供了另一个构造方法,即可以指定默认额外的执行线程

CyclicBarrier barrier = new CyclicBarrier(5,  new TotalTask(totalService));

这意味着,在很多情况CyclicBarrier可以代替CountDownLatch,主要看代码的结构设计

比如刚才的问题:报表导出压缩上传,子线程处理报表,主线程等都执行完毕后,压缩,上传

如果我用着这种构造方法,配合awit()的位置,让压缩上传线程默认作为最后执行的线程,即可保证执行的顺序,来看个Demo吧:

Demo-1 CyclicBarrier普通使用方法:

public class TestCyclicBarrier {

	public static void main(String[] args) throws InterruptedException {
		CyclicBarrier cycli = new CyclicBarrier(10);
		
		for (int i = 0; i < 9; i++) {
			new Thread(new BarrierThread("张" + i, cycli)).start();
		}
		
		Thread.sleep(3000);
		new Thread(new BarrierThread("张" + 10, cycli)).start();
		
		Thread.sleep(5000);
	}

}

class BarrierThread implements Runnable{
	
	private String  name;
	private CyclicBarrier cycli;
	
	public BarrierThread(String name, CyclicBarrier cycli) {
		super();
		this.name = name;
		this.cycli = cycli;
	}

	@Override
	public void run() {
		System.out.println(name + " 准备就绪");
		try {
			cycli.await();
		} catch (InterruptedException | BrokenBarrierException e) {
			e.printStackTrace();
		}
		System.out.println(name + " 开始执行");
	}
	
	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public CyclicBarrier getCycli() {
		return cycli;
	}

	public void setCycli(CyclicBarrier cycli) {
		this.cycli = cycli;
	}
}

Demo-2 CyclicBarrier 另一种构造方法的使用:

注意 CyclicBarrier barrier = new CyclicBarrier(5, new TotalTask(totalService));

再注意子线程中,awit等待的代码位置,这个代码位置在程序的最后面,因此CyclicBarrier 的灵活性完全可以由我们来把控,到底在哪一点阻塞,完全是我们自己控制的,这样的变化,就可以在一定程度上替代CountDownLatch,达到更加灵活的目的,CyclicBarrier 且是可重复使用的,细节可以再去深入了解

/**  
 * 各省数据独立,分库存偖。为了提高计算性能,统计时采用每个省开一个线程先计算单省结果,最后汇总。  
 *   
 * @author guangbo email:weigbo@163.com  
 *   
 */  
public class Total {   
  
    // private ConcurrentHashMap result = new ConcurrentHashMap();   
  
    public static void main(String[] args) {   
        TotalService totalService = new TotalServiceImpl();   
        CyclicBarrier barrier = new CyclicBarrier(5,   
                new TotalTask(totalService));   
  
        // 实际系统是查出所有省编码code的列表,然后循环,每个code生成一个线程。   
        new BillTask(new BillServiceImpl(), barrier, "北京").start();   
        new BillTask(new BillServiceImpl(), barrier, "上海").start();   
        new BillTask(new BillServiceImpl(), barrier, "广西").start();   
        new BillTask(new BillServiceImpl(), barrier, "四川").start();   
        new BillTask(new BillServiceImpl(), barrier, "黑龙江").start();   
  
    }   
}   
  
/**  
 * 主任务:汇总任务  
 */  
class TotalTask implements Runnable {   
    private TotalService totalService;   
  
    TotalTask(TotalService totalService) {   
        this.totalService = totalService;   
    }   
  
    public void run() {   
        // 读取内存中各省的数据汇总,过程略。   
        totalService.count();   
        System.out.println("=======================================");   
        System.out.println("开始全国汇总");   
    }   
}   
  
/**  
 * 子任务:计费任务  
 */  
class BillTask extends Thread {   
    // 计费服务   
    private BillService billService;   
    private CyclicBarrier barrier;   
    // 代码,按省代码分类,各省数据库独立。   
    private String code;   
  
    BillTask(BillService billService, CyclicBarrier barrier, String code) {   
        this.billService = billService;   
        this.barrier = barrier;   
        this.code = code;   
    }   
  
    public void run() {   
        System.out.println("开始计算--" + code + "省--数据!");   
        billService.bill(code);   
        // 把bill方法结果存入内存,如ConcurrentHashMap,vector等,代码略   
        System.out.println(code + "省已经计算完成,并通知汇总Service!");   
        try {   
            // 通知barrier已经完成   
            barrier.await();   
        } catch (InterruptedException e) {   
            e.printStackTrace();   
        } catch (BrokenBarrierException e) {   
            e.printStackTrace();   
        }   
    }   
  
} 

相关方法:

— getParties()

获取CyclicBarrier打开屏障的线程数量,也成为方数。

— getNumberWaiting()

获取正在CyclicBarrier上等待的线程数量。

—await()

—await(timeout,TimeUnit)

—isBroken()

获取是否破损标志位broken的值,此值有以下几种情况:

CyclicBarrier初始化时,broken=false,表示屏障未破损。
如果正在等待的线程被中断,则broken=true,表示屏障破损。
如果正在等待的线程超时,则broken=true,表示屏障破损。
如果有线程调用CyclicBarrier.reset()方法,则broken=false,表示屏障回到未破损状态。
—reset()

使得CyclicBarrier回归初始状态,直观来看它做了两件事:

如果有正在等待的线程,则会抛出BrokenBarrierException异常,且这些线程停止等待,继续执行。
将是否破损标志位broken置为false。

CountDownLatch和CyclicBarrier的主要联系和区别如下:

1.闭锁CountDownLatch做减计数,而栅栏CyclicBarrier则是加计数。

2.CountDownLatch是一次性的,CyclicBarrier可以重用。

3.CountDownLatch强调一个线程等多个线程完成某件事情。CyclicBarrier是多个线程互等,等大家都完成。

4.鉴于上面的描述,CyclicBarrier在一些场景中可以替代CountDownLatch实现类似的功能

Semaphore

信号量Semaphore是一个并发工具类,用来控制可同时并发的线程数,其内部维护了一组虚拟许可,通过构造器指定许可的数量,每次线程执行操作时先通过acquire方法获得许可,执行完毕再通过release方法释放许可。如果无可用许可,那么acquire方法将一直阻塞,直到其它线程释放许可

Semaphore用来控制并发线程数,但有个问题FixedThreadPool也可以控制最大并发数,那两者有何不一样呢?首先,量级来看,Semaphore轻量级,是一个并发工具类,线程池重量级无疑,其次特点来讲,Semaphore内的线程是我们实实在在自己创建的,FixedThreadPool是分配给我们的线程池里面的线程

另外,Semaphore如果默认大小为1的时候,还可以当作互斥锁使用,且有公平锁和非公平锁之分(是否按顺序执行,是则就是公平的,但是非常耗性能)

代码Demo,线程队列和Semaphore配合使用

public class TestQueeThread_2 {
	static Semaphore semaphore = new Semaphore(1);
    public static void main(String[] args) throws InterruptedException {
        System.out.println("begin:" + (System.currentTimeMillis() / 1000)); 
        BlockingQueue<String> myQueue = new ArrayBlockingQueue<String>(1);
       
        for (int i = 0; i < 100; i++) {
        	new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						semaphore.acquire();
						String log = myQueue.take();
						System.out.println(Thread.currentThread().getName() + ":" + doSome(log));
						semaphore.release();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
			}).start();
        }
        
        for (int i = 0; i < 100; i++) { // 这行代码不能改动
            String input = i + "";
            myQueue.put(input);
        }
    }

    public static String doSome(String input) {
    	String output = null;
        try {
            Thread.sleep(1000);
            output = input + ":" + (System.currentTimeMillis() / 1000);
            return output;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
		return output;
    }
}
// 这种用法可能违背了oracle的本意,我们来看看oracle的官方Demo
// Semaphore是用来约束线程使用共享资源的,控制数据一致性,当然还得是锁
class Pool {
    private static final int MAX_AVAILABLE = 100; // 可同时访问资源的最大线程数
    private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
    protected Object[] items = new Object[MAX_AVAILABLE];   //共享资源
    protected boolean[] used = new boolean[MAX_AVAILABLE];
    public Object getItem() throws InterruptedException {
        available.acquire();
        return getNextAvailableItem();
    }
    public void putItem(Object x) {
        if (markAsUnused(x))
            available.release();
    }
    private synchronized Object getNextAvailableItem() {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (!used[i]) {
                used[i] = true;
                return items[i];
            }
        }
        return null;
    }
    private synchronized boolean markAsUnused(Object item) {
        for (int i = 0; i < MAX_AVAILABLE; ++i) {
            if (item == items[i]) {
                if (used[i]) {
                    used[i] = false;
                    return true;
                } else
                    return false;
            }
        }
        return false;
    }
}

Exchanger

Exchanger是用作线程并发协作的工具类,简单一句话讲,如果A,B线程都拥有Exchanger对象,如果某一个调用Exchanger的交换方法exchange时候,快的那个会主动等慢的那个(等的意思就是挂起),然后都到位之后,互相唤醒交换数据

代码Demo:

public class ExchangerTest {
	static class Producer extends Thread {
		private Exchanger<Integer> exchanger;
		private static int data = 0;

		Producer(String name, Exchanger<Integer> exchanger) {
			super("Producer-" + name);
			this.exchanger = exchanger;
		}

		@Override
		public void run() {
			for (int i = 1; i < 5; i++) {
				try {
					TimeUnit.SECONDS.sleep(1);
					data = i;
					System.out.println(getName() + " 交换前:" + data);
					data = exchanger.exchange(data);
					System.out.println(getName() + " 交换后:" + data);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}

	static class Consumer extends Thread {
		private Exchanger<Integer> exchanger;
		private static int data = 0;

		Consumer(String name, Exchanger<Integer> exchanger) {
			super("Consumer-" + name);
			this.exchanger = exchanger;
		}

		@Override
		public void run() {
			while (true) {
				data = 0;
				System.out.println(getName() + " 交换前:" + data);
				try {
					TimeUnit.SECONDS.sleep(1);
					data = exchanger.exchange(data);
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				System.out.println(getName() + " 交换后:" + data);
			}
		}
	}

	public static void main(String[] args) throws InterruptedException {
		Exchanger<Integer> exchanger = new Exchanger<Integer>();
		new Producer("", exchanger).start();
		new Consumer("", exchanger).start();
		TimeUnit.SECONDS.sleep(7);
		System.exit(-1);
	}
}

结果打印:
Consumer- 交换前:0
Producer- 交换前:1
Consumer- 交换后:1
Producer- 交换后:0
Consumer- 交换前:0
Producer- 交换前:2
Producer- 交换后:0
Consumer- 交换后:2
Consumer- 交换前:0
Producer- 交换前:3
Producer- 交换后:0
Consumer- 交换后:3
Consumer- 交换前:0
Producer- 交换前:4
Producer- 交换后:0
Consumer- 交换后:4
Consumer- 交换前:0

我暂时没有碰到用需要此特点的需求,不过很显然它可以用作生产者消费者模式,通过网上的解释来看,Exchanger的实现是非常复杂的,主要是依赖CAS自旋操作

Phase

Phaser 是一个多栅栏的同步工具

phase(阶段) - Phaser也有栅栏,在Phaser中,栅栏的名称叫做phase(阶段),在任意时间点,Phaser只处于某一个phase(阶段),初始阶段为0,最大达到Integerr.MAX_VALUE,然后再次归零。当所有parties参与者都到达后,phase值会递增

parties(参与者) - 其实就是CyclicBarrier中的参与线程的概念,CyclicBarrier中的参与者在初始构造指定后就不能变更,而Phaser既可以在初始构造时指定参与者的数量,也可以中途通过register、bulkRegister、arriveAndDeregister等方法注册/注销参与者

arrive(到达) / advance(进阶) - Phaser注册完parties(参与者)之后,参与者的初始状态是unarrived的,当参与者到达(arrive)当前阶段(phase)后,状态就会变成arrived。当阶段的到达参与者数满足条件后(注册的数量等于到达的数量),阶段就会发生进阶(advance)——也就是phase值+1

public class SwimmerTest {

	// 游泳选手个数
	private static int swimmerNum = 6;

	public static void main(String[] args) {

		Phaser phaser = new Phaser(7){
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
                return phase >= 2  || registeredParties == 0;
            }
        };

		for (int i = 0; i < swimmerNum; i++) {
			new Thread(new Swimmer(phaser), "swimmer" + i).start();
		}

		// 主线程到达,开启第二阶段
		phaser.arriveAndAwaitAdvance();
		
		// 主线程销毁,开启第三阶段
		phaser.arriveAndDeregister();
		
		// 加while是为了防止其它线程没结束就打印了"比赛结束"
		while (!phaser.isTerminated()) {}

		System.out.println("===== 比赛结束 =====");
	}
}

class Swimmer implements Runnable {
	
	private Phaser phaser;

	public Swimmer(Phaser phaser) {
		this.phaser = phaser;
	}

	@Override
	public void run() {

		// 从这里到第一个phaser.arriveAndAwaitAdvance()是第一阶段做的事
		System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已到达赛场");

		phaser.arriveAndAwaitAdvance();

		// 从这里到第二个phaser.arriveAndAwaitAdvance()是第二阶段做的事
		System.out.println("游泳选手-" + Thread.currentThread().getName() + ":已准备好");

		phaser.arriveAndAwaitAdvance();

		// 从这里到第三个phaser.arriveAndAwaitAdvance()是第三阶段做的事
		System.out.println("游泳选手-" + Thread.currentThread().getName() + ":完成比赛");

		phaser.arriveAndAwaitAdvance();
	}
}

上文说到,Phase阶段概念,且注册数量和到达数量一致的之后,就会进入下一个阶段,代码中即是如此,一开始注册七个指标,游泳子线程会运行到达6个,然后由主线程控制到达,进入到下一个阶段,注意的是参与的线程可以注册也可以销毁,所以主线程阶段二是到达,阶段三测试了销毁

Phaser 的onAdvance方法:

Phaser phaser = new Phaser(7){
    @Override
    protected boolean onAdvance(int phase, int registeredParties) {
        System.out.println("---------------PHASE[" + phase + "],Parties[" + registeredParties + "] ---------------");
        return phase >= 2  || registeredParties == 0;
    }
};

当前阶段,最后一个线程到达后,会触发onAdvance方法,此处是打印了信息,且写明了Phaser终止的标志,注册线程数为0或阶段数到达2 (0,1,2)

原文地址:https://www.cnblogs.com/kkzhilu/p/12859506.html