Java线程池ThreadPoolExecutor解析

一、线程池的概念

线程池: 简单理解,它就是一个管理线程的池子。

  • 它帮我们管理线程,避免增加创建线程和销毁线程的资源损耗。 因为线程其实也是一个对象,创建一个对象,需要经过类加载过程,销毁一个对象,需要走GC垃圾回收流程,都是需要资源开销的。
  • 提高响应速度。 如果任务到达了,相对于从线程池拿线程,重新去创建一条线程执行,速度肯定慢很多。
  • 重复利用。 线程用完,再放回池子,可以达到重复利用的效果,节省资源。

二、线程池的UML图如下

cmd-markdown-logo

三、线程创建的几种方式,线程有哪些状态?

线程创建有三种方式:

  • 继承Thread类,重写run方法
//匿名类写法
Thread t = new Thread(){
    @Override
    public void run(){
        // do bussiness
    }
};
  • 使用Thread构造函数Thread(Runnable target)
//匿名类写法
Thread t = new Thread(new Runnable() {
    @Override
    public void run() {
        // do bussiness
    }
});
  • 使用实现了Runable接口的FutureTask类构造Thread(Runnable target)
//匿名类方式构造futureTask
FutureTask<User> ft = new FutureTask<>(new Callable<User>() {
    @Override
    public User call() throws Exception {
        // do bussiness
    }
});

//使用Thread(Runnable target)构造线程
Thread  t = new Thread(ft);

线程有以下几种状态:
cmd-markdown-logo

在Java当中,线程通常都有五种状态,创建、就绪、运行、阻塞和死亡。

第一是创建状态。在生成线程对象,并没有调用该对象的start方法,这是线程处于创建状态;

第二是就绪状态。当调用了线程对象的start方法之后,该线程就进入了就绪状态,但是此时线程调度程序还没有把该线程设置为当前线程,此时处于就绪状态。在线程运行之后,从等待或者睡眠中回来之后,也会处于就绪状态

第三是运行状态。线程调度程序将处于就绪状态的线程设置为当前线程,此时线程就进入了运行状态,开始运行run函数当中的代码。

第四是阻塞状态。线程正在运行的时候,被暂停,通常是为了等待某个时间的发生(比如说某项资源就绪)之后再继续运行。sleep,suspend等方法都可以导致线程阻塞。

第五是死亡状态。如果一个线程的run方法执行结束,该线程就会死亡。对于已经死亡的线程,无法再使用start方法令其进入就绪状态。

四、线程池各参数的作用,如何运行的?

构造函数如下:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,  
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)
  • corePoolSize: 线程池核心线程数最大值
  • maximumPoolSize: 线程池最大线程数大小
  • keepAliveTime:线程池中非核心线程空闲的存活时间大小
  • unit: 线程空闲存活时间单位
  • workQueue: 存放任务的阻塞队列
  • threadFactory:用于设置创建线程的工厂,可以给创建的线程设置有意义的名字,可方便排查问题
  • handler: 线城池的饱和策略事件,主要有四种类型。

另外可以设置 allowCoreThreadTimeOut = true,让核心线程在达到最大存活时间后退出,默认为false。

运行流程图如下:
cmd-markdown-logo

  • 提交一个任务,线程池里存活的核心线程数小于线程数corePoolSize时,线程池会创建一个核心线程去处理提交的任务。

  • 如果线程池核心线程数已满,即线程数已经等于corePoolSize,一个新提交的任务,会被放进任务队列workQueue排队等待执行。

  • 当线程池里面存活的线程数已经等于corePoolSize了,并且任务队列workQueue也满,判断线程数是否达到maximumPoolSize,即最大线程数是否已满,如果没到达,创建一个非核心线程执行提交的任务。

  • 如果当前的线程数达到了maximumPoolSize,还有新的任务过来的话,直接采用拒绝策略处理。

五、线程数该如何设置?

线程池大小 = ((线程 IO time + 线程 CPU time )/ 线程 CPU time )* CPU数目

最大线程数一般设成核心线程数的2倍左右


六、几种常见景的线程池,及使用场景?

可以通过Executors工具类快速创建一个线程池。

newSingleThreadExecutor(单线程的线程池)

  • 核心线程数为1
  • 最大线程数也为1
  • 阻塞队列是LinkedBlockingQueue
  • keepAliveTime为0

使用场景:适用于串行执行任务的场景,一个任务一个任务地执行。

newFixedThreadPool (固定数目线程的线程池)

  • 核心线程数和最大线程数大小一样
  • 没有所谓的非空闲时间,即keepAliveTime为0
  • 阻塞队列为无界队列LinkedBlockingQueue

使用场景:FixedThreadPool 适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。

newCachedThreadPool(可缓存线程的线程池)

  • 核心线程数为0
  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是SynchronousQueue
  • 非核心线程空闲存活时间为60秒

使用场景: 用于并发执行大量短期的小任务。

newScheduledThreadPool(定时及周期执行的线程池)

  • 最大线程数为Integer.MAX_VALUE
  • 阻塞队列是DelayedWorkQueue
  • keepAliveTime为0
  • scheduleAtFixedRate() :按某种速率周期执行
  • scheduleWithFixedDelay():在某个延迟后执行

