ThreadPoolExecutor之一:使用基本介绍

一、concurrent包中的线程池的简单介绍

线程池按照线程数量可以分为:一是固定线程数量的线程池;二是可变数量的线程池。

线程池按照执行时间可以分为:一是立即执行线程池;二是延时线程池。


ThreadPoolExecutor是ExecutorService的一个实现类,它使用可能的几个池线程之一执行每个提交的任务,通常使用 Executors 工厂方法配置。每个ThreadPoolExecutor 还维护着一些基本的统计数据,如完成的任务数。为了便于跨大量上下文使用,此类提供了很多可调整的参数和扩展钩子 (hook)。
但是,强烈建议程序员使用较为方便的 Executors 工厂方法

  •  Executors.newCachedThreadPool()无界线程池,可以进行自动线程回收)--可变数量的线程池
  • Executors.newFixedThreadPool(int)固定大小线程池)                                --固定数量的线程池
  •  Executors.newSingleThreadExecutor()单个后台线程),                       --固定数量的线程池
  • Executors.newScheduledThreadPool(int corePoolSize)                                  --延迟执行线程池
  • Executors.newSingleScheduledExecutor()                                                  --延迟执行线程池[一个重要的方法就是:schedule(Runnable command, long delay, TimeUnit unit)该方法返回了一个 ScheduledFuture。]

在JDK帮助文档中,有如此一段话:“强烈建议程序员使用较为方便的 Executors 工厂方法 Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)Executors.newSingleThreadExecutor()(单个后台线程)”它们均为大多数使用场景预定义了设置。

二、ThreadPoolExecutor类可设置的参数

在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

如果想自己扩展实现的话,在手动配置和调整此类时,下面几个参数可以参考。

1、corePoolSize

  核心线程数,核心线程会一直存活,即使没有任务需要处理。当线程数小于核心线程数时,即使现有的线程空闲,线程池也会优先创建新线程来处理任务,而不是直接交给现有的线程处理。

  核心线程在allowCoreThreadTimeout被设置为true时会超时退出,默认情况下不会退出。

2、maxPoolSize

  当线程数大于或等于核心线程,且任务队列已满时,线程池会创建新的线程,直到线程数量达到maxPoolSize。如果线程数已等于maxPoolSize,且任务队列已满,则已超出线程池的处理能力,线程池会拒绝处理任务而抛出异常。

ThreadPoolExecutor调节线程的原则是:先调整到最小线程,最小线程用完后,他会将优先将任务放入缓存队列(offer(task)),等缓冲队列用完了,才会向最大线程数调节。这似乎与我们所理解的线程池模型有点不同。我们一般采用增加到最大线程后,才会放入缓冲队列中,以达到最大性能。ThreadPoolExecutor代码段:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();

        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

线程池执行任务的规则:

      1. 当线程数小于核心线程数时,创建线程。
      2. 当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
      3. 当线程数大于等于核心线程数,且任务队列已满:
      4. 3.1.若线程数小于最大线程数,创建线程
        3.2.若线程数等于最大线程数,抛出异常,拒绝任务

3、keepAliveTime

  当线程空闲时间达到keepAliveTime,该线程会退出,直到线程数量等于corePoolSize。如果allowCoreThreadTimeout设置为true,则所有线程均会退出直到线程数量为0。

4、allowCoreThreadTimeout

  是否允许核心线程空闲退出,默认值为false。

5、unit

  有7种取值,在TimeUnit类中有7种静态属性:

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

6、workQueue

  一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

所有 BlockingQueue 都可用于传输和保持提交的任务。可以使用此队列与池大小进行交互:

  • 如果运行的线程少于 corePoolSize,则 Executor 始终首选添加新的线程,而不进行排队。(什么意思?如果当前运行的线程小于corePoolSize,则任务根本不会存放,添加到queue中,而是直接抄家伙(new thread)开始运行)
  • 如果运行的线程等于或多于 corePoolSize,则 Executor 始终首选将请求加入队列,而不添加新的线程
  • 如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。

