读《Java并发编程的艺术》学习笔记(十)

第10章  Executor框架
        Java线程既是工作单元,也是执行机制。从JDK 5开始,把工作单元和执行机制分离开来。工作单元包括Runnable和Callable,而执行机制有Executor框架提供

10.1  Executor框架简介
    10.1.1  Executor框架的两级调度模型
        在HotSpot VM的线程模型中,Java线程被一对一映射为本地操作系统线程。Java线程启动时会创建一个本地操作系统线程。当该Java线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。 

        在上层,Java多线程程序通常会把应用分解成若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。 其模型图如下: 

  1、Executor框架的结构 
          Executor框架主要又3大部分组成: 

              1. 任务:包括被执行任务需要实现的接口:Runnable接口或Callable接口。 

              2. 任务的执行:包括任务执行机制的核心接口Executor,及其子接口ExecutorService接口。 Executor框架有两个关节类实现了ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)    

              3. 异步计算的结果:包括接口Future和其实现类FutureTask类。 

    Executor框架的类与接口关系示意图: 

 

  下面是这些类和接口的简介。

          Executor是一个接口,它是Executor框架的基础,它将任务的提交和任务的执行分离开来。

          ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。

          ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。SchduledThreadPoolExecutor比Timer更灵活,功能更强大。

          Future接口和实现Future接口的FutureTask类,代表异步计算的结果

          Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor或SchduledThreadPoolExecutor执行

    2、Executor框架的成员
        Executor框架的主要成员:ThreadPoolExecutor、ScheduledThreadPoolExecutor、Future接口、Runnable接口、Callable接口、Executors。

        (1)ThreadPoolExecutor

          ThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建3种类型的ThreadPoolExecutor: SingleThreadExecutor、FixedThreadPool、CachedThreadPool。

              1) FixedThreadPool:创建固定线程数的线程池,构造函数中可以指定线程数量,适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

                FixedThreadPool内部使用无界队列LinkedBlockingQueue作为任务队列,队列的容量为Integer.MAX_VALUE,由于是无界队列,所以不会拒绝任务,可能会造成任务无限堆积,从而导致系统资源耗尽的情况。

              2) SingleThreadExecutor:创建单个线程的线程池,可以保证顺序执行任务。与FixedThreadPool类似,只是SingleThreadExecutor的线程数固定为1

               3) CachedThreadPool:可以根据需要创建新的线程,CachedThreadPool是大小无界的线程池,适用于执行很多短期异步任务的小程序,或者是负载比较轻的服务器。CachedThreadPool的corePool为空,maximumPoolSize为Integer.MAX_VALUE,keepAliveTime为60L,这意味着线程空闲超过60秒则会进行回收。CachedThreadPool内部使用不存储元素的SynchronousQueue作为任务队列(一个put操作等待着一个take操作),这意味着如果任务的提交速度高于线程的处理速度,那么CachedThreadPool则会不断的创建新的线程,在极端的情况下,会耗尽CPU和内存资源。            

        (2) SchduledThreadPoolExecutor

          SchduledThreadPoolExecutor通常使用工厂类Executors来创建,Executors可以创建2中类型的SchduledThreadPoolExecutor,如下:

                  SchduledThreadPoolExecutor。包含若干个线程的SchduledThreadPoolExecutor。

                  SingleThreadSchduledExecutor。只包含一个线程的SchduledThreadPoolExecutor。

        (3)Future接口

          Future接口和实现Future接口的FutureTask类用来表示异步计算的结果,当我们把Runnable接口或者Callable接口的实现类提交(submit)给ThreadPoolExecutor或者SchduledThreadPoolExecutor时,ThreadPoolExecutor或者SchduledThreadPoolExecutor会向我们返回一个FutureTask对象。下面是对应的API

       (4) Runnable和Callable接口

          Runnable和Callable接口的实现类都可以被hreadPoolExecutor或者SchduledThreadPoolExecutor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。

          除了可以自已创建实现Callable接口的对象外,还可以使用工厂类Executors来把一个Runnable包装成一个Callable。

          当我们把一个Callable对象提交给ThreadPoolExecutor或者SchduledThreadPoolExecutor执行时,summit()会向我们返回一个FutureTask对象。我们可以执行FutureTask.get()来等待任务执行完成。当任务完成后FutureTask.get()将会返回任务的结果。

