java并发api总结

1.java.util.concurrent包

1.1 Executors

  • Executor:接口,仅有一个方法为execute(Runnable)
  • ExecutorService:Executor的子接口,扩展了Executor的方法,如submit/shutdown等。
  • Executors:工厂类,提供生成各种线程池的静态方法
  • ScheduledExecutorService:ExecutorService的子接口,提供定时调度功能,如schedule方法
  • Callable:接口,可以执行返回一个结果的任务(线程),仅有call方法。
  • Future:接口,主要方法为get,阻塞的方法,一般的调用:
    	Future<String> future = executor.submit(new Callable<String>() {
    	 public String call() {
    	     return searcher.search(target);
    	 }
    	});
    	String str = future.get();
    
  • FutureTask:类,实现了Future,可取消的异步计算,此类提供了对 Future 的基本实现.
  • ScheduledFuture:Future的子接口,接收ScheduledExecutorService执行的结果,该接口中没有添加新方法。
  • Delayed :接口,用来标记那些应该在给定延迟时间之后执行的对象,主要方法为getDelay
  • CompletionService:接口,将生产新的异步任务与使用已完成任务的结果分离开来的服务。生产者submit执行的任务。 使用者take已完成的任务,并按照完成这些任务的顺序处理它们的结果。
  • ExecutorCompletionService:类,实现了CompletionService,使用提供的 Executor 来执行任务的 CompletionService。使用示例:
    	void solve(Executor e, Collection<Callable<Result>> solvers) throws Exception {
    	     CompletionService<Result> ecs = new ExecutorCompletionService<Result>(e);
    	     for (Callable<Result> s : solvers)
    		 ecs.submit(s);
    	     int n = solvers.size();
    	     for (int i = 0; i < n; ++i) {
    		 Result r = ecs.take().get();
    		 if (r != null)
    		     use(r);
    	     }
    	 }
    
  • AbstractExecutorService:抽象类,ExecutorService的子类,提供ExecutorService执行方法的默认实现。此类使用newTaskFor返回的
  • RunnableFuture:实现submit、invokeAny和invokeAll方法,默认情况下,RunnableFuture是此包中提供的 FutureTask类。
  • ThreadPoolExecutor:线程池核心类,继承了AbstractExecutorService,自定义线程池会用到,Executors中创建的线程池均为该类对象,如Executors.newCachedThreadPool()创建代码为:
public static ExecutorService newCachedThreadPool() {
	return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
    	new SynchronousQueue<Runnable>());

以使用此类的构造方法来自主实现Executors创建的线程池。构造方法介绍如下:

public ThreadPoolExecutor(
	//达到运行条件的线程数,未达到则先创建线程
	int corePoolSize,	
	//最大线程数,达到后再来新的请求则不会再创建
    int maximumPoolSize,			
	//线程生存时间,如CacheThreadPool生存时间为60s
    long keepAliveTime,			
	//枚举,设置时间单位	
    TimeUnit unit,				
	/**
	 * 工作队列,直接提交:SynchronousQueue,
	 * 无界队列:LinkedBlockingQueue,
	 * 有界队列:ArrayBlockingQueue
	 */
    BlockingQueue<Runnable> workQueue,	
	/** 
	  * 使用ThreadFactory创建新线程。如果没有另外说明,则在同一个
	  * ThreadGroup中一律使用Executors.defaultThreadFactory()创建线程,
	  * 并且这些线程具有相同的和非守护进程状态。通过提供不同的ThreadFactory,
	  * 可以改变线程的名称、线程组、优先级、守护进程状态,等等。如果从
	  * newThread返回null时ThreadFactory未能创建线程,则执行程序将继续运行,
	  * 但不能执行任何任务。
	  */
    ThreadFactory threadFactory
	/** 
	  * 创建线程池并且提交任务失败时,线程池会回调RejectedExecutionHandler
	  * 接口的rejectedExecution(Runnable task, ThreadPoolExecutor
	  * executor)方法来处理线程池处理失败的任务,其中task是用户提交的任务,
	  * 而executor是当前执行的任务的线程池 
	  */
    RejectedExecutionHandler handler
   ){
   		......

}

也可以继承ThreadPoolExecutor,可重写的 beforeExecute(java.lang.Thread, java.lang.Runnable) 和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。

  • ScheduledThreadPoolExecutor:ThreadPoolExecutor的子类,使用schedule方法设置定时调度,该方法的返回参数为ScheduledFuture