排队有三种通用策略:

  1. 直接提交工作队列的默认选项是 SynchronousQueue,它将任务直接提交给线程而不保持它们。在此,如果不存在可用于立即运行任务的线程,则试图把任务加入队列将失败,因此会构造一个新的线程。此策略可以避免在处理可能具有内部依赖性的请求集时出现锁。直接提交通常要求无界 maximumPoolSizes 以避免拒绝新提交的任务。当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  2. 无界队列使用无界队列(例如,不具有预定义容量的 LinkedBlockingQueue)将导致在所有 corePoolSize 线程都忙时新任务在队列中等待。这样,创建的线程就不会超过 corePoolSize。(因此,maximumPoolSize 的值也就无效了。)当每个任务完全独立于其他任务,即任务执行互不影响时,适合于使用无界队列;例如,在 Web 页服务器中。这种排队可用于处理瞬态突发请求,当命令以超过队列所能处理的平均数连续到达时,此策略允许无界线程具有增长的可能性。
  3. 有界队列当使用有限的 maximumPoolSizes 时,有界队列(如 ArrayBlockingQueue)有助于防止资源耗尽,但是可能较难调整和控制。队列大小和最大池大小可能需要相互折衷:使用大型队列和小型池可以最大限度地降低 CPU 使用率、操作系统资源和上下文切换开销,但是可能导致人工降低吞吐量。如果任务频繁阻塞(例如,如果它们是 I/O 边界),则系统可能为超过您许可的更多线程安排时间。使用小型队列通常要求较大的池大小,CPU 使用率较高,但是可能遇到不可接受的调度开销,这样也会降低吞吐量。  

7、queueCapacity

  任务队列容量。从maxPoolSize的描述上可以看出,任务队列的容量会影响到线程的变化,因此任务队列的长度也需要恰当的设置。

8、handler(回绝任务)

  执行回绝的场景,当 Executor 已经关闭,并且 Executor 将有限边界用于最大线程和工作队列容量,且已经饱和时,在方法execute(java.lang.Runnable) 中提交的新任务将被拒绝。在以上两种情况下,execute 方法都将调用其RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四种预定义的处理程序策略:

A. 在默认的 ThreadPoolExecutor.AbortPolicy 中,(默认)回绝任务并抛出异常 RejectedExecutionException。
B. 在 ThreadPoolExecutor.CallerRunsPolicy 中,调用execute()的线程自己来处理该任务,绝大部分情况下是主线程。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。

注意:由于主线程执行这个任务,那么新到来的任务就不会被提交到线程池中执行(而是提交到TCP层的队列,TCP层队列满了,就开始拒绝,此时性能已经很低了),直到主线程执行完这个任务。
C. 在 ThreadPoolExecutor.DiscardPolicy 中,不能被执行的任务会直接被扔掉。
D. 在 ThreadPoolExecutor.DiscardOldestPolicy 中,如果执行程序尚未关闭,如果executor没有被关闭,队列头部的任务将会被丢弃,然后将该任务加到队尾,然后重试执行程序(如果再次失败,则重复此过程)。
定义和使用其他种类的 RejectedExecutionHandler 类也是可能的,但这样做需要非常小心,尤其是当策略仅用于特定容量或排队策略时。

注意1:AbortPolicy,CallerRunsPolicy,DiscardPolicy和DiscardOldestPolicy都是rejectedExecution的一种实现。
    当然也可以自己定义个rejectedExecution实现。

 详细见:《juc线程池原理(二)

系统负载

参数的设置跟系统的负载有直接的关系,下面为系统负载的相关参数:

  • tasks,每秒需要处理的最大任务数量
  • tasktime,处理第个任务所需要的时间
  • responsetime,系统允许任务最大的响应时间,比如每个任务的响应时间不得超过2秒。

钩子 (hook) 方法
    此类提供 protected是可重写的:

protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }

 beforeExecute(java.lang.Thread, java.lang.Runnable)和 afterExecute(java.lang.Runnable, java.lang.Throwable) 方法,这两种方法分别在执行每个任务之前和之后调用。

 它们可用于操纵执行环境;例如,重新初始化 ThreadLocal、搜集统计信息或添加日志条目等。
 此外,还可以重写方法 terminated() 来执行 Executor 完全终止后需要完成的所有特殊处理。
 如果钩子 (hook) 或回调方法抛出异常,则ThreadPoolExecutor的所有线程将依次失败并突然终止。 


