【Java线程安全】 同步队列 AQS

模型

  只懂volatile和CAS是不是可以无视concurrent包了呢,下面跟大家介绍整个Current包的核心,AQS,看这篇文章要注意场景:我们所看到的都是一个线程的视觉,所以每一个非原子操作,外界都可能发生了变化。首先是AQS的整个构造,上图:

  就是上图的构造,结合volatile(上图的volatile int state),CAS(各种如compareAndSetTail()方法),for(;;)自旋,整个concurrent包的灵魂就出来了。这里明确一下,state是多线程访问的。

  state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

  Node结点是对每一个访问同步代码的线程的封装,其包含了需要同步的线程本身以及线程的状态,如是否被阻塞,是否等待唤醒,是否已经被取消等。变量waitStatus则表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。

  • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
  • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
  • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
  • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
  • 0状态:值为0,代表初始化状态。

   AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

  初认识AQS只需要知道用它分获取和释放两种操作,每个操作分为独占和共享,也就是acqurie、release、acquireShared、releaseshare这四个方法,可以多个子类同步器,如CyclicBarrire

记录文:

独占锁

  

  还有得到个好东西,一图明确acquire()方法,如下图:

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

   其实比较关心的就是如何去获取锁的,state的值一般代表占取的资源数,所以很明显如果没有争夺那就是0,所以tryAcquire就是想获取,成功的标志是把state变成1,当然这个方法公平锁和非公平锁的实现是不一样的。

 /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

附带acquire()的总结:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

  这里有个有趣的问题就是在加入等待队列之后,线程是否应该进入wait状态【图中的park(),其实是用Unsafe.unpark实现的。unpark使得线程不需要获取锁进入wait状态】,加入队尾后,利用acquireQueued休息:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过
        
        //又是一个“自旋”!这里自旋什么时候会跳出呢?看return可知,只有在获取到了锁会退出,如果获取不到,那么就会被unpark把线程带到wait状态。
      //当unpark唤醒后会继续自旋看自己是否是老二,一般情况下就是了,然后退出循环,返回中断状态,用作补偿处理,比较wai是不响应中断的
for (;;) { final Node p = node.predecessor();//拿到前驱 //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。 if (p == head && tryAcquire(arg)) { setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。 p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了! failed = false; return interrupted;//返回等待过程中是否被中断过 } //如果自己可以休息了,就进入waiting状态,直到被unpark() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true } } finally { if (failed) cancelAcquire(node); } }

   acquireQueued方法是如何找到安全点的呢,找安全点其实就是找到一个前驱节点,该节点的waitStatus == Node.SIGNAL,而这个整型值为-1,这个标志的Node遵守一个协议就是该节点在释放锁的时候会唤醒下一个需要唤醒的节点(不一定是后继节点);如果找不到,就会在自旋中变为直接争取锁,结合上面和下面代码:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//拿到前驱的状态
    if (ws == Node.SIGNAL)
        //如果已经告诉前驱拿完号后通知自己一下,那就可以安心休息了
        return true;
    if (ws > 0) {
        /*
         * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。
         * 注意:那些放弃的结点,由于被自己“加塞”到它们前边,它们相当于形成一个无引用链,稍后就会被保安大叔赶走了(GC回收)!
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

   介绍下waitStatus的状态取:

  1、如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL。
  2、如果pred的waitStatus > 0,表明pred的线程状态CANCELLED,需从队列中删除。
  3、如果pred的waitStatus为Node.SIGNAL,则通过LockSupport.park()方法把线程A挂起,并等待被唤醒。
      /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

  我们得知,如果waitStatus为-1那么就可以让线程upark进入等待状态(同thread.wait()),那么CLH是如何办到唤醒的呢?答案在realease()方法上。

独占锁释放release(int):

  该方法的目的是用unpark()唤醒等待队列中最前边的那个未放弃线程,问题是所有的线程(不包括头节点线程,因为是它需要释放锁)要么被中断放弃,要么在等待被唤醒,那么该如何唤醒?就是用到waitStatus,首先尝试释放首节点,如果释放失败,那么就从tail开始,一直往前找,如果找到就Node s变量用记下来,找到多个就把最后的覆盖前面的,那么遍历到头节点后,自然s记录的就是最前的需要唤醒的节点了。

  假设下面的H代表要释放锁的head节点,而它需要找到最接近它的等待的节点O并释放该O,算法是先尝试释放H的下一个节点,如果是X,那么就从队列最后开始遍历,用两个指针找到第一个O

  H X O O X X O O O O O O X

  这里也可以理解为两个指针,用一个去遍历,另一个记录有效结果,知道遍历完就知道最合适的是哪个节点了,结合代码就很好看懂:

  

private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败,使得找节点只需找<0的,可以跳过头节点。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s,先从头结点开始,不对就从尾部开始往前找到最前的
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

   

  从以上代码可以看出,唤醒这个操作是当前线程做的,由获取锁的线程去唤醒下一个线程;这里只需要LocakSupport.unpark,当被唤醒的线程成功后,就会设置自己为head,然后退出代码块,具体见acquireQueue方法注释。

共享获取

  这个其实和acquire差不多,只不过获取的status是多个,而且获取失败后直接继续唤醒后调用doAcquireShare

 public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

  而doAcquireShare方法和acquireQueued方法是差不多的,具体看如下代码注释:

 /**
     * Acquires in shared uninterruptible mode.
     * @param arg the acquire argument
     */
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);//首先保证入队
        boolean failed = true;
        try {
            boolean interrupted = false;  //追踪中断状态
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {    //如果是头结点那么久尝试获取资源,然后根据资源情况决定是否唤醒后面线程
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);  //r>0,唤醒后面线程
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
          //这里也是找安全点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  再看看该方法是如何继续唤醒后面线程的,这里注意,唤醒线程有两种情况,第一种是头结点获取到锁后还有资源所以唤醒后面的一起共享,第二种情况是获得锁的线程释放资源的时候去唤醒后面的节点,如下代码所示,doReleaseShared方法就是用作唤醒线程的。

  另外,在众多代码中我们都看到interrupted的标记位,可以看出,等待过程是不响应中断的,只会在后期补上中断响应,而中断响应是线程自身决定的。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);//head指向自己
     //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

释放共享

  该方法同样简单,尝试释放资源,否则唤醒后面线程,这里我们同样看到了doReleaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试释放资源
        doReleaseShared();//唤醒后继结点
        return true;
    }
    return false;
}

  代码:

  这里又用到了unparkSuccessor方法,首先判断头结点的waitStates是否为SIGNAL标志,是就设置为0,代表已经不再需要资源,然后自旋(for ;;)调用unparkSusseor,直到后继线程获取成功,后继线程获取成功会被唤醒,唤醒后后继线程第一件事就是在acquireQueue中自旋内部把自己设置为头结点,从而导致head引用(注意,head引用是各个线程共用的)发生变化,这样一来,这个释放的线程节点就能退出循环,代表资源释放完毕。

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化,当唤醒成功后,老二结点会自动替换head为自己,所以这里就不再相等,释放成功
            break;
    }
}

  head发生变化,当唤醒成功后,老二结点会自动替换head为自己,所以这里就不再相等,释放成功,请参看appendQueue代码注释、

线程状态转换  

  另外因为复习该知识点通常要知道一下线程的状态和之间的关系,所以再贴一张图,由此特别说明一下,unpark方法是不加锁进入waiting的唯一方法


原文地址:https://www.cnblogs.com/iCanhua/p/8965604.html