Java并发和高并发学习总结(四)- J.U.C之工具类

1、总览

  • CountDownLatch允许一个或多个线程等待某些操作完成
  • Semaphore Java版本的信号量实现
  • CyclicBarrier 一种辅助性的同步结构,允许多个线程等待到达某个屏障
  • Exchanger 在线程间交换数据的一种手段

2、CountDownLatch

当一个或多个线程需要等待其他线程完成操作时,就可以使用CountDownLatch了,当然,最简单的你也可以使用join方法

2.1、join

public class JoinTest {
    public static void main(String[] args) throws InterruptedException {
        Thread parser1 = new Thread(new Runnable() {
            @Override
            public void run() {
            }
        });
        Thread parser2 = new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("parser2 finish");
            }
        });
        parser1.start();
        parser2.start();
        parser1.join();
        parser2.join();
        System.out.println("all parser finish");
    }
}

join线程用于让当前执行线程等待join线程执行结束,其实现原理是不停检查join线程是否存活,如果join线程存活则让当期线程永远等待。其中wait(0)表示永远等待下去。知道join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM中实现的。

2.1、CountDownLatch实现

CountDownLatch更为巧妙的实现了这个需求,并且比join的功能更为强大。

public class CountDownLatchTest {
    static CountDownLatch c = new CountDownLatch(2);
    public static void main(String[] args) throws InterruptedException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(1);
                c.countDown();
                System.out.println(2);
                c.countDown();
            }
        }).start();
        c.await();
        System.out.println("3");
    }
}

构造函数接收一个int类型的参数作为计数器,N这里指可能要等待的点。
当调用countDwon方法时,N减一,await方法则会阻塞当前线程,知道N变成0。await方法还有一个带指定时间的重载方法await(long time,TimeUnit unit),等待时间结束后,就不在阻塞当前线程。
关于实现,在前面一篇有提到,这里就不在赘述。

3、CyclicBarrier

字面意思是可循环使用的屏障,他要做的事情是让一组线程到达一个屏障时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

3.1、CyclicBarrier的使用

CyclicBarrier要拦截的线程数在构造方法中声明,每个线程调用await方法通知CyclicBarrier已到达屏障,然后当前线程被阻塞。

public class CyclicBarrierTest {

	static CyclicBarrier c = new CyclicBarrier(2);
	
	public static void main(String[] args){
		new Thread(new Runnable(){
			public void run(){
				try {
					c.await();
				} catch (Exception e) {
					e.printStackTrace();
				}
				System.out.println(1);
			}
		}).start();
		try {
			c.await();
		} catch (Exception e){
			e.printStackTrace();
		}
		System.out.println(2);
	}
	
}

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,以方便处理更复杂的业务场景。

3.2、CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重
置。所以CyclicBarrier能处理更为复杂的业务场景。例如,如果计算发生错误,可以重置计数
器,并让线程重新执行一次。

CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得Cyclic-Barrier
阻塞的线程数量。isBroken()方法用来了解阻塞的线程是否被中断。

4、semaphore

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

public class SemaphoreTest {

	private static final int THREAD_COUNT = 30;
	private static ExecutorService threadPool = 
			Executors.newFixedThreadPool(THREAD_COUNT);
	private static Semaphore s = new Semaphore(10);
	
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		for(int i=0;i < THREAD_COUNT;i++){
			threadPool.execute(new Runnable(){

				public void run() {
					// TODO Auto-generated method stub
					try {
						s.acquire();
						System.out.println("sava data!");
						Thread.sleep(5000);
						s.release();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
				}
				
			});
		}
		threadPool.shutdown();
	}

}

在代码中,虽然有30个线程在执行,但是只允许10个并发执行。Semaphore的构造方法
Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表示允
许10个线程获取许可证,也就是最大并发数是10。Semaphore的用法也很简单,首先线程使用
Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。

其他方法

  • ·intavailablePermits():返回此信号量中当前可用的许可证数。
  • ·intgetQueueLength():返回正在等待获取许可证的线程数
  • booleanhasQueuedThreads():是否有线程正在等待获取许可证
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法
  • ·Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方法

5、Exchanger

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交
换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过
exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也
执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产
出来的数据传递给对方。

public class ExchangerTest {
	private static final Exchanger<String> exgr = new Exchanger<String>();
	private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

	public static void main(String[] args) {
		threadPool.execute(new Runnable() {
			public void run() {
				try {
					String A = "银行流水A"; // A录入银行流水数据
					exgr.exchange(A);
				} catch (InterruptedException e) {
				}
			}
		});
		threadPool.execute(new Runnable() {
			public void run() {
				try {
					String B = "银行流水B"; // B录入银行流水数据
					String A = exgr.exchange("B");
					System.out.println("A和B数据是否一致:" + A.equals(B) + ",A录入的是:"
							+ A + ",B录入是:" + B);
				} catch (InterruptedException e) {
				}
			}
		});
		threadPool.shutdown();
	}
}
原文地址:https://www.cnblogs.com/j-howie/p/10342002.html