终止
    如果ThreadPoolExecutor在程序中没有任何引用且没有任何活动线程,它也不会自动 shutdown。
    如果希望确保回收线程(即使用户忘记调用 shutdown()),则必须安排未使用的线程最终终止:
设置适当保持活动时间,使用0核心线程的下边界和/或设置 allowCoreThreadTimeOut(boolean)。
 

三、ThreadPoolExecutor的函数说明

主要成员函数
public void execute(Runnable command)
    在将来某个时间执行给定任务。可以在新线程中或者在现有池线程中执行该任务。 如果无法将任务提交执行,或者因为此执行程序已关闭,或者因为已达到其容量,则该任务由当前 RejectedExecutionHandler 处理。
    参数:
        command - 要执行的任务。 
    抛出:
        RejectedExecutionException - 如果无法接收要执行的任务,则由 RejectedExecutionHandler 决定是否抛出 RejectedExecutionException 
        NullPointerException - 如果命令为 null
public void shutdown()
    按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。如果已经关闭,则调用没有其他作用。
    抛出:
        SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),或者安全管理器的 checkAccess 方法拒绝访问。
public List<Runnable> shutdownNow()
    尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。在从此方法返回的任务队列中排空(移除)这些任务。
    并不保证能够停止正在处理的活动执行任务,但是会尽力尝试。 此实现通过 Thread.interrupt() 取消任务,所以无法响应中断的任何任务可能永远无法终止。
    返回:
        从未开始执行的任务的列表。 
    抛出:
        SecurityException - 如果安全管理器存在并且关闭此 ExecutorService 
        可能操作某些不允许调用者修改的线程(因为它没有 RuntimePermission("modifyThread")),
        或者安全管理器的 checkAccess 方法拒绝访问。
public int prestartAllCoreThreads()
    启动所有核心线程,使其处于等待工作的空闲状态。仅当执行新任务时,此操作才重写默认的启动核心线程策略。
    返回:
        已启动的线程数
public boolean allowsCoreThreadTimeOut()
    如果此池允许核心线程超时和终止,如果在 keepAlive 时间内没有任务到达,新任务到达时正在替换(如果需要),则返回 true。当返回 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。当返回 false(默认值)时,由于没有传入任务,核心线程不会终止。
    返回:
        如果允许核心线程超时,则返回 true;否则返回 false
public void allowCoreThreadTimeOut(boolean value)
    如果在保持活动时间内没有任务到达,新任务到达时正在替换(如果需要),则设置控制核心线程是超时还是终止的策略。当为 false(默认值)时,由于没有传入任务,核心线程将永远不会中止。当为 true 时,适用于非核心线程的相同的保持活动策略也同样适用于核心线程。为了避免连续线程替换,保持活动时间在设置为 true 时必须大于 0。通常应该在主动使用该池前调用此方法。
    参数:
        value - 如果应该超时,则为 true;否则为 false 
    抛出:
        IllegalArgumentException - 如果 value 为 true 并且当前保持活动时间不大于 0。
public boolean remove(Runnable task)
    从执行程序的内部队列中移除此任务(如果存在),从而如果尚未开始,则让其不再运行。
    此方法可用作取消方案的一部分。它可能无法移除在放置到内部队列之前已经转换为其他形式的任务。
    例如,使用 submit 输入的任务可能被转换为维护 Future 状态的形式。但是,在此情况下,purge() 方法可用于移除那些已被取消的 Future。
    参数:
        task - 要移除的任务 
    返回:
        如果已经移除任务,则返回 true
public void purge()
    尝试从工作队列移除所有已取消的 Future 任务。此方法可用作存储回收操作,它对功能没有任何影响。
    取消的任务不会再次执行,但是它们可能在工作队列中累积,直到worker线程主动将其移除。
    调用此方法将试图立即移除它们。但是,如果出现其他线程的干预,那么此方法移除任务将失败。
当然它还实现了的ExecutorServicesubmit系列接口

abstract <T> Future<T> submit(Runnable task, T result) --如果执行成功就返回T result
abstract <T> Future<T> submit(Callable<T> task)--Submits a value-returning task for execution and returns a Future representing the pending results of the task.
abstract Future<?> submit(Runnable task) --Submits a Runnable task for execution and returns a Future representing that task.

四、Executor 的执行逻辑

原文地址:https://www.cnblogs.com/duanxz/p/5050684.html