ThreadPoolExecutor 线程池

线程池的好处

实现了任务提交和任务执行的解耦, 用户只需提供Runnable对象, 而任务的执行和调度都由线程池负责

  • 降低资源消耗
  • 提升系统响应速度
  • 提高线程的可管理性

工作流程

执行过程图

  1. 首先判断线程池是否已满, 未满->创建新的核心线程执行任务
  2. 已满->判断阻塞队列是否已满, 未满->加入阻塞队列
  3. 已满->线程池是否已满(自旋?), 未满->创建新线程执行任务
  4. 已满->按照饱和策略 handler处理

线程的创建 参数讲解

ThreadPoolExecutor 的构造方法

ThreadPoolExecutor(
    int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler){}
  • corePoolSize:表示核心线程池的大小
    如果当前线程池中线程个数小于它, 新任务到来时就会创建新的线程(即使当前核心线程池有空闲的线程)

  • maximumPoolSize:表示线程池能创建线程的最大个数
    如果当阻塞队列已满时,假如线程池线程个数没有超过 maximumPoolSize 的话,就会创建新的线程来执行任务

  • keepAliveTime:空闲线程存活时间

    超过corePoolSize的空闲线程, 空闲时间如果超过该值就会被销毁

  • unit: 时间单位

  • workQueue:阻塞队列, 用于保存任务的阻塞队列
    可以使用ArrayBlockingQueue, LinkedBlockingQueue, SynchronousQueue, PriorityBlockingQueue

  • threadFactory:创建线程的工厂类

  • handler: 饱和策略, 如果线程和线程池阻塞队列都已饱和, 就需要一种策略来处理新来的线程

    • AbortPolicy: 直接拒绝所提交的任务,并抛出RejectedExecution-Exception异常;
    • CallerRunsPolicy:直接用调用者所在的线程来执行任务;
      也就是说, 让处理请求的线程来执行任务, 这会降低新任务的提交速度,影响整体性能
      对于可伸缩的应用, 建议使用该策略, 因为最大池填满之后, 这个策略会提供一个可伸缩队列, 保存任务
    • DiscardPolicy:不处理直接丢弃掉任务;
    • DiscardOldestPolicy:丢弃掉阻塞队列中存放时间最久的任务,添加当前任务

线程池参数 - 如何设计线程池

主要从任务的性质角度考虑: CPU 密集型任务,IO 密集型任务和混合型任务

  • 任务执行的时间: 长、中、短
  • 任务的优先级:
  • 任务的依赖性:

不同类型的任务

  • CPU 密集型任务配置尽可能少的线程数量, 可以配置NCPU+1个线程

  • IO 密集型任务则由于需要等待 IO 操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2xNCPU

  • 混合型的任务,可以将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务

    只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率
    如果这两个任务执行时间相差太大,则没必要进行分解

  • 优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先得到执行
    如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行, 饿死

  • 执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行

  • 依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,等待的时间越长 CPU 空闲时间就越长
    所以数据连接池, 线程数应该设置尽量大,才能更好的利用 CPU

  • 阻塞队列最好是使用有界队列,如果采用无界队列的话,一旦任务积压在阻塞队列中的话就会占用过多的内存资源,甚至会使得系统崩溃

ThreadPoolExecutor 的数据结构

使用整型状态数 ctl - 线程池的状态

通过位切分同时表示线程池状态和活动线程数量, 这样也更容易保证线程安全, 状态位在高3位,活动线程数在低29位, 正常的增减操作就可以控制线程数量

  • COUNT_BITS 为 29
    int RUNNING    = -1 << COUNT_BITS;
    int SHUTDOWN   =  0 << COUNT_BITS;
    int STOP       =  1 << COUNT_BITS;
    int TIDYING    =  2 << COUNT_BITS;
    int TERMINATED =  3 << COUNT_BITS;
    
  • running - 111
    能够接受新任务
  • shutdown - shutdown() - 000
    不接受新任务, 可以处理已加入的任务
  • stop - shutdownNow() - 001
    不接受新任务, 不处理已添加的任务, 中断正在执行的任务
    如何中断? 在安全点中断
  • tidying - 010
    所有的任务都已终止, ctl记录为0
    可以定义钩子函数 terminated()
  • terminated - 011
    线程池彻底终止后转为的彻底终止状态

worker类 - 可以说是一把封装了线程和任务的锁

继承了AQS, mainLock&termination 非公平锁, 成员 thread 通过 getThreadFactory().newThread(this) 实例化

  • 成员 firstTask 在实例化时由线程池提供
  • 成员 completedTasks 表明完成任务的数量
  • HashSet:workers 是线程池中所拥有的线程集合

