AQS源码解析

AbstractQueuedSynchronizer简称(AQS)是同步组件的基础框架,可以理解为是一个模版类,AQS以模版方法定义获取和释放同步状态的语义,并且通过队列管理获取同步状态失败的线程。本篇通过解析ReentrantLock重入锁解析AbstractQueuedSynchronizer(AQS)。

一、AQS主要内部结构

1. 同步状态变量

/**
 * The synchronization state.
 */
private volatile int state;

volatile类型对int值,类似于计数器,在不同的同步组件中有不同含义,在ReentrantLock中,当state为0时说明锁不被任何线程持有,大于0时表示锁被同一个线程重入了state次,因为可能会出现并发的情况,所以使用volatile修饰。

2. 同步等待队列

AQS中的等待队列是一个带有头尾节点的双向链表。

    /**
     * Head of the wait queue, lazily initialized.  Except for
     * initialization, it is modified only via method setHead.  Note:
     * If head exists, its waitStatus is guaranteed not to be
     * CANCELLED.
     */
    private transient volatile Node head;//指向队首节点的指针

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;//指向队尾节点的指针
static final class Node {

        //等待状态
        volatile int waitStatus;
        
        //前一个节点
        volatile Node prev;
        
        //后一个节点
        volatile Node next;
        
        /**
        * 当前节点代表的线程
        * 0:初始化状态;
        * -1(SIGNAL):当前结点表示的线程在释放锁后需要唤醒后续节点的线程;
        * 1(CANCELLED):在同步队列中等待的线程等待超时或者被中断,取消继续等待;
        */
        volatile Thread thread;
    }

同步等待队列特性:

  • 先进先出队列,获取锁失败的队列将构造节点并加入队列的尾部
  • 队列的首节点可以表示正在获取锁的线程
  • 当前线程释放锁之后将尝试唤醒后续处于阻塞状态的线程

3. 持有同步状态的线程的标示

private transient Thread exclusiveOwnerThread;

继承自父类AOS的属性,用来标示锁被哪个线程持有。

二、ReentrantLock重入锁

重入的意思是线程1执行ReentrantLock.lock()方法获取到锁之后再次执行lock()方法获取锁,线程不会阻塞,而是会增加重入次数。

//demo
public class Main {

    ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        new Main().m1();
    }

    void m1(){
        lock.lock();
        System.out.println("m1 running!");
        m2();
        lock.unlock();
    }

    void m2(){
        lock.lock();
        System.out.println("m2 running!");
        lock.unlock();
    }
}

执行结果

m1 running!

m2 running!

可以看到同一个线程两次获取同一个锁时没有发生阻塞。

ReentrantLock源码分析

1. ReentrantLock()构造函数

我们一般通过new ReentrantLock()构造一个重入锁,ReentrantLock有一下两种构造方法:

//无参构造函数,默认为非公平锁
public ReentrantLock() {
        sync = new NonfairSync();
    }
//bool类型入参,定义是否为公平锁
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

可以看到关键的两个类是Sync和NonfairSync,并且NonfairSync是Sync的子类,接下来我们深入了解。

1.1 Sync
public class ReentrantLock implements Lock, java.io.Serializable {
    
    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        
        abstract void lock();

        final boolean nonfairTryAcquire(int acquires) {
            ...
        }

        protected final boolean tryRelease(int releases) {
            ...
        }

    }

可以看到sync是ReentrantLock的一个属性,并且继承了AQS。

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
        ...
    }

AQS继承了AbstractOwnableSynchronizer(AOS)。

1.2 Sync子类(FairSync/NonfairSync)
public class ReentrantLock implements Lock, java.io.Serializable {

    ...
    
    static final class NonfairSync extends Sync {

        final void lock() {
            ...
        }

        protected final boolean tryAcquire(int acquires) {
            ...
        }
    }

NonfairSync是ReentrantLock的内部类,并且继承了Sync接口。
同样继承了Sync接口的还有FairSync:

public class ReentrantLock implements Lock, java.io.Serializable {

    static final class FairSync extends Sync {
        
        final void lock() {
            ...
        }

        protected final boolean tryAcquire(int acquires) {
            ...
        }
    }
}

分析到这里我们可以总结出以下几点:

  • FairSync、NonfairSync是Sync的子类
  • Sync继承AQS,AQS继承AOS
  • Sync、FairSync、NonfairSync是ReentrantLock的内部类

得到以下UML图

2. lock()方法

通过执行lock()方法加锁:

public void lock() {
        sync.lock();
    }
2.1 非公平锁加锁

以下是NonfairSync的lock()方法源码:

final void lock() {
    if (compareAndSetState(0, 1))
        //获取锁成功
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //获取锁失败
        acquire(1);
}

可以看到首先调用了父类AQS的compareAndSetState方法,也就是我们平时说的CAS,源码如下:

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

其中,unsafe是sun.misc包下的原子操作类,stateOffset是state在内存中的偏移量,可以间接拿到state的值,expect是预期的state的值,这个方法的含义是:如果state=expect,将state的值更新为update,并且返回true,否则返回false。

unsafe.compareAndSwapInt保证了原子性,就算有多个线程同时执行这个方法,也只有一个线程能成功,也就是将状态从0置为1,即成功获取锁。

状态更新成功后,执行AOS下的setExclusiveOwnerThread方法,具体实现如下:

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

可以看出获取锁成功后,将当前线程标记为持有锁的线程。

如果compareAndSetState执行结果为false,也就是获取锁失败,接下来执行acquire(1),acquire源码如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

在该方法中,主要的逻辑都在if的判断条件中,接下来,我们要对tryAcquire、addWaiter、acquireQueued这三个方法进行深入解析。

2.1.1 尝试获取锁 tryAcquire()

tryAcquire是AQS中定义的钩子方法,如下:

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

方法默认会抛出异常,强制同步组件通过扩展AQS实现同步功能时必须重写该方法,ReentrantLock在非公平模式下对该方法的实现如下:

    static final class NonfairSync extends Sync {

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

调用了nonfairTryAcquire()方法,具体实现以及说明如下:

    final boolean nonfairTryAcquire(int acquires) {
    
        //获取当前线程的实例
        final Thread current = Thread.currentThread();
        //获取当前锁被重入的次数
        int c = getState();
        if (c == 0) {
            //如果state为0,说明当前锁未被任何线程持有,尝试以CAS方式获取锁
            if (compareAndSetState(0, acquires)) {
                //获取锁成功,将当前线程标记为持有锁的线程
                setExclusiveOwnerThread(current);
                //获取锁成功,并且非重入
                return true;
            }
        }
        //如果当前线程是持有锁的线程,说明锁被重入
        else if (current == getExclusiveOwnerThread()) {
            //计算锁被重入的次数,因为当前线程已经获取了锁,所以这个累加操作不用同步
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //更新state
            setState(nextc);
            //获取锁成功,重入
            return true;
        }
        //当前锁被其他线程持有,获取锁失败
        return false;
    }

我们回退到上一步的acquire方法:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

如果tryAcquire返回true,也就是获取锁成功,方法将直接返回,如果失败将执行后半部分的判断逻辑,我们先看addWaiter(Node.EXCLUSIVE)。

2.1.2 获取锁失败的线程加入同步队列 addWaiter()

具体实现:

    private Node addWaiter(Node mode) {
    
        //创造一个新节点,将当前线程实例封装在内部,此处mode为null
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

因为中间部分代码冗余,直接进入enq()方法,具体实现:

    private Node enq(final Node node) {
    
        //for循环确保所有获取锁失败的线程都能够加入同步队列
        for (;;) {
            //t指向队尾节点,如果队列为空,那么t=tail=NULL
            Node t = tail;
            if (t == null) { // Must initialize //队列为空
                //CAS方式设置新节点为队首节点,当队首节点为NULL时设置成功
                if (compareAndSetHead(new Node()))
                    //当前队列只有一个节点,所以队尾节点=队首节点
                    tail = head;
            } else {    //队列不为空
                //将当前节点放置队尾
                node.prev = t;
                //CAS方式设置队尾指针指向当前节点,当t(获取到的队尾指针)=tail(实际的队尾指针)时,设置成功
                if (compareAndSetTail(t, node)) {
                    //尾节点的next指针指向当前节点
                    t.next = node;
                    return t;
                }
            }
        }
    }

外层的for循环确保所有获取锁失败的线程都能够加入同步队列,当同步队列为空时,先插入一个空节点作为队首节点,这是由于当前线程所在的节点不能插入空队列,因为阻塞的线程是由前驱节点唤醒的。整个过程如下图所示:

接下来我们考虑一下线程安全的问题:

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // step:1
                if (compareAndSetHead(new Node()))  // step:2
                    tail = head;    // step:3
            } else {
                node.prev = t;  // step:4
                if (compareAndSetTail(t, node)) {   // step:5
                    t.next = node;  // step:6
                    return t;
                }
            }
        }
    }
  • 当同步队列为空时,step:1通过,假设step:2执行成功,插入了一个节点为队首节点,也就是head不为null,如果在step:3执行前有其他线程进入该方法,compareAndSetHead将会一直返回false,所以step:3不做同步处理也不会有线程安全的问题。
  • 当同步队列不为null时,假设step:5执行成功,则可以确认step:4的操作是正确的,也就意味着我们获取到的队尾指针就是实际的队尾指针,此时 step:5已经将队尾指针指向当前节点,其他线程执行step:5将返回false,所以step:6何时执行也不会有线程安全的问题。