10.2  ThreadPoolExecutor详解
        ThreadPoolExecutor是Executor框架最核心的类。主要由corePool(核心线程池大小)、maximumPool(最大线程池大小)、BlockingQueue(工作队列)和RejecteExecutionHandler(饱和策略)构成。 

        ThreadPoolExecutor通常使用Executors来创建。Executors可以创建3中类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadPool和CachedThreadPool。

    10.2.1  FixedThreadPool详解 
     FixedThreadPool 被称为可重用固定线程数的线程池。 

  FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。当线程池数大于corePoolSize时,keepAliveTime为多余空闲线程等待新任务的最长时间,超过这个时间多余线程就会被终止。keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。 其运行示意图如下: 

     1>如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。 

             2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。 

             3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。 

          FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。使用无界队列作为工作队列会对线程池带来如下影响: 

              a>当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程不会超过corePoolSize。 

              b>maximumPoolSize变为一个无效参数。 

              c>keepAliveTime也变为一个无效参数。 

              d>永远不会执行饱和策略。 

 10.2.2 SingleThreadExecutor详解
        SingleThreadExecutor是使用单个worker线程的Executor。 下面是其源代码: 

    SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。并使用LinkedBlockingQueue无界队列作为工作队列。 其运行示意图如下: 

  

    1>如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。 

          2>在线程池完成预热之后(当前线程中有一个运行的线程),将任务加入LinkedBlockingQueue。 

          3>线程执行完1中的任务后,会在一个无线循环中反复从LinkedBlockingQueue获取任务来执行。 

 10.2.3 CachedThreadPool详解
        CacheThreadPool是一个会更加需要创建新线程的线程池。 下面是其源代码: 

   CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool时无界的。keepAliveTime被设置为60L,意味着空闲线程超过60秒后被终止。 

        CachedThread使用没有容量的SynchronousQueue作为线程池的工作队列,但其maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度是,CachedThreadPool会不断创建新的线程,极端情况下,会耗尽CPU和内存资源。 其执行示意图如下: 

      1>首先执行SynchronousQueue.offer(Runnable task)。如果当前有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),则配对成功,将任务交给空闲线程执行。 

             2>当没有空闲线程时,创建一个新线程执行任务。 

             3>线程在执行任务完毕后,执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECODES),向队列请求任务,并等待60秒。如果60之后仍没有新任务,则被终止。如果有新任务则继续执行。 

10.3  ScheduledThreadPoolExecutor 详解
        ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或定期执行任务。 ScheduledThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建2种类型的ScheduledThreadPoolExecutor:

            1>ScheduledThreadPoolExecutor适用于需要执行周期任务,同时为了满足资源管理的需求而限制后台线程的数量的应用场景。 

            2>SingleThreadScheduledExecutor:只包含一个线程的ScheduledThreadPoolExecutor,适用于需要单个后台线程执行周期任务,同时需要保证顺序地执行各任务的应用场景。 

  10.3.1  ScheduledThreadPoolExecutor的运行机制
          其执行示意图如下: 

   DelayQueue是一个无界队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没什么意义。ScheduledThreadPoolExecutor的执行分为两大部分:

            1>当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()或scheduleWithFixedDelay()时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。

            2>线程池中的线程从DelayQueue中获取ScheduledFutureTask并执行。 

 10.3.2  ScheduledThreadPoolExecutor的实现 
          ScheduledThreadPoolExecutor把待调度的任务放在一个DelayQueue中。ScheduledFutureTask主要包含3个成员变量: 

                  long型变量 time,表示这个任务将要被执行的具体时间。 

                  long型变量 sequenceNumbe,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。 

                  long型成员变量 period,表示任务执行的间隔周期。 

          DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对对列中的ScheduledFutureTask进行排序。time小的先执行,被排在前面。如果两个任务time相同则比较sequenceNumbe。 

          ScheduledThreadPoolExecutor执行任务的步骤如下: 

     1>线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。 

                2>线程1执行这个ScheduledFutureTask。 

                3>线程1修改ScheduledFutureTask的time变量为下次要被执行的时间。 

                4>线程1把修改time之后的ScheduledFutureTask放入DelayQueue(DelayQueue.add())。 

        以下是DelayQueue.take()方法源码: 

  DelayQueue.add()方法源码: 

10.4  FutureTask详解
        Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。

     10.4.1  FutureTask 简介
          FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。FutureTask有未启动、已启动、已完成3种状态。 

          FutureTask的状态迁移示意图如下: 

     1>未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。 

          2>已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。 

          3>已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或FutureTask.run()时抛出异常而异常结束,FutureTask处于已完成状态。 

    10.4.2  FutureTask的使用
            下面是FutureTask在不同状态时调用FutureTask.get()及Future.cancel()方法的执行示意图: 

  10.4.3 FutureTask的实现
    FutureTask实现基于AQS.

    FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。java.util.concurrent中的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。

    AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。

    JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。
    每一个基于AQS实现的同步器都会包含两种类型的操作,如下。
      1)至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。
      2)至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。
    基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

原文地址:https://www.cnblogs.com/mYunYu/p/13077111.html