如何实现线程复用

  • 这个worker有点特殊, 通过工厂产生的线程并不是任务, 而是自身this

  • 那这个worker将如何工作呢? 它将以线程的形式工作, 也就是 run() 方法, run方法会调用外部的方法 runWorker(),

  • 这才是任务被执行的核心, 在runWorker()中, while (task != null || (task = getTask()) != null) 就调用 task.run() 方法来执行任务.

那么它为什么是个锁? 为什么需要加锁?

  • worker 继承了 AbstractQueuedSynchronizer, 根据其重写的 tryAcquire 和 tryRelease, 可以看出这是一把非公平、独占的非重入锁.

  • worker内置锁在初始化的时候 state为(-1), 只有在worker.start()之后, 才执行worker.unlock()将state++, 将锁置于正常状态, 因为获取锁时都是执行 CAS(0,1) 操作

  • 如果发现state>=0, 说明该worker已经被start(), 已经开始执行任务

  • 其次worker有一个 interruptIfStarted(), state为-1时, 是无法进行中断的, 这应该是出于保护firsttask的目的
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted())

  • 线程位于sleep中,无论何时都会响应 InterruptedException, 那么对于处于sleep的Worker, 执行shtdownNow()会直接抛出异常, 结束执行

execute 方法 - 不能保证顺序执行

1. 比较线程池中线程的数量和core pool size的大小关系

假如小于核心线程池大小,则调用 addWorker(),创建一个线程执行任务

  • addWorker 会实例化一个 worker 对象加入到 workers 中
  • 还要执行一些检查和cas同步操作, 修改workingCount等
  • 如何实现线程复用? 初次执行是来自于实例化时, 之后则来自于阻塞队列,
    默认是无限定时间的, 如果自己是核心线程, 就会一直从阻塞队列中take任务, 一直阻塞一直存活一直工作
    有限定时间的话, 任务线程从任务队列取任务(出队)时poll()是有最大等待时间的, 如果超时, 就对外返回null, 让当前线程结束
  • Excutors类提供了DefaultThreadFactory
  • ThreadFactory中包含ThreadGroup:group、
  • Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);

2. 无法直接创建就要进入阻塞队列

入队成功,我们也要二次检查能否创建一个线程,因为这个过程中可能有线程死亡,还要再检查一次线程池是否要关闭

如果线程池将要关闭,就 remove(command)&reject(command),如果所有的线程都执行任务完毕(workingCount==0),还要创建新线程

3. 两种尝试都失败

如果入队也失败,尝试MAX_POOL_SIZE模式创建线程也失败,就就会 reject(command);

线程池的关闭

关闭线程池,可以通过shutdown和shutdownNow这两个方法

shutdownNow

首先将线程池的状态设置为STOP,然后尝试停止所有的正在执行和未执行任务的线程,并返回等待执行任务的列表;

  • 将运行状态设置为STOP - 修改ctl

  • 执行interruptWorkers() 方法
    for (Worker w : workers) w.interruptIfStarted();
    就像名字一样, 如果worker已经start了, 调用每一个worker线程的 interrupt()

    • 如果线程池状态为STOP,要保证当前线程是中断状态;
      如果不是的话,则要保证当前线程不是中断状态;
      如果是STOP, 需要保证worker在本次任务完成后自行退出
      如果不为STOP, 那么需要保证外界的interrupt不影响内部线程的执行
    • 触发了STOP的interrupted-worker, 会在getTask的时候抛出异常进而执行processWorkerExit()
      为了表明和线程池为空的区别
  • 然后执行tryTerminated()方法
    主要是interruptIdleWorkers()
    和ctl的更新

    • interruptIdleWorkers() 目标是为了终止未在执行任务的worker, 这些worker会阻塞在workingQueue处, 能够成功获取锁, 然后interrupt
    • 更新ctl, 过渡至tidying状态

shutdown

shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程, 会等待线程执行完

  • 区别在于先将状态改为SHUTDOWN
  • 如果workingQueue不为空就不再采取措施, 等待线程池将任务执行完毕之后自行终止
  • 当所有的线程都关闭成功,才表示线程池成功关闭,这时调用isTerminated方法才会返回 true

线程的死亡

  1. 外部扰动, 调用了 interruptIfStarted() 方法, 在执行阻塞队列的take()方法时异常, 离开循环
  2. 外部扰动, 将运行状态改为 STOP, 变为interrupted, 转情况1
  3. 外部扰动, 线程阻塞在workingQueue的take()方法处, 外部通过interruptIdleWorkers()将线程变为interrupted, 转情况1
  4. 内部消化 (wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty()), 任务为null, 自行终止
  5. task.run() 方法执行过程中出现运行时错误, 异常在循环外被处理, 离开循环
原文地址:https://www.cnblogs.com/imzhizi/p/thread-pool-executor-source-code-analysis.html