1.2 Queues

  • BolckingQueue:接口,Queue的子接口,支持两个附加操作的Queue,这两个操作是:获取元素时等待队列变为非空(take方法),以及存储元素时等待空间变得可用(put方法)。
  • LinkedBlockingQueue:类,BlockingQueue的链表实现,一般用来实现无界队列。
  • ArrayBlockingQueue:类,BlockingQueue的数组实现,一般用来实现有界队列。
  • ConcurrentLinkedQueue:java.util.AbstractQueue的子类,一个基于链接节点的无界线程安全队列。此队列按照 FIFO(先进先出)原则对元素进行排序,此实现采用了有效的“无等待 (wait-free)”算法
  • SynchronousQueue:类,BlockingQueue的实现,一般用来实现任务即时提交。
    • 注意1:它一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然。同步队列没有任何内部容量,甚至连一个队列的容量都没有。
    • 注意2:它是线程安全的,是阻塞的。
    • 注意3:不允许使用 null 元素。
    • 注意4:公平排序策略是指调用put的线程之间,或take的线程之间。
    • 注意5:SynchronousQueue的以下方法很有趣:
      • iterator() 永远返回空,因为里面没东西。
      • peek() 永远返回null。
      • put() 往queue放进去一个element以后就一直wait直到有其他thread进来把这个element取走。
      • offer() 往queue里放一个element后立即返回,如果碰巧这个element被另一个thread取走了,offer方法返回true,认为offer成功;否则返回false。
      • offer(2000, TimeUnit.SECONDS) 往queue里放一个element但是等待指定的时间后才返回,返回的逻辑和offer()方法一样。
      • take() 取出并且remove掉queue里的element(认为是在queue里的。。。),取不到东西他会一直等。
      • poll() 取出并且remove掉queue里的element(认为是在queue里的。。。),只有到碰巧另外一个线程正在往 queue里offer数据或者put数据的时候,该方法才会取到东西。否则立即返回null。
      • poll(2000, TimeUnit.SECONDS) 等待指定的时间然后取出并且remove掉queue里的element,其实就是再等其他的thread来往里塞。
      • isEmpty()永远是true。
      • remainingCapacity() 永远是0。
      • remove()和removeAll() 永远是false。
  • PriorityBlockingQueue:类,BlockingQueue的实现,一个无界阻塞队列,它使用与类PriorityQueue相同的顺序规则,并且提供了阻塞获取操作。
  • DelayQueue:类,BlockingQueue的实现,Delayed元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的 Delayed元素。如果延迟都还没有期满,则队列没有头部,并且poll将返回null。当一个元素的getDelay(TimeUnit.NANOSECONDS)方法返回一个小于等于0的值时,将发生到期。即使无法使用take或poll移除未到期的元素,也不会将这些元素作为正常元素对待。

1.3 Concurrent Collections

  • ConcurrentMap:接口,java.util.Map的子接口,提供其他原子putIfAbsent、remove、replace方法的Map。
  • ConcurrentHashMap:类,支持获取的完全并发和更新的所期望可调整并发的哈希表。此类遵守与Hashtable相同的功能规范,并且包括对应于Hashtable的每个方法的方法版本。使用分段锁来进行线程访问控制,单线程环境下只损失非常小的性能。
  • CopyOnWriteArrayList:类,实现了List接口,ArrayList的一个线程安全的变体,其中所有可变操作(add、set 等等)都是通过对底层数组进行一次新的复制来实现的。这一般需要很大的开销,但是当遍历操作的数量大大超过可变操作的数量时,这种方法可能比其他替代方法更 有效。
  • CopyOnWriteArraySet:类,实现了Set接口,对其所有操作使用内部CopyOnWriteArrayList的Set。因此,它共享以下相同的基本属性:
    • 它最适合于具有以下特征的应用程序:set 大小通常保持很小,只读操作远多于可变操作,需要在遍历期间防止线程间的冲突。
    • 它是线程安全的。
    • 因为通常需要复制整个基础数组,所以可变操作(add、set 和 remove 等等)的开销很大。
    • 迭代器不支持可变 remove 操作。
    • 使用迭代器进行遍历的速度很快,并且不会与其他线程发生冲突。在构造迭代器时,迭代器依赖于不变的数组快照。

