AbstractQueuedSynchronizer源码解析

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer同步器,是JDK同步工具的基础框架,实现核心为对state属性进行自旋、CAS原子更新,内部存在两个队列——同步队列、条件队列(·Condition·)。同步队列:所有尝试获取锁的线程会在该队列上进行排队,排队时线程通过LockSupport.park()方法阻塞,通过LockSupport.unpark()唤醒;条件队列:当线程需要的某个条件不满足时,通过调用LockSupport.park()方法阻塞进入条件队列,当条件满足时,通过调用LockSupport.unpark()方法唤醒线程,并从条件队列移除,进入到同步队列。

AQS提供了排他、共享两种模式,默认排他模式。在排他模式下,系统可以获取较高吞吐量,但是可能出现较早进来的线程一直无法获得锁,造成线程饥饿;共享模式下,系统可以保证不出现线程饥饿,但是吞吐量会下降。

队列节点内部类Node

static final class Node {
    // 共享模式
    static final Node SHARED = new Node();
    // 排他模式
    static final Node EXCLUSIVE = null;

    // 节点已被取消(因为超时或者打断)
    static final int CANCELLED =  1;
    // 等待被唤醒(当活动节点被释放或者被取消,需要唤醒它的后续节点)
    static final int SIGNAL    = -1;
    // 在等待条件出现(节点在条件队列上等待)
    static final int CONDITION = -2;
    // 下一个对	acquireShared 的调用应该无条件传播
    static final int PROPAGATE = -3;
	// 同步队列节点状态,可取值(CANCELLED/SIGNAL/CONDITION/PROPAGATE),初始值=0
    volatile int waitStatus;
	// 队列前驱
    volatile Node prev;
	// 队列后驱
    volatile Node next;
	// 当前节点线程
    volatile Thread thread;

    // 条件队列的下一个界定、或者共享模式下值为SHARED
    Node nextWaiter;
    
    /**
     * 创建同步节点
     * @param thread 当前线程
     * @param mode 共享模式 or 排他模式
     */
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    /**
     * 创建条件队列节点
     * @param thread 当前线程
     * @param waitStatus 等待状态
     */
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

关键属性

// 同步队列的头节点,通过setHead方法更新
private transient volatile Node head;
// 同步队列的尾节点,当节点入队时被更新
private transient volatile Node tail;
// 同步状态
private volatile int state;

// 自旋时间控制
static final long spinForTimeoutThreshold = 1000L;

// 获取Unsafe对象CAS操作关键属性,保证原子性
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

获取锁或释放锁

该类为同步中获取锁和释放锁提供了模板方法,子类只需要根据自己的需要重写部分方法,即可实现同步机制。

    /**
     * 排他模式下尝试获取锁。
     * 该方法一般被acquire方法调用,如果获取失败,返回false,当前线程将执行入队操作,如果当前线程不在同      * 步队列。进入同步队列后需要等待其他线程执行release操作后将其唤醒才能出队。
     *
     * @param arg 通常传递1,如果节点是刚从条件队列唤醒,那么传递阻塞之前的state值(如重入次数)
     * @return 获取成功,返回true,否则返回false
     */
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
	 * 排他模式下释放锁
     *
     * @param arg 通常传递1,如果节点即将进入条件队列,那么传递入队时的state(如重入次数)
     * @return 如果state已经被完全释放(state = 0),返回true,否则返回false
     */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式下尝试获取锁
     * 该方法一般被acquire方法调用,如果获取失败,返回false,当前线程将执行入队操作,如果当前线程不在同      * 步队列。进入同步队列后需要等待其他线程执行release操作后将其唤醒才能出队。
     * @param arg 通常传递1,如果节点是刚从条件队列唤醒,那么传递阻塞之前的state值(如重入次数)
     * @return 如果获取失败,返回负数,共享下返回0,成功返回整数
     */
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
     * 共享模式下释放锁
     *
     * @param arg 通常传递1,如果节点即将进入条件队列,那么传递入队时的state(如重入次数)
     * @return {@code true} if this release of shared mode may permit a
     *         waiting acquire (shared or exclusive) to succeed; and
     *         {@code false} otherwise
     */
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    /**
	 * 排他模式返回true,共享模式返回false
     */
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

获取锁

排他模式

/**
 * 如果获取失败,当前线程进入同步队列
 */ 
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 尝试获取
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取失败,入队
        selfInterrupt();
}

/**
 * 如果获取失败,当前线程进入同步队列,可中断
 */
public final void acquireInterruptibly(int arg)
    throws InterruptedException {
    // 中断检查
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg)) // 尝试获取
        doAcquireInterruptibly(arg); // 获取失败,入队
}

/**
 * 如果获取失败,当前线程进入同步队列,可中断,可超时
 */
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    // 中断检测
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) || // 尝试获取
        doAcquireNanos(arg, nanosTimeout); // 获取失败,调用该方法,入队
}

/**
 * 创建同步节点,并加入到同步队列
 * @param mode 共享模式 or 排他莫斯 
 */ 