2.1.3 线程加入同步队列后 acquireQueued()

源码如下:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            //死循环,确保线程只有获取锁才能跳出循环
            for (;;) {
                //获取当前节点的前一个节点
                final Node p = node.predecessor();
                //如果前一个节点是头节点,并且当前线程成功获取锁
                //说明:前一个节点是头节点,头节点表示当前正在占有锁的线程,占有锁的线程释放锁之后,会通知后续的阻塞节点,阻塞节点被唤醒后尝试获取锁
                if (p == head && tryAcquire(arg)) {
                    //将当前节点设置为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&    //判断是否要阻塞当前线程
                    parkAndCheckInterrupt())    //阻塞当前线程
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

获取锁失败后,执行shouldParkAfterFailedAcquire()方法判断是否要阻塞当前线程,源码如下:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

        int ws = pred.waitStatus;
        
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

可以看到针对前驱节点的不同状态会进行不同的处理

  • pred状态为SIGNAL时,返回true,表示要阻塞当前线程
  • pred状态为CANCELLED时,说明前驱节点取消了等待,这种情况下不应该获取锁,而是向前回溯到一个没有取消等待的节点,然后将自身连接在这个节点后面
  • pred的状态为初始化状态,则通过CAS将状态置为SIGNAL

接下来我们看一下实际阻塞线程的方法parkAndCheckInterrupt():

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

通过调用LockSupport.park()阻塞当前线程,此处不过多展开。

线程加入同步队列到成功获取锁的过程如下:

ReentrantLock非公平锁整体加锁流程如下:

2.2 非公平锁解锁

ReentrantLock解锁方法源码:

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        //释放锁,如果锁没有被重入,则可以被其他线程获取,返回true
        if (tryRelease(arg)) {
            Node h = head;
            //如果头节点不为空(队列不为空)并且头节点不是初始状态
            if (h != null && h.waitStatus != 0)
                //唤醒队列中被阻塞的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {
        //重新计算state的值
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        //如果state为0,说明释放锁的线程没有重入,释放锁后其他线程可以获取
        if (c == 0) {
            free = true;
            //清除持有锁的线程标示
            setExclusiveOwnerThread(null);
        }
        //重置state
        setState(c);
        return free;
    }

ReentrantLock整体解锁流程如下:

2.3 公平锁与非公平锁的区别

公平锁的lock方法源码:

    final void lock() {
        acquire(1);
    }

可以看出,公平锁的模式下,所有线程获取锁之前必须先进入同步队列,按照先进先出(FIFO)的顺序获取锁,对比非公平锁,少了非重入式获取锁的方法。

接下来看公平锁的获取锁方法tryAcquire():

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            //判断是否有优先级更高的节点
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

可以看到在CAS获取锁之前加了判断是否有优先级更高的节点hasQueuedPredecessors(),实现如下:

    public final boolean hasQueuedPredecessors() {

        Node t = tail; 
        Node h = head;
        Node s;
        //判断队列中是否有第二个节点,并且第二个节点的线程是不是当前线程
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

因为头节点代表当前占有锁的线程,释放锁之后唤醒下一个节点,所以第二个节点是优先级最高的节点。

2.4 总结

公平锁和非公平锁都是基于FIFO的同步队列,获得锁的顺序和加入队列的顺序一致,看起来是一种公平的模式,然而,非公平锁的非公平之处在于,线程并非加入同步队列时才有机会获取锁,回顾一下非公平锁的加锁流程,在进入同步队列前有两次加锁的机会:

  • 当前锁未被任何线程加锁,非重入式获取锁
  • 进入同步队列前,包含所有情况的获取锁方式

这两次获取锁都失败后,才会进入到阻塞队列,我们设想一种情况,正在占有锁的线程A释放锁,但是还没来得及唤醒后继线程B,此时线程C尝试获取锁,并且获取锁成功,此时线程B被唤醒,尝试获取锁但是获取失败,只能在同步队列里继续等待,在线程竞争激烈的情况下,等待队列中的线程可能迟迟获取不到锁,所以说这是非公平的。

那么既然不公平为什么大多数情况下我们都用非公平锁呢?因为非公平锁相较公平锁性能高,体现在两个方面:

  • 不必加入队列就可以获取锁,免去了出队入队、构造节点的操作,同时节省了唤醒线程的开销
  • 减少CAS竞争

参考博客:

  1. 从源码角度彻底理解ReentrantLock(重入锁)
  2. AQS源码分析(含图解详细流程)
原文地址:https://www.cnblogs.com/youtang/p/13551277.html