Executors的使用

Executors的使用

一、线程池工作顺序

corePoolSize -> 任务队列 -> maximumPoolSize -> 拒绝策略

二、自带的线程池的工厂方法

1、newFixedThreadPool

  • JDK文档描述:创建一个固定的现成池,该线程重用固定数量的线程。如果所有线程都处于活动状态,并且有新的任务时,会在队列中进行等待,直到线程可用。如果现成在关闭之前发生故障而终止,在执行后续任务时,则新现成将取代它池中的线程将存在,直到明确显示。
  • 构造函数
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

2.newSingleThreadExecutor

  • JDK文档描述:创建一个Executor使用一个工作现成在无界队列中运行,如果在shutdown之前执行期间出现故障而导致单个线程终止,则在执行后续任务时,则使用新的线程。
  • 构造函数
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

3.newCachedThreadPool

  • JDK文档描述:根据实际需要创建新线程的线程池,但是会先重用之前已经有的线程(如果有可用的线程时),如果没有可用的现有线程,回创建一个新的线程并添加到线程池中,这些通常会提高执行许多短期异步任务的程序的性能;如果线程在60s未被使用,则会从线程池中删除,所以,长时间闲置的池将不会消耗任何资源
  • 构造函数
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

4.newSingleThreadScheduledExecutor

  • JDK文档描述:创建一个单线程执行程序,可以在给定的时间内延后执行或者定时执行,如果在shutdown之前执行期间出现故障导致单个线程终止,则在执行后续任务时,会创建一个新的线程。
  • 构造函数
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

5.newScheduledThreadPool

  • JDK文档描述:创建一个线程池,可以在给定的时间内延后执行活儿定时执行。
  • 构造函数
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }


    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
     public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }

6.newWorkStealingPool

  • JDK文档描述:jdk1.8以后,创建一个线程池,该线程池维护足够的线程并支持给定的并行级别,并且使用多个队列来减少争用,并行度级别对应任务处理中参与的最大线程数。并行度不指定时,默认和CPU数量相等。
  • 构造函数
    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }
    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

三、部分线程池的区别

1.newFixedThreadPool(1) 和newSingleThreadExecutor()的区别

newSingleThreadExecutor 可以保证顺序执行,且当线程中出现异常,会创建一个新的线程替换,newFixedThreadPool中的线程如果遇到错误则会终止,且无法使用代替线程。同理,2.newScheduledThreadPool(1) 和newSingleThreadScheduledExecutor也是一样的。

四、ThreadPoolExecutor 参数说明

参数名称 参数描述
intcorePoolSize 指定线程池中线程的数量,线程池中运行的线程数也永远不会超过 corePoolSize 个,默认情况下可以一直存活。可以通过设置allowCoreThreadTimeOut为True,此时 核心线程数就是0,此时keepAliveTime控制所有线程的超时时间。
int maximumPoolSize 指定线程池中线程的最大数量
long keepAliveTime 当线程池中的数量大约corePoolSize时,多余的空闲线程存活的时间,如果超过了corePoolSize,在KeepAliveTime后,会自动销毁
TimeUnit unit KeepAliveTime的时间单位
BlockingQueue workQueue 线程队列,将被提交但尚未执行的任务缓存队列,是java.util.concurrent下的主要用来控制线程同步的工具。如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒。同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作
ThreadFactory threadFactory 线程工厂,用户创建线程,不指定的时候使用默认的线程工厂DefaultThreadFactoy
RejectedExecutionHandler handler 拒绝策略

五、BlockingQueue类型

直接提交的队列

  • 定义:没有容量的队列,当线程池进行入队操作的时候,本身没有容量,直接返回false,并没有保存下来,直接提交给线程来做处理,如果没有空余的线程,则执行拒绝策略。
  • 代表队列:SynchronousQueue

有界的任务队列

  • 定义:当使用有界任务队列时,当有任务进行提交时,线程池的线程数量小于corePoolSize则创建新的线程来执行任务,当线程池的线程数量大于corePoolSize的时候,则将提交的任务放入到队列中,当提交的任务塞满队列后,如果线程池的线程数量没有超过maximumPoolSize,则创建新的线程执行任务,如果超过了maximumPoolSize则执行拒绝策略。
  • 代表队列:ArrayBlockingQueue

无界的任务队列

  • 定义:有任务提交到线程池时,如果线程池的数量小于corePoolSize,线程池会产生新的线程来执行任务,当线程池的线程数量大于corePoolSize时,则将提交的任务放入到队列中,等待执行任务的线程执行完之后进行消费队列中的任务,若后续仍有新的任务提交,而没有空闲的线程时,它会不断往队列中入队提交的任务,直到资源耗尽。
  • 代表队列:LinkedBlockingQueue

优先任务队列

  • 定义:队列可以根据任务自身的优先级顺序先后执行,在确保性能的同时,也能有很好的质量保证。
  • 代表队列:PriorityBlockingQueue

五、拒绝策略

  • AbortPolicy策略:默认的线程池的拒绝策略,如果线程池队列满了则直接拒绝,并且抛出 RejectedExecutionException 异常。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            //直接抛出异常
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
    }

  • CallerRunsPolicy策略:只要线程池没有关闭线程池状态是RUNNING状态,该略略直接调用线程中运行当前被丢弃的任务
  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                //线程池未关闭的话交由调用者执行
                r.run();
            }
    }

  • DiscardOledestPolicy策略:如果队列满了,将丢弃最老的一个请求腾出空间,并且尝试加入队列。
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                //移除队头
                e.getQueue().poll();
                //尝试加入队列
                e.execute(r);
            }
        

  • DiscardPolicy策略:拒绝任务的处理程序,以静默方式丢弃被拒绝的任务。
  public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    //不做任何处理
  }
  • 自定义拒绝策略:实现RejectedExecutionHandler接口,并且重写rejectedExecution即可

/**
 * The type Custom rejected handler.
 *
 * @author yujuan
 * @time 2019 /08/23 17:10:11
 */
public class CustomRejectedHandler implements RejectedExecutionHandler {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof Lock1){ //Lock1 为一个自定义的runable
            //TODO 业务逻辑
        }
    }
}


六、如何合理的设置线程池的大小

一般需要根据任务的类型来配置线程池的大小

  • 如果是CPU密集型的任务,就需要尽量压榨CPU,参考值可以设置为NCPU+1
  • 如果是IO密集型的任务,参考值可以设置为2×NCPU

具体的设置可以根据实际情况来做调整,可以根据服务的运行状态及负载资源利用率等来做对应的调整。

参考:如何估计线程池的大小

原文地址:https://www.cnblogs.com/jakaBlog/p/11401363.html