AQS使用及原理

1、AQS是AbstractQueuedSynchronizer的简称。提供用于实现阻塞锁和同步器框架(信号量、事件等),依靠先入先出(FIFO)等待队列。AQS为一系列同步器依赖于一个单独的原子变量(state)的同步器提供了一个非常有用的基础。AQS对于state的操作都是基于CAS操作,保证了state的原子性和可见性。

state使用

AQS 提供了三种操作state的方法

  • getState()
  • setState(int)
  • compareAndSetState(int, int)

具体源码如下

protected final int getState() {
    return state;
}
    
protected final void setState(int newState) {
state = newState;
}
    
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
AQS资源

AQS定义了两种获取资源的方式,独占(exclusive)和共享(Shared)两种方式,使用AQS值需要子类实现了以下方法的一部分或全部,如果没有实现而调用会报UnsupportedOperationException异常

  • tryAcquire(int) 尝试获取独占资源
  • tryRelease(int) 尝试释放独占资源
  • tryAcquireShared(int) 尝试获取共享资源
  • tryReleaseShared(int) 尝试释放共享资源
  • isHeldExclusively() 该线程是否正在独占资源。只有用到condition才需要去实现它。
AQS的其他重要方法
  • acquire(int arg) 在独占模式中获得,忽略中断。
  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 如果没有拿到锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //提供中断当前线程的方法
        selfInterrupt();
}

/**将当前线程加入到等待队列的队尾,并返回代表当前线程的节点
 * Creates and enqueues node for current thread and given mode.
 */
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//EXCLUSIVE(独占)和SHARED(共享)
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {// 直接放到队尾。
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);// 按正常逻辑入队列
    return node;
}


/**将node加入队尾
 * Inserts node into queue, initializing if necessary. See picture above.
 * @param node the node to insert
 * @return node's predecessor
 */
private Node enq(final Node node) {
    for (;;) { // CAS + 循环 = 自旋
        Node t = tail;
        if (t == null) { // Must initialize 队列为空
            if (compareAndSetHead(new Node()))// 创建一个空的标志结点作为head结点
                tail = head; // tail 和 head都是同一个节点
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) { // 放入tail尾部
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * 线程阻塞等待获取锁
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false; // 当前线程释放中断的标志位
        for (;;) {// 循环抢锁
            final Node p = node.predecessor(); // 获取前一个节点
            if (p == head && tryAcquire(arg)) { // 如果前一个节点是head,就尝试抢锁,并且尝试抢锁成功
                setHead(node); // 更换head,即当前Node出队列
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&// 检查状态,是否需要挂起线程
                parkAndCheckInterrupt())   // 挂起
                interrupted = true;     // 如果出现中断,则修改标记
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • release(int) 释放独占模式。
  1. 一般来说,独占都是可以释放成功,但是得判断该线程释放完全释放
public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放
        Node h = head; // 从头开始找
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); // 唤醒下一个线程
        return true;
    }
    return false;
}

/** 唤醒等待者
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 正在释放锁的线程节点状态
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0); // 修改当前节点状态

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next; // 找下一个节点
    if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,继续寻找合适的下一个节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 如果找到了合适的节点,就唤醒它
        LockSupport.unpark(s.thread);
}
  • acquireShared(int) 在共享模式中获取,忽略中断。
  1. 只要没有独占资源,该方法都应该成功,需要注意唤醒线程时需要判断队列中的下个线程是否时共享资源,如果时,也要同时唤醒。
/** 线程获取共享资源的入口
 */
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) // 判断量够不够
        doAcquireShared(arg); // 没拿到资源,需要等待
}

/**等待..
 * Acquires in shared uninterruptible mode.
 * @param arg the acquire argument
 */
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED); // 以共享模式加入队列尾部
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) { // 自旋
            final Node p = node.predecessor(); // 前置节点
            if (p == head) { // 如果前置为head
                int r = tryAcquireShared(arg); // 尝试获取资源,返回资源剩余的数量
                if (r >= 0) { // 拿到资源
                    setHeadAndPropagate(node, r); // 修改head节点
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  • releaseShared(int)释放共享资源
  1. 释放共享资源都能成功
/** 线程释放共享资源
*/
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试
        doReleaseShared();//执行
        return true;
    }
    return false;
}

/** 共享模式下 - 唤醒当前head节点的后续节点
 */
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) { // 判定是否还有后续节点
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) { // 如果状态为SIGNAL,代表需要通知后续节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 修改状态为0,通知一次
                    continue;            // loop to recheck cases 修改失败,代表已经通知,继续处理
                unparkSuccessor(h); // 唤醒
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 通知过后,修改节点状态为PROPAGATE
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed 知道其他的节点,把这个head挤下来,它才跳出循环
            break;
    }
}

参考:https://www.jianshu.com/p/da9d051dcc3d

原文地址:https://www.cnblogs.com/yz-yang/p/11638970.html