线程池小结(JDK8)

 1、线程池的好处

  1. 降低资源消耗(重复利用已创建的线程减少创建和销毁线程的开销)
  2. 提高响应速度(无须创建线程)
  3. 提高线程的可管理性

2、相关类图

JDK5以后将工作单元和执行机制分离开来,工作单元包括Runnable和Callable;执行机制由Executor框架提供,管理线程的生命周期,将任务的提交和如何执行进行解耦。Executors是一个快速得到线程池的工具类,相关的类图如下所示:

3、Executor框架接口

Executor接口

Executor接口只有一个execute方法,用来替代通常创建或启动线程的方法。

public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口

ExecutorService接口继承自Executor接口,加入了关闭方法、submit方法和对Callable、Future的支持。

ScheduledExecutorService接口

 ScheduledExecutorService扩展ExecutorService接口并加入了对定时任务的支持。

4、ThreadPoolExecutor分析

ThreadPoolExecutor继承自AbstractExecutorService,也是实现了ExecutorService接口。

 4.1 内部状态

 1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 2     private static final int COUNT_BITS = Integer.SIZE - 3;
 3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 4 
 5     // runState is stored in the high-order bits
 6     private static final int RUNNING    = -1 << COUNT_BITS;
 7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
 8     private static final int STOP       =  1 << COUNT_BITS;
 9     private static final int TIDYING    =  2 << COUNT_BITS;
10     private static final int TERMINATED =  3 << COUNT_BITS;
11 
12     // Packing and unpacking ctl
13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
14     private static int workerCountOf(int c)  { return c & CAPACITY; }
15     private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是对线程池的运行状态(高3位)和线程池中有效线程的数量(低29位)进行控制的一个字段。线程池有五种状态,分别是:

  1. RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
  2. SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
  3. STOP : 1 << COUNT_BITS,即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
  4. TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
  5. TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成。

 

4.2 构造方法

构造方法有4个,这里只列出其中最基础的一个。

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.corePoolSize = corePoolSize;
16         this.maximumPoolSize = maximumPoolSize;
17         this.workQueue = workQueue;
18         this.keepAliveTime = unit.toNanos(keepAliveTime);
19         this.threadFactory = threadFactory;
20         this.handler = handler;
21     }

构造方法中参数的含义如下:

  • corePoolSize:核心线程数量,线程池中应该常驻的线程数量
  • maximumPoolSize:线程池允许的最大线程数,非核心线程在超时之后会被清除
  • keepAliveTime:线程没有任务执行时可以保持的时间
  • unit:时间单位
  • workQueue:阻塞队列,存储等待执行的任务。JDK提供了如下4种阻塞队列:
    • ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    • LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQuene;
    • SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQuene;
    • PriorityBlockingQuene:具有优先级的无界阻塞队列;
  • threadFactory:线程工厂,来创建线程
  • handler:线程池的饱和策略。如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。线程池提供了4种策略:
    • AbortPolicy:直接抛出异常,这是默认策略;
    • CallerRunsPolicy:用调用者所在的线程来执行任务;
    • DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    • DiscardPolicy:直接丢弃任务。

4.3 execute方法

ThreadPoolExecutor.execute(task)实现了Executor.execute(task),用来提交任务,不能获取返回值,代码如下:

 1     public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25     /*
26      * workerCountOf方法取出低29位的值,表示当前活动的线程数;
27      * 如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;
28      * 并把任务添加到该线程中。
29      */
30     
31         if (workerCountOf(c) < corePoolSize) {
32         /*
33          * addWorker中的第二个参数表示限制添加线程的数量是根据corePoolSize来判断还是maximumPoolSize来判断;
34          * 如果为true,根据corePoolSize来判断;
35          * 如果为false,则根据maximumPoolSize来判断
36          */
37             if (addWorker(command, true))
38                 return;
39         /*
40          * 如果添加失败,则重新获取ctl值
41          */
42             c = ctl.get();
43         }
44     /*
45      * 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
46      */
47         if (isRunning(c) && workQueue.offer(command)) {
48     // 重新获取ctl值
49             int recheck = ctl.get();
50         // 再次判断线程池的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,
51         // 这时需要移除该command
52         // 执行过后通过handler使用拒绝策略对该任务进行处理,整个方法返回
53             if (! isRunning(recheck) && remove(command))
54                 reject(command);
55         /*
56          * 获取线程池中的有效线程数,如果数量是0,则执行addWorker方法
57          * 这里传入的参数表示:
58          * 1. 第一个参数为null,表示在线程池中创建一个线程,但不去启动;
59          * 2. 第二个参数为false,将线程池的有限线程数量的上限设置为maximumPoolSize,添加线程时根据maximumPoolSize来判断;
60          * 如果判断workerCount大于0,则直接返回,在workQueue中新增的command会在将来的某个时刻被执行。
61          */
62             else if (workerCountOf(recheck) == 0)
63                 addWorker(null, false);
64         }
65     /*
66      * 如果执行到这里,有两种情况:
67      * 1. 线程池已经不是RUNNING状态;
68      * 2. 线程池是RUNNING状态,但workerCount >= corePoolSize并且workQueue已满。
69      * 这时,再次调用addWorker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumPoolSize;
70      * 如果失败则拒绝该任务
71      */
72         else if (!addWorker(command, false))
73             reject(command);
74     }

