【1】线程池的使用

思路:通过ThreadPoolExecutor创建线程池,它有一些参数,任务提交给线程池之后的处理流程。不过java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池,各方法的区别,优缺点,问题。

一.Java中的ThreadPoolExecutor类

  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

  在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

   ThreadPoolExecutor类构造方法中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
    TimeUnit.DAYS;               //
    TimeUnit.HOURS;             //小时
    TimeUnit.MINUTES;           //分钟
    TimeUnit.SECONDS;           //
    TimeUnit.MILLISECONDS;      //毫秒
    TimeUnit.MICROSECONDS;      //微妙
    TimeUnit.NANOSECONDS;       //纳秒

     workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;
    LinkedBlockingQueue;
    SynchronousQueue;
    

    ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。
  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:

   

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。(默认策略) 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:这个策略从字面上也很好理解,丢弃最老的。也就是说如果队列满了,会将最早进入队列的任务删掉腾出空间,再尝试加入队列。 因为队列是队尾进,队头出,所以队头元素是最老的,因此每次都是移除队头元素后再尝试入队。
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务,使用此策略,如果添加到线程池失败,那么主线程会自己去执行该任务,不会等待线程池中的线程去执行 

二.深入剖析线程池实现原理

任务提交给线程池之后到被执行的整个流程有了一个基本的了解,下面总结一下:

要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
  • 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

任务缓存队列及排队策略

  在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

  workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。配合newCachedThreadPool使用,corePoolSize=maximumPoolSize无上限

SynchronousQueue没有容量。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue。每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。

cachedThreadPool使用此类型队列来确保,如果现有线程无法接收任务,将会创建新的线程来执行.不然按照上述任务提交给线程池之后的处理策略,将会进入队列等待,虽然cachedThreadPool最大线程数没有上限

任务拒绝策略

饱和策略

(1)当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHander来修改。(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略。)JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含不同的饱和策略:AbortPolicy、CallerRunsPolicy、DiscardPolicy和DiscardOldestPolicy。

(2)中止(Abort)策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需要编写自己的处理代码。

(3)当新提交的任务无法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。

(4)“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先队列放在一起使用。)

(5)“调用者运行(Caller-Runs)”策略实现了一种调度机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。如果采用有界队列和“调用者运行”饱和策略,当线程池中的所有线程都被占用,并且工作队列被填满后,下一个任务会在调用execute时在主线程中执行。由于执行任务需要一定的时间,因此主线程至少在一段时间内不能提交任何任务,从而使得工作者线程有时间来处理完正在执行的任务。在此期间,主线程不会调用accept,因此到达的请求将被保存在TCP层的队列中而不是在应用程序的队列中。如果持续过载,那么TCP层将最终发现它的请求队列被填满,因此同样会开始抛弃请求。当服务器过载时,这种过载情况会逐渐向外蔓延开来——从线程池到工作队列到应用程序再到TCP层,最终达到客户端,导致服务器在高负载下实现一种平缓的性能降低。

 

线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

不过在java doc中,并不提倡我们直接使用ThreadPoolExecutor,而是使用Executors类中提供的几个静态方法来创建线程池

Executors.newCachedThreadPool();        //创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //创建容量为1的缓冲池
Executors.newFixedThreadPool(int);    //创建固定容量大小的缓冲池

下面是这三个静态方法的具体实现;

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

  

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

线程池的基本大小设置为零,最大大小设置为Integer.MAX_VALUE,线程池可以被无限扩展,需求降低时自动收缩,最大大小设置过大在某些情况下也是缺点。

