AQS共享锁源码分析

  一 共享锁和排他锁的不同

  排他锁中state的含义是初始值0,每次重入就加1,释放就减1,完全释放完必须是0

  共享锁state初始值是大于0的正数,表示可共享的数量,获取一次就减1,减到0则不能再获取

  二 共享锁的入口

public final void acquireShared(int arg) {
   if (tryAcquireShared(arg) < 0)
       doAcquireShared(arg);
}

  需要用户去实现的方法就是 tryAcquireShared ,该方法在jdk中的注释如下

   * @return a negative value on failure; zero if acquisition in shared
     *         mode succeeded but no subsequent shared-mode acquire can
     *         succeed; and a positive value if acquisition in shared
     *         mode succeeded and subsequent shared-mode acquires might
     *         also succeed, in which case a subsequent waiting thread
     *         must check availability. (Support for three different
     *         return values enables this method to be used in contexts
     *         where acquires only sometimes act exclusively.)  Upon
     *         success, this object has been acquired.

  当失败的时候返回的是负值,如果返回的是0表示获取共享模式成功但是它下一个节点的共享模式无法获取成功。如果返回的是正数也就是大于0,表示当前线程获取共享模式成功,并且它后面的线程也可以获取共享模式。

  当获取失败就会进入doAcquireShared

private void doAcquireShared(int arg) {
   final Node node = addWaiter(Node.SHARED);
   boolean failed = true;
   try {
       boolean interrupted = false;
       for (;;) {
           final Node p = node.predecessor();
           if (p == head) {
               int r = tryAcquireShared(arg);
               if (r >= 0) {
                   setHeadAndPropagate(node, r);
                   p.next = null; // help GC
                   if (interrupted)
                       selfInterrupt();
                   failed = false;
                   return;
              }
          }
           if (shouldParkAfterFailedAcquire(p, node) &&
               parkAndCheckInterrupt())
               interrupted = true;
      }
  } finally {
       if (failed)
           cancelAcquire(node);
  }
}

我们可以对比下排他锁的代码

  看得出来,唯一的区别就是 setHeadAndPropagate,道理也好理解,排他锁每次只能一个线程获得锁,获得后方法return,只需要等着客户端调用unlock唤醒后续的线程。

  但是共享锁每次获得锁的不止一个线程,当一个线程获得锁后,如果state>=0,则需要继续唤醒后续等待中的节点

  下面重点分析下

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
     //
h.waitStatus < 0 当一个
     //h == null || 
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {//propagate>0 表示共享锁本次释放出来的不只是一个,所以可以继续唤醒。至于 Node s = node.next; if (s == null || s.isShared())//s==null那不就是队列里没有节点了?因为只有tail的next才是null的,现在是head==tail了 doReleaseShared(); } }

  关于waitStatus,一个Node刚new出来的时候是0,如果它后面有node继续,会把它的前驱置为signal = -1,如果调用了doReleaseShared,会把头节点置为PROPAGATE= -3.

  h.waitStatus < 0

  上面的代码的意思,在本线程称为header的时候,可能有另一个线程执行了releaseShared,这样头结点的waitStatus可能会被设置为PROPAGATE,正因为有可能是这种情况,那么既然别的线程

  release了,那么现在执行一次唤醒也是合理的,只不过不能保证本次唤醒一定能让等待的线程获得锁。代码注释中也说了 

     may cause  unnecessary wake-ups

  二 释放锁

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
tryReleaseShared 是有子类去实现的,重点看看 doReleaseShared 
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed 这里不难理解 head变了当然要再次唤醒,唤醒本身不会有啥严重后果,最多就是被唤醒的线程拿不到锁再阻塞
                break;
        }
    }
else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

  这段该怎么理解呢?只有一个节点新加入队列或者 在唤醒后续之后,才会被设置成0。如果是0这里就把他设置为PROPAGATE=-3,同时continue,让自旋再次执行

  如果在本次循环head不变那就退出循环。

  对比排他锁的释放,共享锁这里有两点

  1 释放头节点的后继,通过CAS把signal变成0

  2 如果走到第二个分支,把0 CAS成 -3 。

  这是因为共享锁锁不止一个,所以可能会在不必要的时候释放,这样SIGNAL会变成0,但是也不用担心,被唤醒的线程竞争不到还是会在把自己阻塞前把它的前驱再次置为SIGNAL的

  PROPAGATE 我搜索代码发现设置PROPAGATE地方只有这个  doReleaseShared ,把0变成-3

  然后搜索  waitStatus < 0 

  也只有

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
      
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

  我这里说一下 PROPAGATE的意义

  一个线程刚好获取到锁,那么他会执行 setHeadAndPropagate ,设置了新的head后,

  假设刚好有两个线程都在执行releaseShared

  那么第一个线程会执行

if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }

  因为只有一个线程会CAS成功,所以当第二个线程也执行这段代码

  就只会走else分支

else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;       

  再把0变成-3,continue 自旋继续,如果刚才被释放的线程没有执行到抢锁,也就是说head没变,那么两个线程执行了release方法后,线程就结束了。

  这时候继续在 setHeadAndPropagate 执行

  就会发现  h.waitStatus < 0 就满足了,那么由于两个线程释放锁,说明有可能至少还有一个资源,就应该再执行一次释放流程

  

  另外还需注意一点,共享锁的释放是一个自旋操作,因为可能不止会释放一次。每次都要判断新的head和旧的head是不是一样的,如果不一样了那就继续释放。

  要理解共享锁,还有一个小心得。排他锁当一个线程拿到锁后,旧的head其实就退出了临界区。但是共享锁不一样,后一个线程拿到锁把自己set成了head,但是其实原来的head线程还没有跑完自己的代码。

说一下enq方法

一个小小的enq方法也有值得研究的地方

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;//注意先设置pre
                if (compareAndSetTail(t, node)) {//tail通过CAS,如果不成功,自旋后还会再次设置pre,这样才能保证正确性
                    t.next = node;
                    return t;
                }
            }
        }
    }
原文地址:https://www.cnblogs.com/juniorMa/p/13954184.html