使用场景 : 周期性执行任务的场景,需要限制线程数量的场景。

七、常用的几种线程池,有什么弊端,会不会导致内存飙升?

newSingleThreadExecutor 和 newFixedThreadPool 线程池使用了无界的工作队列,处理过慢时会堆积大量的请求,从而导致OOM

newCachedThreadPool 、 newScheduledThreadPool和 newSingleThreadScheduledExecutor 线程池
最大线程数为Integer.MAX_VALUE,可能会创建大量线程,从而导致OOM

八、线程池有哪些工作队列?

  • ArrayBlockingQueue

      ArrayBlockingQueue(有界队列)是一个用数组实现的有界阻塞队列,按FIFO排序量。

  • LinkedBlockingQueue

      LinkedBlockingQueue(可设置容量队列)基于链表结构的阻塞队列,按FIFO排序任务,容量可以选择进行设置,不设置的话,将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE,吞吐量通常要高于ArrayBlockingQuene;newFixedThreadPool线程池使用了这个队列。

  • SynchronousQueue

      SynchronousQueue(同步队列)一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene,newCachedThreadPool线程池使用了这个队列。

  • DelayQueue

      DelayQueue(延迟队列)是一个任务定时周期的延迟执行的队列。根据指定的执行时间从小到大排序,否则根据插入到队列的先后排序。newScheduledThreadPool线程池使用了这个队列

  • PriorityBlockingQueue

      PriorityBlockingQueue(优先级队列)是具有优先级的无界阻塞队列。

九、按照线程池的内部机制,提交线程池时有哪些拒绝策略需要考虑?

考虑所使用的拒绝策略,根据拒绝策略做响应的处理

  • AbortPolicy(抛出一个异常,默认的)
  • DiscardPolicy(直接丢弃任务)
  • DiscardOldestPolicy(丢弃队列里最老的任务,将当前这个任务继续提交给线程池)
  • CallerRunsPolicy(交给线程池调用所在的线程进行处理)

十、如何进行线程池的异常处理?

  1. 使用try-catch捕捉异常,注意要在run方法里面捕捉异常
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// submit方式提交
for (int i = 0; i < 5; i++) {
    threadPool.submit(() -> {
        System.out.println("current thread name" + Thread.currentThread().getName());
        try{
           Object object = null;
           System.out.print("result## "+object.toString()); 
        }catch(Exception e){
            System.out.println("程序执行出错啦!");
        }
        
    });
}

//execute方式执行
threadPool.execute(new Runnable() {
		@Override
		public void run() {
		    for (Integer productSkn : productSkns) {
			try {
			    String url = apiUrl + "/?method=clear.productCache&detail=platform&productSkn=" + productSkn;
			    log.info("start clearProductCache, productSkn is: {}, url is: {}", productSkn, url);
			    serviceCaller.get("app.clearProductCache", url, null, String.class, null);
			    log.info("success clearProductCache productSkn is {}", productSkn);
			} catch (Exception e) {
			    log.error("clearProductCache error with productSkn={}", productSkn, e);
		        }
		}
	}
});
  1. submit执行,从future.get()接收异常
ExecutorService threadPool = Executors.newFixedThreadPool(5);
// submit方式提交
for (int i = 0; i < 5; i++) {
    Future f = threadPool.submit(() -> {
        System.out.println("current thread name" + Thread.currentThread().getName());
        Object object = null;
        System.out.print("result## "+object.toString()); 
    });
    try{
        f.get();
    }catch(Exception e){
        System.out.println("程序执行出错啦!");
    }
}
  1. 重写ThreadPoolExecutor.afterExecute()方法,处理传递的异常引用
class ExtendedExecutor extends ThreadPoolExecutor {
    // 这可是jdk文档里面给的例子。。
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                Object result = ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt(); // ignore/reset
            }
        }
        if (t != null)
            System.out.println(t);
    }
}}
  1. 实例化时,传入自己的ThreadFactory,设置Thread.getUncaughtExceptionHandler,处理未检测的异常
// 创建单个线程的线程池
ExecutorService threadPool = Executors.newFixedThreadPool(1, r -> {
            Thread t = new Thread(r);
            t.setUncaughtExceptionHandler(
                    (t1, e) -> {
                        System.out.println(t1.getName() + "线程抛出的异常"+e);
                    });
            return t;
           });

//执行线程池           
threadPool.execute(()->{
    Object object = null;
    System.out.print("result## " + object.toString());
});

十一、线程池的几种状态?

cmd-markdown-logo

RUNNING

  • 该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  • 调用线程池的shutdown()方法,可以切换到SHUTDOWN状态;
  • 调用线程池的shutdownNow()方法,可以切换到STOP状态;

SHUTDOWN

  • 该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  • 队列为空,并且线程池中执行的任务也为空,进入TIDYING状态;

STOP

  • 该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  • 线程池中执行的任务为空,进入TIDYING状态;

TIDYING

  • 该状态表明所有的任务已经运行终止,记录的任务数量为0;
  • terminated()执行完毕,进入TERMINATED状态;

TERMINATED

  • 该状态表示线程池彻底终止
原文地址:https://www.cnblogs.com/qingfengEthan/p/12582117.html