如果线程池状态一直是RUNNING,则执行过程如下:

  1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任务;
  2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
  3. 如果workerCount >= corePoolSize && workerCount < maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新提交的任务;
  4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

 

4.4 addWorker方法

从executor的方法实现可以看出,addWorker主要负责创建新的线程并执行任务。线程池创建新线程执行任务时,需要获取全局锁:

 1     private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry:
 3         for (;;) {
 4             int c = ctl.get();
 5             // 获取运行状态
 6             int rs = runStateOf(c);
 7         /*
 8          * 这个if判断
 9          * 如果rs >= SHUTDOWN,则表示此时不再接收新任务;
10          * 接着判断以下3个条件,只要有1个不满足,则返回false:
11          * 1. rs == SHUTDOWN,这时表示关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务
12          * 2. firsTask为空
13          * 3. 阻塞队列不为空
14          *
15          * 首先考虑rs == SHUTDOWN的情况
16          * 这种情况下不会接受新提交的任务,所以在firstTask不为空的时候会返回false;
17          * 然后,如果firstTask为空,并且workQueue也为空,则返回false,
18          * 因为队列中已经没有任务了,不需要再添加线程了
19          */
20             // Check if queue empty only if necessary.
21             if (rs >= SHUTDOWN &&
22                 ! (rs == SHUTDOWN &&
23                    firstTask == null &&
24                    ! workQueue.isEmpty()))
25                 return false;
26 
27             for (;;) {
28                 // 获取线程数
29                 int wc = workerCountOf(c);
30               // 如果wc超过CAPACITY,也就是ctl的低29位的最大值(二进制是29个1),返回false;
31               // 这里的core是addWorker方法的第二个参数,如果为true表示根据corePoolSize来比较,
32               // 如果为false则根据maximumPoolSize来比较。
33                 if (wc >= CAPACITY ||
34                     wc >= (core ? corePoolSize : maximumPoolSize))
35                     return false;
36                // 尝试增加workerCount,如果成功,则跳出第一个for循环
37                 if (compareAndIncrementWorkerCount(c))
38                     break retry;
39               // 如果增加workerCount失败,则重新获取ctl的值
40                 c = ctl.get();  // Re-read ctl
41               // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行
42                 if (runStateOf(c) != rs)
43                     continue retry;
44                 // else CAS failed due to workerCount change; retry inner loop
45             }
46         }
47 
48         boolean workerStarted = false;
49         boolean workerAdded = false;
50         Worker w = null;
51         try {
52             // 根据firstTask来创建Worker对象
53             w = new Worker(firstTask);
54             // 每一个Worker对象都会创建一个线程
55             final Thread t = w.thread;
56             if (t != null) {
57                 final ReentrantLock mainLock = this.mainLock;
58                 mainLock.lock();
59                 try {
60                     // Recheck while holding lock.
61                     // Back out on ThreadFactory failure or if
62                     // shut down before lock acquired.
63                     int rs = runStateOf(ctl.get());
64                   // rs < SHUTDOWN表示是RUNNING状态;
65                   // 如果rs是RUNNING状态或者rs是SHUTDOWN状态并且firstTask为null,向线程池中添加线程。
66                   // 因为在SHUTDOWN时不会在添加新的任务,但还是会执行workQueue中的任务
67                     if (rs < SHUTDOWN ||
68                         (rs == SHUTDOWN && firstTask == null)) {
69                         if (t.isAlive()) // precheck that t is startable
70                             throw new IllegalThreadStateException();
71                 // workers是一个HashSet
72                         workers.add(w);
73                         int s = workers.size();
74                 // largestPoolSize记录着线程池中出现过的最大线程数量
75                         if (s > largestPoolSize)
76                             largestPoolSize = s;
77                         workerAdded = true;
78                     }
79                 } finally {
80                     mainLock.unlock();
81                 }
82                 if (workerAdded) {
83             // 启动线程,执行任务(Worker.thread(firstTask).start());
84             //启动时会调用Worker类中的run方法,Worker本身实现了Runnable接口,所以一个Worker类型的对象也是一个线程。
85                     t.start();
86                     workerStarted = true;
87                 }
88             }
89         } finally {
90             if (! workerStarted)
91                 addWorkerFailed(w);
92         }
93         return workerStarted;
94     }

