ExecutorService 线程池详解

1、什么是ExecutorService,为什么要使用线程池?

  许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务,每当一个请求到达就创建一个新线程,然后在新线程中为请求服务,但是频繁创建新线程、销毁新线程、线程切换既花费较多的时间,影响相应速度,又消耗大量的系统资源,且有时服务器无法处理过多请求导致崩溃。一种情形:假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。ExecutorService是一个线程池,请求到达时,线程已经存在,响应延迟低,多个任务复用线程,避免了线程的重复创建和销毁,并且可以规定线程数目,请求数目超过阈值时强制其等待直到有空闲线程。

  当我们有任务需要多线程来完成时,将任务(实现Runnable、callable接口、继承Thread类的对象)提交给ExecutorService。

创建方式如下:

1 public ThreadPoolExecutor(int corePoolSize,
2                               int maximumPoolSize,
3                               long keepAliveTime,
4                               TimeUnit unit,
5                               BlockingQueue<Runnable> workQueue,
6                               ThreadFactory threadFactory,
7                               RejectedExecutionHandler handler)

corePoolSize : 核心线程数,一旦创建将不会再释放。如果创建的线程数还没有达到指定的核心线程数量,将会继续创建新的核心线程,直到达到最大核心线程数后,核心线程数将不在增加;如果没有空闲的核心线程,同时又未达到最大线程数,则将继续创建非核心线程;如果核心线程数等于最大线程数,则当核心线程都处于激活状态时,任务将被挂起,等待有空闲线程时再执行。

maximumPoolSize : 最大线程数,允许创建的最大线程数量。如果最大线程数等于核心线程数,则无法创建非核心线程;如果非核心线程处于空闲时,超过设置的空闲时间,则将被回收,释放占用的资源。

keepAliveTime : 也就是当线程空闲时,所允许保存的最大时间,超过这个时间,线程将被释放销毁,但只针对于非核心线程。

unit : 时间单位,TimeUnit.SECONDS等。

workQueue : 任务队列,用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

  1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列必须设置容量。此队列按 FIFO(先进先出)原则对元素进行排序。
  2. LinkedBlockingQueue:一个基于链表结构的阻塞队列,可以设置容量,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。
  3. SynchronousQueue:一个不存储元素的阻塞队列。每个插入offer操作必须等到另一个线程调用移除poll操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。
  4. PriorityBlockingQueue:一个具有优先级的无限阻塞队列

threadFactory :  线程工厂,用于创建线程。

handler : 当线程边界和队列容量已经达到最大时,用于处理阻塞时的程序。

2、ExecutorService的类型

一共有四种线程池:

CachedThreadPool可缓存线程池、SingleThreadExecutor单线程池、FixedThreadPool固定线程数线程池、ScheduledThreadPool 固定线程数,支持定时和周期性任务线程池。

ThreadPoolExecutor(该类就是线程池类)继承AbstractExecutorService类,该抽象类实现ExecutorService接口,Executors是一个工厂类,包含很多静态方法,包括newCachedThreadPool、newSingleThreadExecutor、newFixedThreadPool等,这些静态方法中调用了ThreadPoolExecutor的构造函数,并且不同的线程池调用构造方法时传入不同的参数。

2.1 CachedThreadPool可缓存线程池 (无界线程池)

1 public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

通过它的创建方式可以知道,创建的都是非核心线程,而且最大线程数为Interge的最大值,空闲线程存活时间是1分钟。SynchronousQueue队列,一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作。所以,当我们提交第一个任务的时候,是加入不了队列的,这就满足了,一个线程池条件“当无法加入队列的时候,且任务没有达到maxsize时,我们将新开启一个线程任务”。即当线程不够用的时候会不断创建新线程,如果线程无限增长,会导致内存溢出。所以我们的maxsize是big big。时间是60s,当一个线程没有任务执行会暂时保存60s超时时间,如果没有的新的任务的话,会从cache中remove掉。因此长时间不提交任务的CachedThreadPool不会占用系统资源。就是缓冲区为1的生产者消费者模式。

2.2 SingleThreadExecutor单线程池

1 public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6     }

只用一个线程来执行任务,保证任务按FIFO顺序一个个执行。

2.3 FixedThreadPool固定线程数线程池

1 public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }

 coresize和maxmumsize相同,超时时间为0,队列用的LinkedBlockingQueue无界的FIFO队列,如果队列里面有线程任务的话就从队列里面取出线程,然后开启一个新的线程开始执行。 很明显,这个线程池始终只有size的线程在运行,大小固定,难以扩展。

2.4 ScheduledThreadPool 固定线程数,支持定时和周期性任务线程池

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE,
          DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,
          new DelayedWorkQueue());
}

3、创建线程的时机(线程池的工作策略)

  1. 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(即如果当前运行的线程小于corePoolSize,则任务根本不会添加到workQueue中)
  2. 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入工作队列,而不添加新的线程
  3. 如果无法将请求加入workQueue(但是队列已满),则创建新的线程,除非创建此线程超出 maximumPoolSize,如果超过,在这种情况下,新的任务将被拒绝。

