ThreadPoolExecutor线程池参数设置技巧

一、ThreadPoolExecutor的重要参数
  • corePoolSize:核心线程数
    • 核心线程会一直存活,及时没有任务需要执行
    • 当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
    • 设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
  • queueCapacity:任务队列容量(阻塞队列)
    • 当核心线程数达到最大时,新任务会放在队列中排队等待执行
  • maxPoolSize:最大线程数
    • 当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
    • 当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
  • keepAliveTime:线程空闲时间
    • 当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
    • 如果allowCoreThreadTimeout=true,则会直到线程数量=0
  • allowCoreThreadTimeout:允许核心线程超时
  • rejectedExecutionHandler:任务拒绝处理器
    • 两种情况会拒绝处理任务:
      • 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
      • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
    • 线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
    • ThreadPoolExecutor类有几个内部实现类来处理这类情况:
      • AbortPolicy:直接抛出一个异常,默认策略
      • DiscardPolicy: 直接丢弃任务
      • DiscardOldestPolicy:抛弃下一个将要被执行的任务(最旧任务)
      • CallerRunsPolicy:主线程中执行任务
    • 实现RejectedExecutionHandler接口,可自定义处理器
二、ThreadPoolExecutor执行顺序:
     线程池按以下行为执行任务
  1. 当线程数小于核心线程数时,创建线程。
  2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
  3. 当线程数大于等于核心线程数,且任务队列已满
    1. 若线程数小于最大线程数,创建线程
    2. 若线程数等于最大线程数,抛出异常,拒绝任务
 
三、如何设置参数
 
  • 默认值
    • corePoolSize=1
    • queueCapacity=Integer.MAX_VALUE
    • maxPoolSize=Integer.MAX_VALUE
    • keepAliveTime=60s
    • allowCoreThreadTimeout=false
    • rejectedExecutionHandler=AbortPolicy()
  • 如何来设置
    • 需要根据几个值来决定
      • tasks :每秒的任务数,假设为500~1000
      • taskcost:每个任务花费时间,假设为0.1s
      • responsetime:系统允许容忍的最大响应时间,假设为1s
    • 做几个计算
      • corePoolSize = 每秒需要多少个线程处理? 
        • threadcount = tasks/(1/taskcost) =tasks*taskcout =  (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
        • 根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
      • queueCapacity = (coreSizePool/taskcost)*responsetime
        • 计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
        • 切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
      • maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)
        • 计算可得 maxPoolSize = (1000-80)/10 = 92
        • (最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
      • rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
      • keepAliveTime和allowCoreThreadTimeout采用默认通常能满足
  • 以上都是理想值,实际情况下要根据机器性能来决定。如果在未达到最大线程数的情况机器cpu load已经满了,则需要通过升级硬件(呵呵)和优化代码,降低taskcost来处理。
 

几种典型的工作队列

  • ArrayBlockingQueue:使用数组实现的有界阻塞队列,特性先进先出
  • LinkedBlockingQueue:使用链表实现的阻塞队列,特性先进先出,可以设置其容量,默认为Interger.MAX_VALUE,特性先进先出
  • PriorityBlockingQueue:使用平衡二叉树堆,实现的具有优先级的无界阻塞队列
  • DelayQueue:无界阻塞延迟队列,队列中每个元素均有过期时间,当从队列获取元素时,只有
  • 过期元素才会出队列。队列头元素是最块要过期的元素。
  • SynchronousQueue:一个不存储元素的阻塞队列,每个插入操作,必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态
 

几种典型的线程池

SingleThreadExecutor 单个线程线程池

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
创建单个线程。它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,使用无界队列LinkedBlockingQueue作为线程池的工作队列。

【260期】Java线程池,这篇能让你和面试官聊了半小时

  • 当线程池中没有线程时,会创建一个新线程来执行任务。
  • 当前线程池中有一个线程后,将新任务加入LinkedBlockingQueue
  • 线程执行完第一个任务后,会在一个无限循环中反复从LinkedBlockingQueue 获取任务来执行。

**使用场景:**适用于串行执行任务场景

FixedThreadPool 创建一个线程数量固定的线程池,可控制线程最大并发数,超出的线程会在队列中等待。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(),
                                  threadFactory);
}

corePoolSize等于maximumPoolSize,所以线程池中只有核心线程,使用无界阻塞队列LinkedBlockingQueue作为工作队列

FixedThreadPool是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。

【260期】Java线程池,这篇能让你和面试官聊了半小时

  • 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  • 在线程数目达到corePoolSize后,将新任务放到LinkedBlockingQueue阻塞队列中。
  • 线程执行完(1)中任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

使用场景:适用于处理CPU密集型的任务,确保CPU在长期被工作线程使用的情况下,尽可能的少的分配线程,即适用执行长期的任务。推荐:250期面试题汇总

CachedThreadPool 一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

核心线程数为0,总线程数量阈值为Integer.MAX_VALUE,即可以创建无限的非核心线程

执行流程

  • 先执行SynchronousQueue的offer方法提交任务,并查询线程池中是否有空闲线程来执行SynchronousQueue的poll方法来移除任务。如果有,则配对成功,将任务交给这个空闲线程
  • 否则,配对失败,创建新的线程去处理任务
  • 当线程池中的线程空闲时,会执行SynchronousQueue的poll方法等待执行SynchronousQueue中新提交的任务。若等待超过60s,空闲线程就会终止

流程形象图

【260期】Java线程池,这篇能让你和面试官聊了半小时

结构形象图

【260期】Java线程池,这篇能让你和面试官聊了半小时

使用场景:执行大量短生命周期任务。因为maximumPoolSize是无界的,所以提交任务的速度 > 线程池中线程处理任务的速度就要不断创建新线程;每次提交任务,都会立即有线程去处理,因此CachedThreadPool适用于处理大量、耗时少的任务。

ScheduledThreadPool 创建一个周期线程池,支持定时及周期性任务执行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

线程总数阈值为Integer.MAX_VALUE,工作队列使用DelayedWorkQueue,非核心线程存活时间为0,所以线程池仅仅包含固定数目的核心线程。

两种方式提交任务:

  • scheduleAtFixedRate: 按照固定速率周期执行
  • scheduleWithFixedDelay:上个任务延迟固定时间后执行

使用场景:周期性执行任务,并且需要限制线程数量的场景

使用无界队列的线程池会导致内存飙升吗?

答案 :会的,newFixedThreadPool使用了无界的阻塞队列LinkedBlockingQueue,如果线程获取一个任务后,任务的执行时间比较长,会导致队列的任务越积越多,导致机器内存使用不停飙升, 最终导致OOM。

参考:
https://www.javazhiyin.com/86333.html
  • https://blog.csdn.net/liuchangjie0112/article/details/90698401
  • https://zhuanlan.zhihu.com/p/73990200
原文地址:https://www.cnblogs.com/personsiglewine/p/13537059.html