/**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

/**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

从它们的具体实现来看,它们实际上也是调用了ThreadPoolExecutor,只不过参数都已配置好了。

这几种创建线程池方法的使用场景(阿里):

newFixedThreadPool创建的线程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue缺点是LinkedBlockingQueue是无界队列,有些情况下排队的任务会很多。因此设置其他值的maximumPoolSize也是无效的,因为队列不会满。

用途:可控制线程最大并发数

newSingleThreadExecutorcorePoolSize和maximumPoolSize都设置为1,也使用的LinkedBlockingQueue

用途:不会并发,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行,每次任务到来后都会进入阻塞队列,然后按指定顺序执行。

newCachedThreadPoolcorePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE,使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程。

用途:它可以一定程序减少频繁创建/销毁线程,减少系统开销,适用于执行时间短并且数量多的任务场景。

newScheduledThreadPool。

用途:执行定时任务或周期性的任务。

  实际中,如果Executors提供的三个静态方法能满足要求,就尽量使用它提供的三个方法,因为自己去手动配置ThreadPoolExecutor的参数有点麻烦,要根据实际任务的类型和数量来进行配置。

  另外,如果ThreadPoolExecutor达不到要求,可以自己继承ThreadPoolExecutor类进行重写。

  

newScheduledThreadPool

使用 ScheduledExecutor 进行任务调度 

 需要注意的是,只有当任务的执行时间到来时,ScheduedExecutor 才会真正启动一个线程,其余时间 ScheduledExecutor 都是在轮询任务的状态。 

public class ScheduledThreadPoolTest {

    public static void main(String[] args) {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        //定时,表示延迟3秒执行。
//        scheduledThreadPool.schedule(new Runnable() {
//            @Override
//            public void run() {
//                SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
//                System.out.println("thread:" + Thread.currentThread().getName() + ",time:" + sdf1.format(new Date()));
//            }
//        }, 3, TimeUnit.SECONDS);


        //循环周期执行,表示延迟1秒后每3秒执行一次。
        scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("delay 1 seconds, and excute every 3 seconds");

            }
        }, 1, 3, TimeUnit.SECONDS);

    }
}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
public class ScheduledExecutorTest implements Runnable {
    private String jobName = "";
 
    public ScheduledExecutorTest(String jobName) {
        super();
        this.jobName = jobName;
    }
 
    @Override
    public void run() {
        System.out.println("execute " + jobName);
    }
 
    public static void main(String[] args) {
        ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
 
        long initialDelay1 = 1;
        long period1 = 1;
        // 从现在开始1秒钟之后,每隔1秒钟执行一次job1
        service.scheduleAtFixedRate(
                new ScheduledExecutorTest("job1"), initialDelay1,
                period1, TimeUnit.SECONDS);
 
        long initialDelay2 = 1;
        long delay2 = 1;
        // 从现在开始2秒钟之后,每隔2秒钟执行一次job2
        service.scheduleWithFixedDelay(
                new ScheduledExecutorTest("job2"), initialDelay2,
                delay2, TimeUnit.SECONDS);
    }
}
Output:
execute job1
execute job1
execute job2
execute job1
execute job1
execute job2

ScheduledExecutorService 中两种最常用的调度方法 ScheduleAtFixedRate 和 ScheduleWithFixedDelay。ScheduleAtFixedRate 每次执行时间为上一次任务开始起向后推一个时间间隔,即每次执行时间为 :initialDelay, initialDelay+period, initialDelay+2*period, …;ScheduleWithFixedDelay 每次执行时间为上一次任务结束起向后推一个时间间隔,即每次执行时间为:initialDelay, initialDelay+executeTime+delay, initialDelay+2*executeTime+2*delay。由此可见,ScheduleAtFixedRate 是基于固定时间间隔进行任务调度,ScheduleWithFixedDelay 取决于每次任务执行的时间长短,是基于不固定时间间隔进行任务调度。

四.如何合理配置线程池的大小

  如果是CPU密集型任务,就需要尽量压榨CPU,参考值可以设为 NCPU+1

  如果是IO密集型任务,参考值可以设置为2*NCPU

  

(3.1)线程池的理想大小取决于被提交任务的类型以及所部署系统的特性。在代码中不会固定线程池的大小,而应该通过某种配置机制来提供,或者根据Runtime.availableProcessors来动态计算。

(3.2)要设置线程池的大小也并不困难,只需要避免“过大”或“过小”这两种极端情况。如果设置过大,那么大量的线程将在相对很少的CPU和内存资源上发生竞争,这不仅会导致更高的内存使用量,而且还可能耗尽资源。如果设置过小,那么将导致很多空闲的处理器无法执行工作,从而降低吞吐率。

(3.3)要想正确设置线程池的大小,必须分析计算环境,资源预算和任务的特性。在部署的系统中有多少CPU?多大的内存?计算是计算密集型、I/O密集型还是二者皆可?它们是否需要像JDBC连接这样的稀缺资源?如果需要执行不同类别的任务,并且它们之间的行为相差很大,那么应该考虑使用多个线程池,从而使每个线程池可以根据自己的工作负载来调整。

(3.4)对于计算密集型的任务,在拥有Ncpu个处理器的系统上,当线程池的大小为Ncpu+1时,通常能实现最优的利用率。(计算当计算密集型的线程偶尔由于页缺失故障或者其他原因而暂停时,这个“额外”的线程也能确保CPU的时钟周期不会被浪费)

(3.5)对于包含I/O操作或者其他阻塞操作的任务,你必须估算出任务的等待时间与计算时间的比值。这种估算不需要很精确,并且可以通过一些分析或者监控工具来获得。你还可以通过另一种方法来调节线程池的大小:在某个基准负载下,分别设置不同大小的线程池来运行应用程序,并观察CPU利用率的水平。给定如下列定义:

                      Ncpu = number of CPUs

                     Ucpu = target CPU utilization,0 <= Ucpu <= 1

                     W/C = ratio of wait time to compute time

      要使处理器达到期望的使用率,线程池的最优大小等于:

                    Nthreads = Ncpu * Ucpu * (1 + W/C)

     可以通过Runtime来获得CPU的数目:

                   int N_CPUS = Runtime.getRuntime().availableProcessors();

 管理队列任务

(1)单线程的Executor是一种值得注意的特例:它们能确保不会有任务并发执行,因为它们通过线程封闭来实现线程安全性。

(2)如果无限制地创建线程,将会导致不稳定性。可以通过采用固定大小的线程池(而不是每收到一个请求就创建一个新线程)来解决这个问题。然而,这个方案并不完整。在高负载的情况下,应用程序仍可能耗尽资源,只是问题的概率较小。如果新请求的到达超过了线程池的处理效率,那么新到来的请求将累计起来。在线程池中,这些请求会在一个由Executor管理的Runable队列中等待,而不会像线程那样去竞争CPU资源,通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低得多,但如果客户提交给服务器请求的效率超过了服务器的处理效率,那么仍可能会耗尽资源。

(3)ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小。

(4)newFixedThreadPool和newSingleThreadExecutor在默认的情况下将使用一个无界的LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加

(5)一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockQueue、PriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?(有许多拒绝策略可以解决这个问题)在使用有界的工作队列时,队列的大小和线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。

(6)对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。

(7)当使用像LinkedBlockingQueue或ArrayBlockingQueue这样的FIFO(先进先出)队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。

(8)只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题此时应该使用无界的线程池,例如newCacheThreadPool。

 死锁问题参见:https://www.cnblogs.com/twoheads/p/10729240.html

http://www.cnblogs.com/dolphin0520/p/3932921.html

https://www.cnblogs.com/fdzfd/p/9190244.html

原文地址:https://www.cnblogs.com/twoheads/p/9698432.html