3、代码例子

 1 public class ExecutorServicepool {
 2     public static void main(String[] args) throws InterruptedException {
 3         int[] a = new int[1];
 4         //创建一个容量为5的线程池
 5         ExecutorService executorService = Executors.newFixedThreadPool(5);
 6         for(int i = 0;i<15;i++){
 7             //向线程池提交一个任务(其实就是通过线程池来启动一个线程)
 8             executorService.execute(new TestRunnable(a));
 9        System.out.println("============  "+i);
10        Thread.sleep(millis:1000); 
         System.out.printlin("主线程休眠了1秒钟")
11 } 12 } 13 } 14 15 class TestRunnable extends Thread{ 16 public int[] count; 17 TestRunnable(int[] a){ 18 this.count = a; 19 } 20 @Override 21 public void run(){ 22 try { 23 if(Thread.currentThread().getName().equals("pool-1-thread-1")) 24 Thread.sleep(2000); 25 } catch (Exception e) { 26 e.printStackTrace(); 27 } 28 System.out.println(Thread.currentThread().getName()+"-线程被调用了"); 29 System.out.println("count值为:"+(++count[0])); 30 } 31 }

得到的输出结果如下:可知,固定线程数线程池,线程数为5个,提交15个任务给线程池,也就是使用5个线程完成对a[0]++的工作,5个线程之间是异步的,线程池中线程与主线程也是异步的。

============ 0
============ 1
============ 2 ============ 3 ============ 4 ============ 5 ============ 6 ============ 7 ============ 8 ============ 9 ============ 10 ============ 11 ============ 12 ============ 13 ============ 14 pool-1-thread-3-线程被调用了 pool-1-thread-2-线程被调用了 pool-1-thread-5-线程被调用了 pool-1-thread-4-线程被调用了 count值为:2 count值为:3 count值为:1 pool-1-thread-2-线程被调用了 count值为:4 count值为:5 pool-1-thread-3-线程被调用了 pool-1-thread-2-线程被调用了 pool-1-thread-4-线程被调用了 count值为:7 count值为:6 pool-1-thread-2-线程被调用了 count值为:9 count值为:8 pool-1-thread-4-线程被调用了 count值为:10 pool-1-thread-4-线程被调用了 count值为:11 pool-1-thread-2-线程被调用了 count值为:12 pool-1-thread-5-线程被调用了 count值为:13 pool-1-thread-3-线程被调用了 count值为:14
主线程休眠了1秒钟
pool-1-thread-1-线程被调用了 count值为:15

4、怎么关闭线程池?

执行程序时发现,所有线程执行完毕后,JVM并未结束运行,也就说明线程池没有正常结束。怎样正确关闭线程池呢?

调用 Executor 的 shutdown() 方法会等待线程都执行完毕之后再关闭,但是如果调用的是 shutdownNow() 方法,则相当于调用每个线程的 interrupt() 方法。

让线程池在指定时间内立即关闭:

public static void main(String[] args) { 
    
    ExecutorService pool = Executors.newFixedThreadPool(5); 
    final long waitTime = 8 * 1000; 
    final long awaitTime = 2 * 1000; 
    
    Runnable task1 = new Runnable(){ 
        public void run(){ 
            try { 
                System.out.println("task1 start"); 
                Thread.sleep(waitTime); 
                System.out.println("task1 end"); 
            } catch (InterruptedException e) { 
                System.out.println("task1 interrupted: " + e); 
            } 
        } 
    }; 
    
    Runnable task2 = new Runnable(){ 
        public void run(){ 
            try { 
                System.out.println("task2 start"); 
                Thread.sleep(1000); 
                System.out.println("task2 end"); 
            } catch (InterruptedException e) { 
                System.out.println("task2 interrupted: " + e); 
            } 
        } 
    }; 
    //消耗时间很长的任务 8秒
    pool.execute(task1); 
    
    //消耗时间1秒
    for(int i=0; i<1000; ++i){ 
        pool.execute(task2); 
    } 
    
    try { 
        // 告诉线程池,如果所有任务执行完毕则关闭线程池
        pool.shutdown(); 
    
        // 判断线程池是否在限定时间内,或者线程池内线程全部结束
        if(!pool.awaitTermination(awaitTime, TimeUnit.MILLISECONDS)){ 
            // 超时的时候向线程池中所有的线程发出中断(interrupted)。 
            pool.shutdownNow(); 
        } 
    } catch (InterruptedException e) {  
        System.out.println("awaitTermination interrupted: " + e); 
    } 
    
    System.out.println("end"); 
}
task1 start
task2 start
task2 start
task2 start
task2 start
task2 end
task2 end
task2 end
task2 start
task2 end
task2 start
task2 start
task2 start
task2 end
task2 end
task2 end
task2 end
end
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task1 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted
task2 interrupted: java.lang.InterruptedException: sleep interrupted

Process finished with exit code 0

如果只执行shutdown,线程池会等待所有线程全部结束才终止线程池。

且!执行shutdown()后,就不能再继续使用ExecutorService来追加新的任务了,如果继续调用execute/submit方法执行新的任务的话,就会抛出RejectedExecutionException异常。

所以一般的调用顺序为:

shutdown 方法 ,停止接收新的任务
awaitTermination 方法,  判断任务是否执行完毕或者是否在指定时间内
shutdownNow方法 

 参考文献:https://www.cnblogs.com/zhncnblogs/p/10894271.html  https://blog.csdn.net/fwt336/article/details/81530581

原文地址:https://www.cnblogs.com/simpleDi/p/11342440.html