并发编程学习笔记(三十一、线程池源码一,工作线程Worker)

目录:

  • ThreadPoolExecutor架构及类结构
  • 工作线程Worker源码解析
  • 总结

ThreadPoolExecutor架构及类结构

从上图来说ThreadPoolExecutor的锁同步是通过AQS来实现的

注意:想要读懂ThreadPoolExecutor源码你先了解一下知识点:

  • AQS源码
  • CAS原理
  • Unsafe类
  • 位运算
  • 设计模式

工作线程Worker源码解析

从整体架构上来说ThreadPoolExecutor的Worker是继承与AQS的,我们来看看Worker类的定义。

1 private final class Worker
2         extends AbstractQueuedSynchronizer
3         implements Runnable {}

从Worker的定义来看我们不难猜到Worker需要有一些锁同步功能,所以继承了AQS;然后它本身想当成线程来使用,所以事先了Runnable接口。

接下来我们来看下Worker内部的实现:

 1 private final class Worker
 2     extends AbstractQueuedSynchronizer
 3     implements Runnable
 4 {
 5     /**
 6      * This class will never be serialized, but we provide a
 7      * serialVersionUID to suppress a javac warning.
 8      */
 9     private static final long serialVersionUID = 6138294804551838833L;
10 
11     /** 当前Worker对象正在运行的线程。如果工厂构造失败值会null */
12     final Thread thread;
13     /** 运行的初始任务,可能为null */
14     Runnable firstTask;
15     /** 每个线程完成的任务计数器 */
16     volatile long completedTasks;
17 
18     /**
19      * Creates with given first task and thread from ThreadFactory.
20      * @param firstTask the first task (null if none)
21      */
22     Worker(Runnable firstTask) {
23         // 设置锁状态为-1(runWorker方法执行前禁止中断)
24         setState(-1); // inhibit interrupts until runWorker
25         this.firstTask = firstTask;
26         // 通过线程工厂创建一个线程
27         this.thread = getThreadFactory().newThread(this);
28     }
29 
30     /** Delegates main run loop to outer runWorker  */
31     public void run() {
32         runWorker(this);
33     }
34 
35     // 一下是锁相关的方法
36     //
37     // state=0,解锁状态
38     // state=1,加锁状态
39 
40     /**
41      * 判断是否处于独占锁状态
42      */
43     protected boolean isHeldExclusively() {
44         return getState() != 0;
45     }
46 
47     /**
48      * 尝试加锁;
49      * true=加锁成功,fasle=加锁失败
50      */
51     protected boolean tryAcquire(int unused) {
52         // CAS从0变成1,也印证了上面所说的0解锁,1加锁
53         if (compareAndSetState(0, 1)) {
54             // 设置持有者为当前线程
55             setExclusiveOwnerThread(Thread.currentThread());
56             return true;
57         }
58         return false;
59     }
60 
61     /**
62      * 尝试解锁;
63      * true=解锁成功,fasle=解锁失败
64      */
65     protected boolean tryRelease(int unused) {
66         // 解锁后清空持有者
67         setExclusiveOwnerThread(null);
68         // 解锁后state=0
69         setState(0);
70         return true;
71     }
72 
73     /** 加锁,调用AQS.acquire **/
74     public void lock()        { acquire(1); }
75     /** 尝试加锁,调用AQS.tryAcquire **/
76     public boolean tryLock()  { return tryAcquire(1); }
77     /** 解锁,调用AQS.release **/
78     public void unlock()      { release(1); }
79     /** 判断当前锁是否处于加锁状态 **/
80     public boolean isLocked() { return isHeldExclusively(); }
81 
82     /**
83      * 中断,需要线程开始后才能中断
84      */
85     void interruptIfStarted() {
86         Thread t;
87         // state >=0:需要获取到锁(state = 1)
88         // !t.isInterrupted():线程未中断才能中断
89         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
90             try {
91                 // 尝试中断线程
92                 t.interrupt();
93             } catch (SecurityException ignore) {
94             }
95         }
96     }
97 }

总结

1、线程池的Worker类实现了一个不可重进入互斥锁,其实现主要还是依靠AQS类。

2、Worker类继承自AQS,实现了加锁、解锁等方法。

3、Worker类实现了Runmable接口,重写了run方法,其主要执行逻辑依托于外部类的runWloke方法

  • 在runWorcer方法中将会获取用户提交的任务,将其放在线程池中异步执行,并维护线程池中线程的创建和回收等动作。
原文地址:https://www.cnblogs.com/bzfsdr/p/13338792.html