1.4 Synchronizers

  • CountDownLatch:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。用给定的计数初始化CountDownLatch。由于调用了countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞。之后,会释放所有等待的线程,await的所有后续调用都将立即返回。这种现象只出现一次——计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier。
  • Semaphore:类,一个计数信号量。acquire()获取许可,release()释放获得的许可。若线程未获得许可,会一直阻塞。
  • Exchanger:类,可以在对中对元素进行配对和交换的线程的同步点。每个线程将条目上的某个方法呈现给exchange方法,与伙伴线程进行匹配,并且在返回时接收其伙伴的对象。应用较少。
  • CyclicBarrier:CyclicBarrier类似于CountDownLatch也是个计数器,不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续。CyclicBarrier初始时还可带一个Runnable的参数 此Runnable任务在CyclicBarrier的数目达到后,在所有其它线程被唤醒前被执行。

1.5 Timing

  • TimeUnit --枚举,TimeUnit表示给定单元粒度的时间段,它提供在这些单元中进行跨单元转换和执行计时及延迟操作的实用工具方法。

  • fork/join并行计算框架 fork/join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升应用的性能。 fork/join框架的核心是ForkJoinPool类,它是对AbstractExecutorService类的扩展。ForkJoinPool实现了工作偷取算法,并可以执行ForkJoinTask任务。代码形式:

    	if (当前这个任务工作量足够小)
    	    直接完成这个任务
    	else
    	    将这个任务或这部分工作分解成两个部分
    	    分别触发(invoke)这两个子任务的执行,并等待结果
    
    示例1:
    	public class Calculator extends RecursiveTask<Integer> {
    		private static final int THRESHOLD = 100;
    		private int start;
    		private int end;
    		public Calculator(int start, int end) {
    			this.start = start;
    			this.end = end;
    		}
    		@Override
    		protected Integer compute() {
    			int sum = 0;
    			if((start - end) < THRESHOLD){
    			    for(int i = start; i< end;i++){
    			        sum += i;
    			    }
    			}else{
    			    int middle = (start + end) /2;
    			    Calculator left = new Calculator(start, middle);
    			    Calculator right = new Calculator(middle + 1, end);
    			    left.fork();
    			    right.fork();
    			    sum = left.join() + right.join();
    			}
    			return sum;
    		}
    		public void run() throws Exception{  
    			ForkJoinPool forkJoinPool = new ForkJoinPool();  
    			Future<Integer> result = forkJoinPool.submit(new Calculator(0, 10000));  
    			assertEquals(new Integer(49995000), result.get());  
    		} 
    	}
    
    示例2:
    	public class SortTask extends RecursiveAction {
    		final long[] array;
    		final int start;
    		final int end;
    		private int THRESHOLD = 100; //For demo only
    		public SortTask(long[] array) {
    			this.array = array;
    			this.start = 0;
    			this.end = array.length - 1;
    		}
    		public SortTask(long[] array, int start, int end) {
    			this.array = array;
    			this.start = start;
    			this.end = end;
    		}
    		protected void compute() {
    			if (end - start < THRESHOLD)
    			    sequentiallySort(array, start, end);
    			else {
    			    int pivot = partition(array, start, end);
    			    new SortTask(array, start, pivot - 1).fork();
    			    new SortTask(array, pivot + 1, end).fork();
    			}
    		}
    		private int partition(long[] array, int start, int end) {
    			long x = array[end];
    			int i = start - 1;
    			for (int j = start; j < end; j++) {
    			    if (array[j] <= x) {
    			        i++;
    			        swap(array, i, j);
    			    }
    			}
    			swap(array, i + 1, end);
    			return i + 1;
    		}
    		private void swap(long[] array, int i, int j) {
    			if (i != j) {
    			    long temp = array[i];
    			    array[i] = array[j];
    			    array[j] = temp;
    			}
    		}
    		private void sequentiallySort(long[] array, int lo, int hi) {
    			Arrays.sort(array, lo, hi + 1);
    		}
    		@Test
    		public void run() throws InterruptedException {
    			ForkJoinPool forkJoinPool = new ForkJoinPool();
    			Random rnd = new Random();
    			long[] array = new long[SIZE];
    			for (int i = 0; i < SIZE; i++) {
    			    array[i] = rnd.nextInt();
    			}
    			forkJoinPool.submit(new SortTask(array));
    
    		forkJoinPool.shutdown();
    		forkJoinPool.awaitTermination(<span class="hljs-number">1000</span>, TimeUnit.SECONDS);
    
    		<span class="hljs-keyword">for</span> (<span class="hljs-keyword">int</span> i = <span class="hljs-number">1</span>; i &lt; SIZE; i++) {
    		    assertTrue(<span class="hljs-built_in">array</span>[i - <span class="hljs-number">1</span>] &lt; <span class="hljs-built_in">array</span>[i]);
    		}
    	}
    }
    

  • ForkJoinPool:AbstractExecutorService的子类,实现ExecutorService接口和work-stealing算法,管理工作线程和提供关于任务的状态和它们执行的信息。

  • ForkJoinTask:类,实现了Future接口,在ForkJoinPool中执行的任务的基类,提供在任务中执行fork()和join()操作的机制,并且这两个方法控制任务的状态。

  • ForkJoinWorkerThrea:类,继承了Thread类,

  • RecursiveAction:ForkJoinTask的子类,不返回执行结果

  • RecursiveTask:ForkJoinTask的子类,返回执行结果