4.5 Worker类

线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。Worker类设计如下:

  1. 继承了AQS类,用于判断线程是否空闲以及是否可以被中断,可以方便的实现工作线程的中止操作;
  2. 实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
  3. 当前提交的任务firstTask作为参数传入Worker的构造方法;
 1     private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable
 4     {
 5         /**
 6          * This class will never be serialized, but we provide a
 7          * serialVersionUID to suppress a javac warning.
 8          */
 9         private static final long serialVersionUID = 6138294804551838833L;
10 
11         /** Thread this worker is running in.  Null if factory fails. */
12         final Thread thread;
13         /** Initial task to run.  Possibly null. */
14         Runnable firstTask;
15         /** Per-thread task counter */
16         volatile long completedTasks;
17 
18         /**
19          * Creates with given first task and thread from ThreadFactory.
20          * @param firstTask the first task (null if none)
21          */
22         Worker(Runnable firstTask) {
23             setState(-1); // inhibit interrupts until runWorker
24             this.firstTask = firstTask;
25             this.thread = getThreadFactory().newThread(this);
26         }
27 
28         /** Delegates main run loop to outer runWorker  */
29         public void run() {
30             runWorker(this);
31         }
32 
33         // Lock methods
34         //
35         // The value 0 represents the unlocked state.
36         // The value 1 represents the locked state.
37 
38         protected boolean isHeldExclusively() {
39             return getState() != 0;
40         }
41 
42         protected boolean tryAcquire(int unused) {
43             if (compareAndSetState(0, 1)) {
44                 setExclusiveOwnerThread(Thread.currentThread());
45                 return true;
46             }
47             return false;
48         }
49 
50         protected boolean tryRelease(int unused) {
51             setExclusiveOwnerThread(null);
52             setState(0);
53             return true;
54         }
55 
56         public void lock()        { acquire(1); }
57         public boolean tryLock()  { return tryAcquire(1); }
58         public void unlock()      { release(1); }
59         public boolean isLocked() { return isHeldExclusively(); }
60 
61         void interruptIfStarted() {
62             Thread t;
63             if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
64                 try {
65                     t.interrupt();
66                 } catch (SecurityException ignore) {
67                 }
68             }
69         }
70     }

4.6 runWorker方法

 Worker类中的run方法调用了runWorker方法来执行任务,执行过程如下:

  1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
  2. Worker执行firstTask或从workQueue中获取任务:
    1. 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
    2. 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
    3. 执行beforeExecute
    4. 执行任务的run方法
    5. 执行afterExecute方法
    6. 解锁操作
 1     final void runWorker(Worker w) {
 2         Thread wt = Thread.currentThread();
 3         // 获取第一个任务
 4         Runnable task = w.firstTask;
 5         w.firstTask = null;
 6         // 允许中断
 7         w.unlock(); // allow interrupts
 8         boolean completedAbruptly = true;
 9         try {
10         // 如果task为空,则通过getTask来获取任务
11             while (task != null || (task = getTask()) != null) {
12                 w.lock();
13                 // If pool is stopping, ensure thread is interrupted;
14                 // if not, ensure thread is not interrupted.  This
15                 // requires a recheck in second case to deal with
16                 // shutdownNow race while clearing interrupt
17                 if ((runStateAtLeast(ctl.get(), STOP) ||
18                      (Thread.interrupted() &&
19                       runStateAtLeast(ctl.get(), STOP))) &&
20                     !wt.isInterrupted())
21                     wt.interrupt();
22                 try {
23                     beforeExecute(wt, task);
24                     Throwable thrown = null;
25                     try {
26                         task.run();
27                     } catch (RuntimeException x) {
28                         thrown = x; throw x;
29                     } catch (Error x) {
30                         thrown = x; throw x;
31                     } catch (Throwable x) {
32                         thrown = x; throw new Error(x);
33                     } finally {
34                         afterExecute(task, thrown);
35                     }
36                 } finally {
37                     task = null;
38                     w.completedTasks++;
39                     w.unlock();
40                 }
41             }
42             completedAbruptly = false;
43         } finally {
44             processWorkerExit(w, completedAbruptly);
45         }
46     }

