Java中的线程池

线程池

java.util.concurrent.ExecutorService 表述了异步执行的机制,并且可以让任务在后台执行。ExecutorService是一个接口,继承了Executor接口。Executor接口只包含了一个方法:void execute(Runnable command);,该方法接收一个 Runable 实例,它用来执行一个任务,任务即一个实现了 Runnable 接口的类。jdk默认提供了四种线程池供我们使用。

大体的继承结构如下:

一、线程池的状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

image-20201018204822162

状态名 高三位 接受新任务 处理阻塞队列任务 说明
RUNNING 111 线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。
SHUTDOWN 000 不会接收新任务,但会处理阻塞队列剩余任务(调用shutdown()方法)
STOP 001 会中断正在执行的任务,并抛弃阻塞队列任务(调用shutdownNow()方法)
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终结状态

从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING,因为最高位表示符号位,这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值。

状态转换图如下:

二、ThreadPoolExecutor构造方法

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
  • corePoolSize: 核心线程数目 (最多保留的线程数),线程池中会维护一个最小的线程数量,即使这些线程处理空闲状态,他们也不会 被销毁,除非设置了allowCoreThreadTimeOut。这里的最小线程数量即是corePoolSize。

  • maximumPoolSize :最大线程数目,一个任务被提交到线程池以后,首先会找有没有空闲存活线程,如果有则直接执行,如果没有则会缓存到工作队列中,如果工作队列满了,才会创建一个新线程(这就是救急线程),然后从工作队列的头部取出一个任务交由新线程来处理,而将刚提交的任务放入工作队列尾部。线程池不会无限制的去创建救急线程,它会有一个最大线程数量的限制,这个数量即由maximunPoolSize的数量减去corePoolSize的数量来确定,最多能达到maximunPoolSize即最大线程池线程数量。

  • keepAliveTime :生存时间 - 针对救急线程

  • unit: 时间单位 - 针对救急线程

  • workQueue :新任务被提交后,如果没有空闲的核心线程就会先进入到此工作队列中,任务调度时再从队列中取出任务。jdk中常见的任务队列:

    ①ArrayBlockingQueue

    基于数组的有界阻塞队列,按FIFO排序。有界的数组可以防止资源耗尽问题。当线程池中线程数量达到corePoolSize后,再有新任务进来,则会将任务放入该队列的队尾,等待被调度。如果队列已经是满的,则创建一个新线程,如果线程数量已经达到maxPoolSize,则会执行拒绝策略。

    ②LinkedBlockingQuene

    基于链表的无界阻塞队列(其实最大容量为Interger.MAX),按照FIFO排序。由于该队列的近似无界性,当线程池中线程数量达到corePoolSize后,再有新任务进来,会一直存入该队列,而不会去创建新线程直到maxPoolSize,因此使用该工作队列时,参数maxPoolSize其实是不起作用的。

    ③SynchronousQuene

    一个不缓存任务的阻塞队列,生产者放入一个任务必须等到消费者取出这个任务。也就是说新任务进来时,不会缓存,而是直接被调度执行该任务,如果没有可用线程,则创建新线程,如果线程数量达到maxPoolSize,则执行拒绝策略。

    ④PriorityBlockingQueue

    具有优先级的无界阻塞队列,优先级通过参数Comparator实现。

  • threadFactory :线程工厂 - 可以为线程创建时起个好名字

  • handler: 拒绝策略

    ​ 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现:

    image-20201018212537896

    • AbortPolicy策略:抛出 RejectedExecutionException 异常,这是默认策略

      image-20201018212901799

    • CallerRunsPolicy策略: 在调用者线程中直接执行被拒绝任务的run方法,除非线程池已经shutdown,则直接抛弃任务。

      image-20201018213014259

    • DiscardPolicy策略: 放弃本次任务

      image-20201018213333323

    • DiscardOldestPolicy策略: 放弃队列中最早的任务,本任务取而代之

      image-20201018213434194

三、四种线程池

Executors提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。

  1. public static ExecutorService newFixedThreadPool(int nThreads) : 创建固定数目线程的线程池。

     public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    特点:

    • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间。
    • 阻塞队列是无界的,可以放任意数量的任务
    • 适用于任务量已知,相对耗时的任务
  2. public static ExecutorService newCachedThreadPool():创建一个可缓存的线程池,调用execute将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线 程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。

     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>(),
                                          threadFactory);
        }
    

    特点:

    • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)

    • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)

    • 救急线程可以无限创建。

    • 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线

      程。 适合任务数比较密集,但每个任务执行时间较短的情况

  3. public static ExecutorService newSingleThreadExecutor(): 创建一个单线程化的Executor。

      public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    特点:

    • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作

    • 相较于其它三种线程池,newSingleThreadExecutor()创建的是FinalizableDelegatedExecutorService对象,其应用的是装饰者模式 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法。FinalizableDelegatedExecutorService继承自DelegatedExecutorService中的方法都是调用ExecutorService 接口的方法

      image-20201018215704221

    • 适用于希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程也不会被释放。

  4. public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize): 创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。

     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
     public ScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue());
        }
    

    ScheduledExecutorService接口的方法

    /**
    *指定延迟时间后执行
    */
    public ScheduledFuture<?> schedule(Runnable command,
                                           long delay, TimeUnit unit);
    
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                               long delay, TimeUnit unit);
    
    /**
    *scheduleAtFixedRate ,是以上一个任务开始的时间计时,period时间过去后,检测上一个任务是否执行完毕,如果上一个任务执行完*
    毕,则当前任务立即执行,如果上一个任务没有执行完毕,则需要等上一个任务执行完毕后立即执行。
    */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit);
    /**
    *scheduleWithFixedDelay,是以上一个任务结束时开始计时,period时间过去后,立即执行。
    */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                         long initialDelay,
                                                         long delay,
                                                         TimeUnit unit);
    

四、ExecutorService的常用方法

// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任务,会阻塞,必须等待所有的任务执行完成后统一返回结果
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间,这里的超时时间是针对的所有tasks,而不是单个task的超时时间。同样会阻塞,会堵塞,必须等待所有的任务执行完成后统一返回。但是如果超时,会取消没有执行完的所有任务,并抛出超时异常
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,会阻塞,直到返回结果
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间,直到返回结果,取消没有执行完的所有任务,并抛出超时异常
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)
 throws InterruptedException, ExecutionException, TimeoutException;


/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 不会中断当前运行的线程
- 此方法不会阻塞调用线程的执行
*/
void shutdown();


/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow()
    
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();


// 线程池状态是否是 TERMINATED
boolean isTerminated();


// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事
//情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

原文地址:https://www.cnblogs.com/myblogstart/p/13848456.html