并发学习第七篇——ThreadPoolExecutor

老经验,先学习源码中对类的注释,注释懂了,大概怎么工作的的雏形就差不多清晰了

线程池是什么

ThreadPoolExecutor是一个ExecutorService,使用池化的线程来执行任务,通用使用Executors类中的"工厂方法"来进行配置

ps:这个Executors类的定位就像集合框架里的Collections,es各种Builder中的QueryBuilders类,像一个辅助包,有很多"工厂方法"

来输出产品

线程池的定位:解决2个问题

1、在执行大量异步任务时,通过减少每个任务的调用开销(重复创建销毁线程等),提高了性能

2、提供了可以管理并限制资源的方法,这些资源包括线程(限制线程池的大小,队列大小等等),每个线程池都维护了一些基础的统计信息,例如完成的任务数。

可配置的参数和hooks

1、线程池的core size和max size:

线程池可以通过core size和max size动态调节池的大小

当通过 execute(Runnable)提交新任务时,

如果worker threads < corePoolSize,线程池会新建一个新线程来处理这个新任务,即使这些工作线程处于空闲的状态(也就是没有处理任务)

如果corePoolSize<  worker threads < maximumPoolSize,只有当queue满了,才会创建新的线程(否则都会丢入queue排队)

如果设置corePoolSize=maximumPoolSize,相当于创建了一个固定大小的线程池,等价于这个:newFixedThreadPool(int nThreads)

如果设置maximumPoolSize=Integer.MAX_VALUE,那么这个线程池可以同时处理任意多个任务

通常情况下,核心数和最大线程数在初始化的时候就设置好。当然,也可以通过setCorePoolSizesetMaximumPoolSize方法动态更改

2、按需构造线程池

默认情况下,当新任务到达时,工作线程才会被创建并启动。

但是也可以通过prestartCoreThread或者prestartAllCoreThreads方法预先让core size个线程提前创建

当创建的线程池带有一个非空的任务队列时,则需要预启动若干个线程(prestart threads)

3 、创建新线程

新线程由ThreadFactory创建,如果未指定ThreadFactory,线程池将使用Executors.defaultThreadFactory作为默认的线程工厂

通过默认工厂创建的线程都在同一个threadGroup里,他们拥有同样的优先级(NORM_PRIORITY),且都是非守护线程。

如果提供的是其他的ThreadFactory,则可以修改线程的名字,线程组,优先级,守护状态等

当工厂新建线程失败,池会继续运行,但是可能无法处理任何任务,后面的是权限方面的问题,没看

4 、Keep-alive 时间

​ 如果当前线程池的线程数量 > corePoolSize,对于这些多出来的线程,如果空闲时间超过keep-alive设置的时间,将会被终止

这种方式可以在线程池没有太多任务的时候,降低线程资源的消耗。

keep-alive time可以通过setKeepAliveTime(TimeUnit)方法动态更改

如果空闲时长设置为Long.Max_VALUE,那么空闲线程将永远不会被终止。

默认情况下,只有线程数超过core size, 超时策略才会使用到。但allowCoreThreadTimeOut(boolean)方法也可以让超时策略用在core threads上

5 、入队

任何的阻塞队列可以传递和保存任务,但是具体策略和当前线程池大小有关:

  如果线程数 < core,线程池会创建新线程来处理新任务,任务不会入队

  如果线程数 >= core,新任务会入队,而不是创建新线程

  如果新任务无法入队(例如满了),并且线程数 < max,那么会新建线程处理任务。如果已经=max(那么创建一个线程后肯定会超过max),那么任务会被拒绝

线程池有以下三个入队策略(就是三种BlockingQueue选择)

  •  直接传递:

一个很不错的默认选择,同步队列SynchronousQueue.它会在任务入队后,立刻将任务转给线程处理,而不保留任务。

如果没有可用的线程(没法新建更多的线程)来立即处理新任务,那么会入队失败

这个策略可以避免任务被锁住,有点像生产者-消费者模式

  •  无界队列:

例如没有预设容量的LinkedBlockingQueue,使用无界队列将会导致当所有的核心线程都在运行任务时,新任务全部要入队阻塞

这样的话,永远只有coreSize个线程执行任务,不会有更多的线程被创建,maxSize这个参数将不会起任何作用

当任务彼此不相互依赖时,这是一个很好的做法,比如一个web服务器,服务处理速度慢于外部请求速度,但是仍然有爆发式的请求不断到来

  • 有界队列:

有界队列(例如 ArrayBlockingQueue)可以通过设定maxSize来保护资源,但同时也更难协调和控制。队列的长度和池的大小需要相互协调

​ 长队列和小(线程池)池的组合减少了CPU的使用,OS 资源和上下文切换带来的损耗,但是可能会人为地降低吞吐量。

如果任务经常阻塞(例如I/O密集型任务),系统可以为更多的线程安排时间,可能比你设定的线程数还要多(没有充分利用CPU)

短队列通常需要和大(线程)池搭配使用,它们能充分利用CPU,但是也可能会带来不可预计的调度开销,因而降低吞吐量

6 、拒绝任务

当线程池shutdown之后,或者没有shutdown,但是队列容量有限且线程池饱和运行(达到最大线程且均在运行任务)时,通过execute(Runnable)方法提交的任务会被拒绝

此时execute方法会调用RejectedExecutionHandler.rejectedExecution(Runnable,ThreadPoolExecutor)方法

RejectedExecutionHandler是一个接口,每个线程池的RejectedExecutionHandler变量不一样,该接口有四种具体实现:

  ThreadPoolExecutor.AbortPolicy(默认):拒绝新任务,并抛出RejectedExecutionException异常。

  ThreadPoolExecutor.CallerRunsPolicy : 调用execute方法的线程本身来执行这个任务。这种做法提供了一个简易的反馈控制机制降低新任务的提交频率

  ThreadPoolExecutor.DiscardPolicy:直接丢弃

  ThreadPoolExecutor.DiscardOldestPolicy:线程池正常运行的情况下,dropped掉队头的任务,然后重试 execute,重试也可能会再次失败,然后会再次dropped掉一个处于

head位置的任务

7 、钩子方法

可以理解为callback方法,或spring里的aop的切面函数

ThreadPoolExecutor类提供 beforeExecute(Thread, Runnable)afterExecute(Runnable, Throwable)} 两个可被覆盖的钩子函数

它们在任务的开始和结束的时候被调用。可被用于配置运行环境,例如更改ThreadLocals,收集统计信息,或者加日志。

此外,terminated()方法也可以被覆盖,在线程池完全终止的时候,可以通过这个方法做一些特殊的处理。

如果钩子方法抛出异常,内部的工作线程可能会逐个失败直至线程池终止

8 、队列维护

getQueue()可以获取队列来监控和调试,但是强烈不建议使用这个方法来干别的事情。

当大量的入队任务被取消时,remove(Runnable)和 purge方法可以帮助来回收空间

9 、回收

当一个线程池不再被其他程序引用,并且池中没有线程的时候,会自动shutdown

如果你希望一个不再被引用的线程池可以被自动回收(不是手动使用shutdown方法),那么你必须确保空闲线程会自动停止

可以通过设置合适的keep-alive time,core size设为0,并且调用allowCoreThreadTimeOut()方法来达到目的

附:线程池的生命周期

线程池的状态贯穿了线程池的整个生命周期,有以下5个生命周期:

RUNNING: 接收新任务,处理队列的任务。

SHUTDOWN:不接收新任务,继续处理队列的任务。

STOP:不接收新任务,也不处理队列里的任务,并尝试停止正在运行的任务。

TIDYING:所有的任务都终止了,线程数为0之后,线程池状态会过度到TIDYING,然后执行terminated()钩子方法。

TERMINATED:在terminated()方法执行完之后,线程池状态就会变成TERTMINATED。

原文地址:https://www.cnblogs.com/yb38156/p/14492638.html