private Node addWaiter(Node mode) {
    // 创建同步节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 队列已初始化
    if (pred != null) {
        node.prev = pred;
        // CAS原子修改同步队列尾节点为新建节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 队列未初始化、或者第一次入队失败入队新建节点会走到这里
    enq(node);
    return node;
}

/**
 * 自旋+CAS入队节点
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        // 如果队列未初始化,初始队列的头尾节点
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // CAS入队节点
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 排他模式下,入队节点成功后会阻塞当前线程,并从尾部寻找到头节点,并唤醒尝试获取锁
 *
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 从尾节点向头节点遍历
            final Node p = node.predecessor();
            
            // 第一次执行该方法,或者第二次循环,由于第一次执行去除中间已取消节点,或者中间有节点成功获             // 取锁,执行成功退出了队列,导致当前节点成为了第二个节点,那么这里if成立
            
            // 如果遍历到头节点,则让当前节点尝试获取锁
            if (p == head && tryAcquire(arg)) {
                // 获取成功,设置头节点为当前
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // shouldParkAfterFailedAcquire 第一次先更新前驱节点状态为 SIGNAL,第二次循环进入会执行			// parkAndCheckInterrupt阻塞当前线程,直到被唤醒执行循环,
            if (shouldParkAfterFailedAcquire(p, node) &&  // 前驱p状态为SIGNAL则返回true
                parkAndCheckInterrupt()) // 阻塞当前线程,如果线程已被中断,返回true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

/**
 * 是否阻塞当前活动线程,从当前节点向前找到第一个未取消到的节点,移除遍历过程中已被取消的节点,
 * 并更新第一个未取消节点的状态为 SIGNAL.
 * 当前驱节点状态为 SIGNAL 返回true,否则返回false
 */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前驱节点等待唤醒
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;
    // 前驱节点已经被取消,向前寻找一个未取消的节点
    if (ws > 0) {
        // 移除队列中已被取消的节点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } 
    // 到这里说明前驱节点的状态为0或者-3  waitStatus = 0 or waitStatus = PROPAGETE
    else {
		// 更新前驱节点状态为 等待唤醒
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

/**
 * 阻塞当前活动线程,返回当前线程是否已被打断
 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

/**
 * 可中断,
 */
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 可中断则检测到中断立即抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    // 超时时间已到
    if (nanosTimeout <= 0L)
        return false;
    // 最长等待到这个时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 当前节点是头节点的后驱节点
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            // 超时剩余多少
            nanosTimeout = deadline - System.nanoTime();
            // 超时时间已到
            if (nanosTimeout <= 0L)
                return false;
      
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold) // 如果超时时间大于最大自旋时间,那么阻塞当前线程,否则自旋
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                // 检测到中断,抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

共享模式

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

/**
 * 获取共享锁
 */ 
private void doAcquireShared(int arg) {
    // 创建同步节点,并加入同步节点
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 当前节点的前驱
            final Node p = node.predecessor();
            // 当前节点的前驱节点是头节点,则当前节点线程尝试获取
            if (p == head) {
        		// 尝试获取
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
          
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

释放锁

排他模式

public final boolean release(int arg) {
    // 尝试释放
    if (tryRelease(arg)) {
        // 从头节点开始唤起节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒排队的下一个节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

/**
 * 唤醒后驱节点
 *
 */
private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
    	// 当前节点的后驱已被取消,则从尾节点开始查找最早一个未取消的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
    	// 唤醒下一个后驱节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享模式

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}


private void doReleaseShared() {
    // 自旋,唤醒等待的第一个线程(其他线程将由第一个线程向后传递唤醒)
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                // 唤醒第一个等待线程
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

内部类条件队列ConditionObject

关键属性

// 条件队列头节点
private transient Node firstWaiter;
// 条件队列尾节点
private transient Node lastWaiter;

进入条件队列

await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 创建条件队列节点,移除队列中已取消节点,并入队
    Node node = addConditionWaiter();
    // 完全释放锁,调用该方法的线程必须是持锁线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 当前节点不在同步队列,阻塞当前线程
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    
    // 向上:线程完全释放锁(重入次数将会被保存,之后唤醒线程后,再获取等次数重入)并进入条件队列,排队阻塞,等待条件出现
    //////////////////////////////////////////////
    // 向下:条件出现,sigal方法中线程从条件队列移除,线程被唤醒,继续执行
    
    // 当节点收到signal被唤醒后,重新获取阻塞前的持锁次数,然后向下执行
    
    // 进入自旋获取锁 如果线程被打断,处理中断
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除条件队列中已被取消的线程
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 中断处理
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

/**
 * 创建条件队列等待节点
 */
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 从头节点开始清除已取消节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建条件队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    // 当前新增节点为尾节点
    lastWaiter = node;
    return node;
}

/**
 * 从头部开始移除已取消节点
 * 条件队列是单链表,所以从头部开始遍历移除
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        // 节点已取消
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        }
        else
            trail = t;
        t = next;
    }
}

/**
 * 完全释放锁
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        // 完全释放锁
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

await(long nanosTimeout) 可超时

public final long awaitNanos(long nanosTimeout)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    // 如果超时时间大于自旋时间,那么阻塞线程;否则,线程自旋
    while (!isOnSyncQueue(node)) {
        // 已超时,节点从条件队列进入同步队列
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        // 超时时间大于自旋时间,直接阻塞
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        // 剩余超时时间
        nanosTimeout = deadline - System.nanoTime();
    }
    // 自旋获取锁
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}

/**
 * 将节点从条件队列进入同步队列
 */
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

等待条件出现,唤醒条件队列中线程signal

/**
 * 条件出现,唤醒条件队列头节点
 * 如果当前线程不是持锁线程,抛出异常
 * 如果条件队列头节点不为null,唤醒头节点,为null则什么也不错
 */
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 唤醒条件队列头节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 第二个等待节点是null,则清空条件队列,lastWaiter、firstWaiter都为空
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

/**
 * 唤醒节点,节点由条件队列转入同步队列
 */
final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 唤醒
        LockSupport.unpark(node.thread);
    return true;
}
/**
 * 唤醒全部条件队列上的节点
 */
public final void signalAll() {
    // 持锁线程不是当前线程,立即抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    // 从头节点开始逐个遍历唤醒
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

原文地址:https://www.cnblogs.com/QullLee/p/12247724.html