4.7 getTask方法

 getTask方法用来从阻塞队列中取等待的任务

 1     private Runnable getTask() {
 2         boolean timedOut = false; // Did the last poll() time out?
 3 
 4         for (;;) {
 5             int c = ctl.get();
 6             int rs = runStateOf(c);
 7 
 8             // Check if queue empty only if necessary.
 9             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
10                 decrementWorkerCount();
11                 return null;
12             }
13 
14             int wc = workerCountOf(c);
15 
16             // Are workers subject to culling?
17         // timed变量用于判断是否需要进行超时控制。
18         // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
19         // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
20         // 对于超过核心线程数量的这些线程,需要进行超时控制    
21             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
22 
23             if ((wc > maximumPoolSize || (timed && timedOut))
24                 && (wc > 1 || workQueue.isEmpty())) {
25                 if (compareAndDecrementWorkerCount(c))
26                     return null;
27                 continue;
28             }
29 
30             try {
31             /*
32              * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null;
33              * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
34              *
35              */
36                 Runnable r = timed ?
37                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
38                     workQueue.take();
39                 if (r != null)
40                     return r;
41                 timedOut = true;
42             } catch (InterruptedException retry) {
43                 timedOut = false;
44             }
45         }
46     }

5 任务的提交

  • submit任务,等待线程池execute
  • 执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中, 并阻塞等待运行结果;
  • FutureTask任务执行完成后,通过UNSAFE设置waiters相应的waitNode为null,并通过LockSupport类unpark方法唤醒主线程。
 1 public class Test{
 2 
 3     public static void main(String[] args) {
 4 
 5         ExecutorService es = Executors.newCachedThreadPool();
 6         Future<String> future = es.submit(new Callable<String>() {
 7             @Override
 8             public String call() throws Exception {
 9                 try {
10                     TimeUnit.SECONDS.sleep(2);
11                 } catch (InterruptedException e) {
12                     e.printStackTrace();
13                 }
14                 return "future result";
15             }
16         });
17         try {
18             String result = future.get();
19             System.out.println(result);
20         } catch (Exception e) {
21             e.printStackTrace();
22         }
23     }
24 }

在实际业务场景中,Future和Callable基本是成对出现的,Callable负责产生结果,Future负责获取结果。

  • Callable接口类似于Runnable,只是Runnable没有返回值。
  • Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
  • Future.get方法会导致主线程阻塞,直到Callable任务执行完成;

 5.1 submit方法

AbstractExecutorService.submit()实现了ExecutorService.submit(),可以获得执行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子类,所以submit方法也是ThreadPoolExecutor的方法。

 1     public Future<?> submit(Runnable task) {
 2         if (task == null) throw new NullPointerException();
 3         RunnableFuture<Void> ftask = newTaskFor(task, null);
 4         execute(ftask);
 5         return ftask;
 6     }
 7     public <T> Future<T> submit(Runnable task, T result) {
 8         if (task == null) throw new NullPointerException();
 9         RunnableFuture<T> ftask = newTaskFor(task, result);
10         execute(ftask);
11         return ftask;
12     }
13     public <T> Future<T> submit(Callable<T> task) {
14         if (task == null) throw new NullPointerException();
15         RunnableFuture<T> ftask = newTaskFor(task);
16         execute(ftask);
17         return ftask;
18     }

通过submit方法提交的Callable或者Runnable任务会被封装成了一个FutureTask对象。通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法。

5.2 FutureTask对象

类图

