并发编程(十八):ScheduledThreadPoolExcutor和FutureTask


学习资料

《Java并发编程的艺术》10.3~10.4


1.ScheduledThreadPoolExecutor详解

1.1 简介

主要用来在给定的延迟之后运行任务,或者定期执行任务,功能与Timer类似

Timer对应的是单个后台线程,而ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数

1.2 ScheduledThreadPoolExecutor的运行机制

使用scheduleAtFixedRate()scheduleWithFixedDelay()方法添加任务ScheduledFutureTask

对ThreadPoolExecutor做了如下的修改:

  • 使用DelayQueue作为任务队列
  • 获取任务的方式不同
  • 执行周期任务后,增加了额外的处理

1.3 ScheduledThreadPoolExecutor的实现

ScheduledFutureTask主要包含3个成员变量

  • long型成员变量 time ,表示这个任务将要被执行的具体时间

  • long型成员变量 sequenceNumber ,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号

  • long型成员变量period,表示任务执行的间隔周期

DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序:(如果两个任务的被执行时间相同,那么先提交的任务将被先执行)

  • 先比较time,time小的排在前面
  • time相同,就比较sequenceNumber,sequenceNumber小的排在前面

任务执行的四个步骤:

主要流程:

获取已到期任务—>执行任务—>修改time为下次执行时间—>添加回任务队列

1.4 DelayQueue的take()与add()

第一步(获取已到期任务)和最后一步测操作(添加回任务队列)

DelayQueue.take()

获取任务示意图:

三步:

  1. 获取Lock
  2. 获取周期任务
    • PriorityQueue为空,当前线程到Condition中等待;否则执行2.2(2.1)
    • PriorityQueue的头元素time>now,到Condition中等待time;否则执行2.3.1(2.2)
    • 获取PriorityQueue的头元素(2.3.1);之后如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程(2.3.2)
  3. 释放Lock

DelayQueue.add()

添加任务示意图:

三步:

  1. 获取Lock
  2. 添加任务
    • 向PriorityQueue添加任务(2.1)
    • 如果在2.1中添加的任务是PriorityQueue的头元素,唤醒在Condition中等待的所有线程
  3. 释放Lock

2.FutureTask详解

Future接口和实现Future接口的FutureTask类,代表异步计算的结果

2.1 FutureTask简介

run()状态迁移

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

FutureTask.run()执行时FutureTask可以处于下面3种状态:

  • 未启动:该方法还没有被执行之前
  • 已启动:该方法被执行的过程中
  • 已完成:正常执行完,取消(cancel(…))或者出现异常

FutureTask的状态迁移的示意图

get()和cancel()

FutureTask.get()的行为:

  • 未启动或已启动,导致调用线程阻塞;
  • 已完成,导致调用线程立即返回结果或抛出异常;

FutureTask.cancel()的行为:

  • 未启动,导致此任务永远不会被执行;
  • 已启动:
    • FutureTask.cancel(true)会中断当前线程执行
    • FutureTask.cancel(false)让正在执行的任务运行完成再停止
  • 已完成,返回false

get方法和cancel方法的执行示意图:

2.2 FutureTask使用

三种使用方式:

  • 把FutureTask交给Executor执行;
  • 通过ExecutorService.submit(…)方法返回一个FutureTask,然后执行FutureTask.get()方法或FutureTask.cancel(…)方法。
  • 单独使用FutureTask

当一个线程需要等待另一个线程把某个任务执行完后它才能继续执行,此时可以使用FutureTask

2.3 FutureTask实现

AQS内部类

FutureTask的实现基于AQS,AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态,示意图如下:

  • FutureTask所有的的公有方法都直接委托给了内部私有的Sync
  • tryAcquireShared()方法来判断acquire操作是否可以成功
    • state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null,则返回true
  • 其他线程执行release操作会唤醒当前线程,会造成级联唤醒

FutureTask的级联唤醒

FutureTask.run()的执行过程如下:

  1. 执行在构造函数中指定的任务
  2. 以原子方式来更新同步状态,成功设置返回值并调用AQS.releaseShared(int arg)
  3. AQS.releaseShared(int arg)调用实现方法sync.tryReleaseShared(arg),唤醒线程等待队列中的第一个线程
  4. 调用FutureTask.done()

执行FutureTask.get()等待,FutureTask.run()唤醒第一个线程A,A唤醒B后从get()方法返回,以此类推:

最终,在队列中等待的所有线程都被级联唤醒并从get()方法返回


原文地址:https://www.cnblogs.com/kenshine/p/14520720.html