2. java.util.concurrent.lock包

2.1 Locks

  • Lock:接口,Lock实现提供了比使用synchronized方法和语句可获得的更广泛的锁定操作。此实现允许更灵活的结构,可以具有差别很大的属性,可以支持多个相关的Condition对象。方法介绍如下:

    • void lock():获取锁。
    • void lockInterruptibly():如果当前线程未被中断,则获取锁。
    • Condition newCondition():返回绑定到此Lock实例的新Condition实例。
    • boolean tryLock():仅在调用时锁为空闲状态才获取该锁。
    • boolean tryLock(long time, TimeUnit unit):如果锁在给定的等待时间内空闲,并且当前线程未被中断,则获取锁。
    • void unlock():释放锁。
  • Condition:Condition将Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待set(wait-set)。其中,Lock替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用。不同于其他线程,使用此类时,等待与唤醒不再使用wait与notity方法,而是使用await()与signal()。

  • ReentrantLock:类,Lock的实现,一个可重入的互斥锁Lock,此类的构造方法接受一个可选的公平 参数。

  • ReadWriteLock:口,ReadWriteLock维护了一对相关的锁,一个用于只读操作,另一个用于写入操作。只要没有writer,读取锁可以由多个reader线程同时保持。写入锁是独占的。

  • ReentrantReadWriteLock:类,ReadWriteLock的实现,此类不会将读取者优先或写入者优先强加给锁访问的排序。但是支持可选的公平策略。

  • AbstractQueuedSynchronizer:抽象类,为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同步器(信号量、事件,等等)提供一个框架。

  • LockSupport:类,用来创建锁和其他同步类的基本线程阻塞原语。park和unpark方法提供了阻塞和解除阻塞线程的有效方法

3. java.util.concurrent.atomic包

3.1 Atomics:提供原子操作的包,

  • AtomicBoolean:Boolean的原子类
  • AtomicInteger:Integer的原子类,java.lang.Number的子类
  • AtomicIntegerArray:可以用原子方式更新其元素的int数组
  • AtomicIntegerFieldUpdater:基于反射的实用工具,可以对指定类的指定volatile int字段进行原子更新。
  • AtomicLong:Long的原子类,java.lang.Number的子类
  • AtomicLongArray:可以用原子方式更新其元素的 long 数组。
  • AtomicLongFieldUpdater:基于反射的实用工具,可以对指定类的指定 volatile long 字段进行原子更新。
  • AtomicMarkableReference:维护带有标记位的对象引用,可以原子方式对其进行更新。
  • AtomicReference:可以用原子方式更新的对象引用。
  • AtomicReferenceArray:可以用原子方式更新其元素的对象引用数组。
  • AtomicReferenceFieldUpdater:基于反射的实用工具,可以对指定类的指定 volatile 字段进行原子更新。
                                                <div class="ad-wrap" style="margin-top: 12px;">
                                                            </div>
                                        </div>

原文地址:https://my.oschina.net/funcy/blog/1928224

原文地址:https://www.cnblogs.com/jpfss/p/10072296.html