AQS同步队列和条件队列

static final class Node {
    //共享模式,资源可以同时去拿
    static final Node SHARED = new Node();
    //独占模式,只能有一个线程去拿
    static final Node EXCLUSIVE = null;

   //表示当前线程被中断了,在队列中没有任何意义,可以被剔除了
    static final int CANCELLED =  1;
    /**
     * 后继节点的线程处于等待状态,而当前节点如果释放了同步状态或者被取消,
     * 将会通知后继节点,使后继节点得以运行
     */
    static final int SIGNAL    = -1;

    /**
     * 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,
     * 该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中
     */
    static final int CONDITION = -2;
    /**
     * 表示下一次共享方式同步状态获取将会被无条件的传播下去
     */
    static final int PROPAGATE = -3;

    /**
     * 标记当前节点的信号量状态(1,0,-1,-2,-3)5种状态
     * 使用CAS更改状态,volatile保证线程可见性,并发场景下,
     * 即被一个线程修改后,状态会立马让其他线程可见
     */
    volatile int waitStatus;

    /**
     * 前驱节点,当前节点加入到同步队列中被设置
     */
    volatile Node prev;

    /**
     * 后继节点
     */
    volatile Node next;

    /**
     * 节点同步状态的线程
     */
    volatile Thread thread;

    /**
     * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量
     * 也就是说节点类型(独占和共享)和等待队列中的后继节点公用一个字段
     * (用在条件队列里面)
     */
    Node nextWaiter;
    }

  CLH同步队列

CLH 同步队列是一个 FIFO 双向队列,AQS 依赖它来完成同步状态的管理:

  • 当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程
  • 当同步状态释放时,会把首节点唤醒,使其再次尝试获取同步状态。

 

state为0,表示可以竞争锁。

state为1,表示无锁。可重入锁state可以++。

例如:CountDownLatch,首先通过构造函数设置state = n,需要countDown()执行n次,await()才会返回。这里用到的就是state,每当执行一次countDown(),state就-1,知道所有的子线程执行完毕,state为0,await()方法就可以返回

1、线程一和线程二cas竞争 

2、线程二竞争失败,放入同步队列。调用locksupport.park阻塞。

3、线程一执行成功释放锁,state置为0,唤醒线程二,重复1步骤。

入队操作

通过“自旋”也就是死循环的方式来保证该节点能顺利的加入到队列尾部,只有加入成功才会退出循环,否则会一直循序直到成功。 

private Node addWaiter(Node mode) {
// 以给定的模式来构建节点, mode有两种模式 
//  共享式SHARED, 独占式EXCLUSIVE;
  Node node = new Node(Thread.currentThread(), mode);
    // 尝试快速将该节点加入到队列的尾部
    Node pred = tail;
     if (pred != null) {
        node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 如果快速加入失败,则通过 anq方式入列
        enq(node);
        return node;
    }

private Node enq(final Node node) {
// CAS自旋,直到加入队尾成功        
for (;;) {
    Node t = tail;
        if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为Hader节点,并将tail 指向它
            if (compareAndSetHead(new Node()))
                tail = head;
            } else {// 正常流程,加入队列尾部
                node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                }
            }
        }
    }

  

出队操作

同步队列(CLH)遵循FIFO,首节点是获取同步状态的节点,首节点的线程释放同步状态后,将会唤醒它的后继节点(next),而后继节点将会在获取同步状态成功时将自己设置为首节点 

private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

  

Condition条件队列

public class ConditionObject implements Condition, java.io.Serializable {    
    /** First node of condition queue. */    
    private transient Node firstWaiter; // 头节点    
    /** Last node of condition queue. */    
    private transient Node lastWaiter; // 尾节点        
    public ConditionObject() {    }    // ... 省略内部代码
}

  

Condition 将 Object 监视器方法(wait、notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意 Lock 实现组合使用,为每个对象提供多个等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。

// ========== 阻塞 ==========   
// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await() throws InterruptedException; 
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly(); 
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,
// 如果在`nanosTimeout` 之前唤醒,那么返回值 `= nanosTimeout - 消耗时间` ,如果返回值 `<= 0` ,
//则可以认定它已经超时了。
long awaitNanos(long nanosTimeout) throws InterruptedException; 
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit) throws InterruptedException; 
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回 // true ,否则表示到了指定时间,返回返回 false 。
boolean awaitUntil(Date deadline) throws InterruptedException; 
// ========== 唤醒 ==========
// 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。 pthread_cond_signal
void signal(); 
// 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
void signalAll(); 

  

例子:

Condition.await()  CLH队列首部出队,入队condition队列尾部
Condition.signal()  condition队列首部唤醒出队,入队CLH队列尾部

 

 入队

public final void await() throws InterruptedException {
    // 当前线程中断
    if (Thread.interrupted())
        throw new InterruptedException();
    //当前线程加入等待队列
    Node node = addConditionWaiter();
    //释放锁
    long savedState = fullyRelease(node);
    int interruptMode = 0;
    /**
     * 检测此节点的线程是否在同步队上,如果不在,则说明该线程还不具备竞争锁的资格,则继续等待
     * 直到检测到此节点在同步队列上
     */
    while (!isOnSyncQueue(node)) {
        //线程挂起
        LockSupport.park(this);
        //如果已经中断了,则退出
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    //竞争同步状态
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清理下条件队列中的不是在等待条件的节点
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

  

private Node addConditionWaiter() {
    Node t = lastWaiter;    //尾节点
    //Node的节点状态如果不为CONDITION,则表示该节点不处于等待状态,需要清除节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        //清除条件队列中所有状态不为Condition的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //当前线程新建节点,状态 CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    /**
     * 将该节点加入到条件队列中最后一个位置
     */
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

  

出队

调用 ConditionObject的 #signal() 方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。 

public final void signal() {
    //检测当前线程是否为拥有锁的独
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //头节点,唤醒条件队列中的第一个节点
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);    //唤醒
}

private void doSignal(Node first) {
    do {
        //修改头结点,完成旧头结点的移出工作
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}

 final boolean transferForSignal(Node node) {
    //将该节点从状态CONDITION改变为初始状态0,
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //将节点加入到CLH 同步队列中去,返回的是CLH 同步队列中node节点前面的一个节点
    Node p = enq(node);
    int ws = p.waitStatus;
    //如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

  

 实现一个AQS所需要实现的方法

// 互斥模式下尝试获取锁
protected boolean tryAcquire(int arg) {
   throw new UnsupportedOperationException();
}
// 互斥模式下尝试释放锁
protected boolean tryRelease(int arg) {
   throw new UnsupportedOperationException();
}
// 共享模式下尝试获取锁
protected int tryAcquireShared(int arg) {
   throw new UnsupportedOperationException();
}
// 共享模式下尝试释放锁
protected boolean tryReleaseShared(int arg) {
   throw new UnsupportedOperationException();
}
// 如果当前线程独占着锁,返回true
protected boolean isHeldExclusively() {
   throw new UnsupportedOperationException();
}

  这里用到了一种设计模式,即模板方法模式,自定义同步器时只需要重写上面几个方法即可,AQS中其他类都是final类型的,只有这几个方法能被其它类使用。那么重写了几个方法为什么可以实现同步器呢?这是因为AQS父类已经帮我买写好了一系列操作,包括入队,出队等。

原文地址:https://www.cnblogs.com/chenfx/p/15349107.html