AQS总结

前言

AQS(Abstract Queued Synchronizer)是JUC并发包中的核心基础组件,作者是大名鼎鼎的Doug Lea。通过AQS可以实现大部分的同步需求。

宏观架构

AQS包括一个state和一个FIFO的CLH队列,如下图所示:


CLH队列中的每个节点Node就可以对应与争用该资源的线程,Node的数据结构如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
   static final class  {

static final Node SHARED = new Node();
/** 独占模式 */
static final Node EXCLUSIVE = null;
/** 当前节点已取消 */
static final int CANCELLED = 1;
/** 当前线程处于同步状态,如果取消或释放,通知下一个等待节点 */
static final int SIGNAL = -1;
/** wait 在某个condition中 */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
/** 当前节点已取消 */
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
```


### 获取锁的过程
以使用默认构造函数的reentrantLock为例子:

```java
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();

lock()代码如下:

1
2
3
4
5
6
7
final void lock() {
if (compareAndSetState(0, 1))
//成功获得独占的state资源
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

如果当前的state值为0,当前线程获得lock,将state的值通过cas的方式设置为1。如果不是0,则添加到队列中。通过acquire方法去申请资源。

1
2
3
4
5
6
public final void acquire(int arg) {
//tryAcquire再次尝试获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前已经获取到锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

再次获取锁尝试失败后,调用addWaiter将线程封装成节点信息,加入到等待队列中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { 大专栏  AQS总结
pred.next = node;
return node;
}
}
enq(node);
return node;
}

addWaiter首先会通过cas的方式快速的去添加到队列的尾部,如果添加不成功,调enq(node)再次入队;enq(node)是一个死循环,不断的通过cas去添加到节点,直到成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//cas的方式
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

线程节点进入队列后,调用acquireQueued,acquireQueuedxiang

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果发现自己的前面节点是头节点,表明该节点是正拿到锁的线程,此时再次尝试获取锁资源,因为之前的线程有可能已经解锁了。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

最关键的应该是shouldParkAfterFailedAcquire方法,如果每个线程都在这么自旋的去拿锁,cpu肯定炸了。所以,当当前的前一个节点处于SIGNAL状态的时候,可以挂起当前线程。这个操作就好比在排队的时候和前一个人说:我出去买点吃的,你轮到的时候叫我一下。当前面的节点轮到的时候,会唤醒当前线程,然后又开始自旋,判断自己能否拿到同步状态,如果拿到,就获取到了锁,这就是一个完整的获得同步状态的过程。
至于如何挂起当前线程,使用的是LockSupport的park()挂起当前线程。park可以精确的进行挂起,精确到thread。

释放锁的过程

1
reentrantLock.lock(); //释放锁

释放的过程正好相反,通过release来释放锁。

1
2
3
4
5
6
7
8
9
10
11
public final boolean release(int arg) {
//如果成功的释放了资源
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒下一个节点对应的线程
unparkSuccessor(h);
return true;
}
return false;
}

tryRelease的内容主要是:获取state的值,减去要释放的值,如果state已经是0,把当前的线程设置为null。要注意的是,这里完全没有使用cas,因为当前线程还持有锁,绝对的线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

最为关键的 unparkSuccessor(h),这个时候头节点已经处于了获取同步的状态,通过unparkSuccessor(h)来唤醒头节点的后一个节点。从而后一个节点就可以自旋的去获取同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);


Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

总结

个人认为AQS在很多地方使用cas和自旋的方式,一定程度上提升吞吐率,之前看到过测试ReentrantLock的吞吐比synchronized要高很多,不对synchronized一直在优化,估计现在性能也差不多了,以后做个测试。本文只是总结了AQS的独占式的获取同步状态,还有共享式的获取同步状态,还支持很多的特性,将在后面进行总结。

原文地址:https://www.cnblogs.com/lijianming180/p/12326951.html