内部状态

    /**
     *...
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

内部状态的修改通过sun.misc.Unsafe修改。

get方法

1     public V get() throws InterruptedException, ExecutionException {
2         int s = state;
3         if (s <= COMPLETING)
4             s = awaitDone(false, 0L);
5         return report(s);
6     }

内部通过awaitDone方法对主线程进行阻塞,具体实现如下:

 1     /**
 2      * Awaits completion or aborts on interrupt or timeout.
 3      *
 4      * @param timed true if use timed waits
 5      * @param nanos time to wait, if timed
 6      * @return state upon completion
 7      */
 8     private int awaitDone(boolean timed, long nanos)
 9         throws InterruptedException {
10         final long deadline = timed ? System.nanoTime() + nanos : 0L;
11         WaitNode q = null;
12         boolean queued = false;
13         for (;;) {
14             if (Thread.interrupted()) {
15                 removeWaiter(q);
16                 throw new InterruptedException();
17             }
18 
19             int s = state;
20             if (s > COMPLETING) {
21                 if (q != null)
22                     q.thread = null;
23                 return s;
24             }
25             else if (s == COMPLETING) // cannot time out yet
26                 Thread.yield();
27             else if (q == null)
28                 q = new WaitNode();
29             else if (!queued)
30                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
31                                                      q.next = waiters, q);
32             else if (timed) {
33                 nanos = deadline - System.nanoTime();
34                 if (nanos <= 0L) {
35                     removeWaiter(q);
36                     return state;
37                 }
38                 LockSupport.parkNanos(this, nanos);
39             }
40             else
41                 LockSupport.park(this);
42         }
43     }
  1. 如果主线程被中断,则抛出中断异常;
  2. 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回;
  3. 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
  4. 通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;
  5. 最终通过LockSupport的park或parkNanos挂起线程。

run方法

 1     public void run() {
 2         if (state != NEW ||
 3             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                          null, Thread.currentThread()))
 5             return;
 6         try {
 7             Callable<V> c = callable;
 8             if (c != null && state == NEW) {
 9                 V result;
10                 boolean ran;
11                 try {
12                     result = c.call();
13                     ran = true;
14                 } catch (Throwable ex) {
15                     result = null;
16                     ran = false;
17                     setException(ex);
18                 }
19                 if (ran)
20                     set(result);
21             }
22         } finally {
23             // runner must be non-null until state is settled to
24             // prevent concurrent calls to run()
25             runner = null;
26             // state must be re-read after nulling runner to prevent
27             // leaked interrupts
28             int s = state;
29             if (s >= INTERRUPTING)
30                 handlePossibleCancellationInterrupt(s);
31         }
32     }

FutureTask.run方法是在线程池中被执行的,而非主线程

  1. 通过执行Callable任务的call方法;
  2. 如果call执行成功,则通过set方法保存结果;
  3. 如果call执行有异常,则通过setException保存异常。

6 Executors类

 Exectors工厂类提供了线程池的初始化接口,主要有如下几种:

newFixedThreadPool

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }

创建一个固定大小、任务队列容量无界(Integer.MAX_VALUE)的线程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞队列为LinkedBlockingQuene。

注意点:

  1. 线程池的线程数量达corePoolSize后,即使线程池没有可执行任务时,也不会释放线程;
  2. 线程池里的线程数量不超过corePoolSize,这导致了maximumPoolSizekeepAliveTime将会是个无用参数 ;
  3. 由于使用了无界队列, 所以FixedThreadPool永远不会拒绝, 即饱和策略失效。

newSingleThreadExecutor

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6     }

只有一个线程来执行无界任务队列的单一线程池。如果该线程异常结束,会重新创建一个新的线程继续执行任务,唯一的线程可以保证所提交任务的顺序执行。由于使用了无界队列, 所以SingleThreadPool永远不会拒绝,即饱和策略失效。与newFixedThreadPool(1)的区别在于单一线程池的大小不能再改变。

newCachedThreadPool

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

创建一个大小无界的缓冲线程池。任务队列是一个同步队列。缓冲线程池适用于执行耗时较小的异步任务。池的核心线程数=0 最大线程数=Integer.MAX_VLUE。与前两种稍微不同的是:

  1. 任务加入到池中,如果池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。
  2. 池中的线程空闲超过60秒,将被销毁释放。
  3. 池中的线程数随任务的多少变化。

newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

能定时执行任务的线程,池的核心线程数由参数指定。和前面3个线程池基于ThreadPoolExecutor类实现不同的是,它基于ScheduledThreadPoolExecutor实现。

7 线程池的监控

 可以使用ThreadPoolExecutor以下方法:

  • getTaskCount:线程池已经执行的和未执行的任务总数;
  • getCompletedTaskCount:线程池已完成的任务数量,该值小于等于taskCount;
  • getLargestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过,也就是达到了maximumPoolSize;
  • getPoolSize:线程池当前的线程数量;
  • getActiveCount:当前线程池中正在执行任务的线程数量。
原文地址:https://www.cnblogs.com/aaron-shu/p/6436036.html