ThreadPoolTaskExecutor的简单使用

项目中,有时会需要多线程来提高处理速度。
线程重用的核心是,它把Thread.start()给屏蔽起来了(一定不要重复调用),所以要重用Thread,就不能让Thread执行完一个任务后终止,因此就必须阻塞Thread.run()方法,让该方法不停地从任务队列中获取任务并执行。循环在跑的过程中不断检查我们是否有新加入的子Runnable对象,有就调一下我们的run(),其实就一个大run()把其它小run()#1,run()#2,…给串联起来了,基本原理就这么简单
spring为我们提供了TaskExecutor的抽象,
spring会默认提供一个taskExecutor的实现,但一般我们需要根据项目的需要来进行自定义。

	在Spring发行包中预定义了一些TaskExecutor实现。有了它们,你甚至不需要再自行实现了。
	SimpleAsyncTaskExecutor 类
	
	这个实现不重用任何线程,或者说它每次调用都启动一个新线程。但是,它还是支持对并发总数设限,当超过线程并发总数限制时,阻塞新的调用,直到有位置被释放。如果你需要真正的池,请继续往下看。
	
	SyncTaskExecutor类
	
	这个实现不会异步执行。相反,每次调用都在发起调用的线程中执行。它的主要用处是在不需要多线程的时候,比如简单的test case。
	
	ConcurrentTaskExecutor 类
	
	这个实现是对Java 5 java.util.concurrent.Executor类的包装。有另一个备选, ThreadPoolTaskExecutor类,它暴露了Executor的配置参数作为bean属性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一个备选。
	
	SimpleThreadPoolTaskExecutor 类
	
	这个实现实际上是Quartz的SimpleThreadPool类的子类,它会监听Spring的生命周期回调。当你有线程池,需要在Quartz和非Quartz组件中共用时,这是它的典型用处。
	
	ThreadPoolTaskExecutor 类
	
	它不支持任何对java.util.concurrent包的替换或者下行移植。Doug Lea和Dawid Kurzyniec对java.util.concurrent的实现都采用了不同的包结构,导致它们无法正确运行。
	
	这个实现只能在Java 5环境中使用,但是却是这个环境中最常用的。它暴露的bean properties可以用来配置一个java.util.concurrent.ThreadPoolExecutor,把它包装到一个TaskExecutor中。如果你需要更加先进的类,比如ScheduledThreadPoolExecutor,我们建议你使用ConcurrentTaskExecutor来替代。
	
	TimerTaskExecutor类
	
	这个实现使用一个TimerTask作为其背后的实现。它和SyncTaskExecutor的不同在于,方法调用是在一个独立的线程中进行的,虽然在那个线程中是同步的。
	
	WorkManagerTaskExecutor类
	
	CommonJ 是BEA和IBM联合开发的一套规范。这些规范并非Java EE的标准,但它是BEA和IBM的应用服务器实现的共同标准
	
	这个实现使用了CommonJ WorkManager作为其底层实现,是在Spring context中配置CommonJ WorkManager应用的最重要的类。和SimpleThreadPoolTaskExecutor类似,这个类实现了WorkManager接口,因此可以直接作为WorkManager使用。

而其中ThreadPoolTaskExecutor是最常用的一种。

1.ThreadPoolTaskExecutor

1.1 先看一个常用的配置项

@Configuration
@EnableAsync//开启异步任务的支持
public class TaskExecutorConfig {

    @Bean("TaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        //如果池中的实际线程数小于corePoolSize,无论是否其中有空闲的线程,都会给新的任务产生新的线程
        taskExecutor.setCorePoolSize(5);
        //连接池中保留的最大连接数。
        taskExecutor.setMaxPoolSize(15);
        //queueCapacity 线程池所使用的缓冲队列
        taskExecutor.setQueueCapacity(6000);
        //强烈建议一定要给线程起一个有意义的名称前缀,便于分析日志
        taskExecutor.setThreadNamePrefix("demo Thread-");
        taskExecutor.initialize();
        return taskExecutor;
    }
}

如果在方法上添加@Async,会自动被注入使用ThreadPoolTaskExecutor作为TaskExecutor(线程池),如果配置了多个ThreadPoolTaskExecutor,可以@Async(“ThreadPoolTaskExecutor1”)来指定。

1.2 重点概念解析

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小
                                                              //、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集
 
private volatile long  keepAliveTime;    //线程存货时间   
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数
 
private volatile int   poolSize;       //线程池中当前的线程数
 
private volatile RejectedExecutionHandler handler; //任务拒绝策略
 
private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程
 
private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数
 
private long completedTaskCount;   //用来记录已经执行完毕的任务个数

corePoolSize: 线程池维护线程的最少数量

keepAliveSeconds 线程池维护线程所允许的空闲时间

maxPoolSize 线程池维护线程的最大数量

queueCapacity 线程池所使用的缓冲队列

当一个任务通过execute(Runnable)方法欲添加到线程池时:

l 如果此时线程池中的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

l 如果此时线程池中的数量等于 corePoolSize,但是缓冲队列 workQueue未满,那么任务被放入缓冲队列。

l 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

l 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的策略来处理此任务。也就是:处理任务的优先级为:核心线程corePoolSize、任务队列workQueue、最大线程 maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。

l 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数。 另外MaxPoolSize的设定如果比系统支持的线程数还要大时,会抛出java.lang.OutOfMemoryError: unable to create new native thread 异常

1.3 拒绝策略解析

当最大线程也满后,会使用handler来处理被拒绝的任务,默认的四种处理策略为:

  • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 是一个RuntimeException,因此会中断调用者的处理过程,为java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
  • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面(最旧)的任务,然后重新尝试执行任务(重复此过程)。
  • ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 。
  • 另外还可以定义拒绝策略,这里提供一种方式:
taskExecutor.setRejectedExecutionHandler((Runnable r, ThreadPoolExecutor executor) -> {
                    if (!executor.isShutdown()) {
                        try {
                            executor.getQueue().put(r);
                        } catch (InterruptedException e) {
                            logger.error(e.toString(), e);
                            Thread.currentThread().interrupt();
                        }
                    }
                }
        );

这里的executor.getQueue()会得到BlockingQueue,
BlockingQueue的核心方法:
放入数据:
  offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,
    则返回true,否则返回false.(本方法不阻塞当前执行方法的线程)
  offer(E o, long timeout, TimeUnit unit),可以设定等待的时间,如果在指定的时间内,还不能往队列中
    加入BlockingQueue,则返回失败。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断
    直到BlockingQueue里面有空间再继续.
获取数据:
  poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,
    取不到时返回null;
  poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,
    队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
  take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到
    BlockingQueue有新的数据被加入;
  drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),
    通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
我们可以利用put方法来阻塞调用线程,来避免默认拒绝策略的丢弃任务或者抛出异常。

用ThreadPoolExecutor自定义线程池,看线程是的用途,如果任务量不大,可以用无界队列,如果任务量非常大,要用有界队列,防止OOM。
如果任务量很大,还要求每个任务都处理成功,要对提交的任务进行阻塞提交,重写拒绝机制,改为阻塞提交。保证不抛弃一个任务。
最大线程数一般设为2N+1最好,N是CPU核数。
核心线程数,看应用,如果是任务,一天跑一次,设置为0,合适,因为跑完就停掉了,如果是常用线程池,看任务量,是保留一个核心还是几个核心线程数

源码分析参考:https://www.cnblogs.com/sessionbest/articles/8689220.html

1.4 提交任务

无返回值的任务使用execute(Runnable)
有返回值的任务使用submit(Runnable)

原文地址:https://www.cnblogs.com/seasail/p/12179401.html