AQS(队列同步器)

目录导引

  一、简介

  二、源码解析(JDK8)

  三、运用示例

一、简介

  1、volatile

  volatile修饰的共享变量可以保证可见性和有序性(禁止指令重排序)。

  2、CAS:

  CAS的原理很简单,包含三个值当前内存值(V)、预期原来的值(A)以及期待更新的值(B),

  如果内存位置V的值与预期原值A相匹配,那么处理器会自动将该位置值更新为新值B,返回true。否则处理器不做任何操作,返回false。

  要实现这个需求,java中提供了Unsafe类,它提供了三个函数,分别用来操作基本类型int和long,以及引用类型Object

    public final native boolean compareAndSwapObject
       (Object obj, long valueOffset, Object expect, Object update);

    public final native boolean compareAndSwapInt
       (Object obj, long valueOffset, int expect, int update);

    public final native boolean compareAndSwapLong
      (Object obj, long valueOffset, long expect, long update);

  obj 和 valueOffset:表示这个共享变量的内存地址。这个共享变量是obj对象的一个成员属性,valueOffset表示这个共享变量在obj类中的内存偏移量。所以通过这两个参数就可以直接在内存中修改和读取共享变量值。

  expect: 表示预期原来的值。

  update: 表示期待更新的值。

  可以看出上面的方法都是native方法,比较替换的原子性是由硬件保证的,可能是由JVM调用Atomic::cmpxchg函数执行CAS操作,也可能对内存的总线加锁实现。

  关于CAS实现的详细介绍可以参与文章:https://www.iteye.com/blog/lobin-2311755

  3、AQS(AbstractQueuedSynchronizer)

  AQS也称队列同步器,核心思想是基于volatile int state变量,配合Unsafe工具对其原子性的操作来实现对当前state状态值进行修改。

  同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。

  同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:

  getState() 获取当前的同步状态

  setState(int newState) 设置当前同步状态

  compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性

  同步器可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样可以方便实现不同类型的同步组件,利用同步器实现锁的语义。

  同步状态设计如图所示:

 

 二、源码解析(JDK8)

  由于很多方法复用,故只在第一次调用方法时贴出源码分析。

  内部类Node:

static final class Node {

        /** 分享模式节点 */
        static final Node SHARED = new Node();

        /** 独占模式节点 */
        static final Node EXCLUSIVE = null;

        /** 节点等待状态:线程已取消 */
        static final int CANCELLED =  1;

        /** 节点等待状态:需要唤醒后继节点 */
        static final int SIGNAL    = -1;

        /** 节点等待状态:处于等待队列 */
        static final int CONDITION = -2;

        /** 节点等待状态:共享式同步状态将会无条件的传播下去 */
        static final int PROPAGATE = -3;

        /** 节点的等待状态 */
        volatile int waitStatus;

        /** 前驱节点 */
        volatile Node prev;

        /** 后继节点 */
        volatile Node next;

        /** 节点关联的线程 */
        volatile Thread thread;

        /** 后继节点,有两种情况。
            1、共享式同步队列时,都存放同一个final对象SHARED。
            2、等待队列时,存放下一个Node */
        Node nextWaiter;

        /** 是否是分享模式节点 */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /** 获取前驱节点 */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        /** 构造器:初始化head或者SHARED节点 */
        Node() {
        }

        /** 构造器:新增同步队列节点 */
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        /** 构造器:新增等待队列节点 */
        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

   内部类ConditionObject:

public class ConditionObject implements Condition, java.io.Serializable {

        private static final long serialVersionUID = 1173984872572414699L;

        /** 等待队列的首节点(等待队列是单向队列,通过Node的nextWaiter进行遍历,首节点是第一个阻塞的线程) */
        private transient Node firstWaiter;

        /** 等待队列的尾节点 */
        private transient Node lastWaiter;

        public ConditionObject() { }

        /** 加入等待队列的尾节点 */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                /** 如果尾节点等待状态是cancelled,则清除一次队列所有的cancelled节点 */
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            /** 新增节点,如果队列没有初始化则初始化一个首位相等的队列,否则放入队尾 */
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        /** 唤醒首节点线程 */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
                /** 转移节点到同步队列中,并唤醒后继线程 */
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }

        /** 清除一次等待队列所有的cancelled节点 */
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

        /** 唤醒线程 */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
            /** 如果等待队列首节点不为空,则唤醒首节点线程 */
                doSignal(first);
        }

        /** 自我中断标记 */
        private static final int REINTERRUPT =  1;

        /** 抛出中断异常标记 */
        private static final int THROW_IE    = -1;

        /** 节点从等待队列转移到同步队列,如果线程被中断,则返回中断标记 */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }

        /** 抛出中断异常或者自我中断 */
        private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

        /** 线程等待 */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            /** 加入等待队列 */
            Node node = addConditionWaiter();
            /** 释放同步状态,并唤醒后继节点 */
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                /** 如果节点不在同步队列中,则挂起线程 */
                LockSupport.park(this);
                /** 节点从等待队列转移到同步队列,如果没有中断,则跳出循环 */
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            /** 被唤醒或者被中断的线程竞争获取同步状态 */
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
            /** 清除一次队列所有的cancelled节点 */
                unlinkCancelledWaiters();
            /** 如果有中断标记,则抛出中断异常或者自我中断 */
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

    }

   ConditionObject用到的方法:

   /** 释放同步状态 */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            /** 释放同步状态,并唤醒后继节点 */
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            /** 失败则将节点设置为CANCELLED状态 */
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }


   /** 判断节点是否在同步队列中 */
    final boolean isOnSyncQueue(Node node) {
        /** 同步队列中节点状态如果是CONDITION,则在下一次自旋中会变成SIGNAL,如果状态为CONDITION说明不在队列中
            除了head节点,同步队列的其他节点的前驱节点都不为null */
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        /** 如果有后继节点,则一定是入队了 */
        if (node.next != null)
            return true;

        /** 节点状态不是CONDITION,前驱节点不为null,后继节点为null,则可能是tail节点,遍历队列寻找 */
        return findNodeFromTail(node);
    }


   /** 设置节点等待状态为0,并转移到同步队列 */
    final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /** 如果cas失败,说明节点可能在入队的过程中,等待入队完成 */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

   成员属性:

    /** 同步队列:头节点 */
    private transient volatile Node head;

    /** 同步队列:尾节点 */
    private transient volatile Node tail;

    /** 同步状态 */
    private volatile int state;

    /** 超时时间 */
    static final long spinForTimeoutThreshold = 1000L;

    /** CAS各字段的long值属性 */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset
                    (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset
                    (Node.class.getDeclaredField("next"));

        } catch (Exception ex) { throw new Error(ex); }
    }

   核心方法-独占式获取同步状态:

    /** 独占式获取同步状态 */
    public final void acquire(int arg) {
        /** 先尝试获取同步状态(由子类实现),成功则直接返回,
            失败则新增一个当前线程的独占式的Node,自旋加入队尾,然后自旋获取同步状态,并返回中断标记
            如果中断过,则自我中断*/
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }


    /** 入队 */
    private Node addWaiter(Node mode) {
        /** 新建一个Node,持有当前线程的引用 */
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        /** 如果队列不为空,先尝试一次快速入队:
            新增Node的前驱节点指向原tail节点,
            然后尝试cas更新tail的引用为新增Node,
            最后把原tail节点的后继节点指向新增Node */
        if (pred != null) {
            node.prev = pred;
            /** 并发安全点:只有一个线程能更新成功,因为tail节点已经变更,其他线程都会失败 */
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        /** 程序进到这里,说明同步队列未初始化,或者cas更改失败,则采用自旋方式入队 */
        enq(node);
        return node;
    }


    /** 自旋入队 */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            /** 如果队列未初始化,则采用cas方式新增一个Node,并使头尾节点相同 */
            if (t == null) {
                /** 并发安全点:只有一个线程能初始化队列成功,因为head节点已经不为null,其他线程都会失败,自旋 */
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                /** 如果队列已经初始化,则采用cas方式尝试入队 */
                node.prev = t;
                /** 并发安全点:只有一个线程能更新成功,因为tail节点已经变更,其他线程都会失败,自旋 */
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }


    /** 自旋获取同步状态 */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /** 如果当前节点的前驱节点是head节点,说明前驱节点可能已经释放了同步状态,并唤醒了自己,
                    则尝试去获取同步状态,如果获取同步状态成功,则设置当前节点为head节点,
                    设置当前节点的前驱节点(原head节点)为null,并设置原head节点的后继节点为null,使原head节点被GC */
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null;
                    failed = false;
                    return interrupted;
                }
                /** 如果发现前驱节点不是head节点,或者获取同步状态失败,
                    则判断是否需要挂起当前线程,
                    如果需要挂起,则挂起当前线程,并检查中断状态,只要被中断过一次,就设置中断标志位为true
                    线程在此处挂起,如果被唤醒,则自旋 */
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            /** 正常情况下,不断自旋,failed的值会是false,只有子类实现的tryAcquire()方法抛出异常时,才会触发下面的逻辑,则把该节点置为取消状态*/
            if (failed)
                cancelAcquire(node);
        }
    }


    /** 判断是否需要挂起当前线程 */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        /** 如果前驱节点的等待状态为SIGNAL,则表示前驱节点释放同步状态后会唤醒自己,可以放心的挂起了 */
        if (ws == Node.SIGNAL)
            return true;
        /** 如果前驱节点的等待状态为CANCELLED,则表示前驱节点被取消了,则往前找第一个没有被取消的节点 */
        if (ws > 0) {
            do {
                /** 原队列
                 *  pp --> pred --> node
                 *  pp <-- pred <-- node
                 *  处理
                 *  pred = pp
                 *  node.prev = pp
                 *  pp.next = node
                 *  新队列
                 *  pp <-- node
                 *  pp --> node
                 *  使取消的Node断开队列引用,被GC,如果下一个前驱节点也被取消,则继续循环
                 */
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /** 如果前驱节点的等待状态为0或者PROPAGATE,则设置前驱节点为SIGNAL,告诉它记得唤醒我,这一步可以失败。
                前驱节点状态不可能为CONDITION,因为从等待队列转移到同步队列之前,已经更新等待状态为SIGNAL */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        /** 如果前驱节点等待状态是CANCELLED或者前驱节点的等待状态cas更新为SIGNAL失败,则返回false,进入外层的自旋
            直到前驱节点的等待状态更新为SIGNAL,才能返回true进入后续的挂起线程处理 */
        return false;
    }


    /** 挂起线程,并检查线程是否中断 */
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }


    /** 节点置为取消状态 */
    private void cancelAcquire(Node node) {
        if (node == null)
            return;

        node.thread = null;

        Node pred = node.prev;
        /** 找到最近的不是取消的前驱节点 */
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;

        /** 节点更新为取消状态 */
        node.waitStatus = Node.CANCELLED;
        /** 如果节点是tail节点,则把前驱节点设置为tail节点 */
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            /** 如果前驱节点不是head节点
                并且前驱节点的等待状态为SIGNAL或者<0的时候更新为SIGNAL成功
                并且前驱节点的线程不是null,则设置前驱节点的后继节点为当前节点的后继节点,
                否则直接唤醒后继线程 */
            if (pred != head &&
                    ((ws = pred.waitStatus) == Node.SIGNAL ||
                            (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                    pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            /** 这里之所以不设置为null,主要是Condition部分中有判断是否转移到了同步队列用到了node.next!=null断言入队成功,
                指向自己同样不符合根可达,符合被GC条件 */
            node.next = node;
        }
    }


    /** 唤醒下一个线程 */
    private void unparkSuccessor(Node node) {

        /** 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁 */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        /** 下一个节点为空或者等待状态为已取消,则从tail节点开始向前找到最近的非取消节点 */
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        /** 下一个节点不为空且等待状态不是已取消,则唤醒该线程 */
        if (s != null)
            LockSupport.unpark(s.thread);
    }

   释放独占式同步状态:

    /** 释放同步状态 */
    public final boolean release(int arg) {
        /** 因为线程已经获取了同步状态,所以释放同步状态应该返回true */
        if (tryRelease(arg)) {
            Node h = head;
            /** 如果head节点不为空,且不是初始节点,则唤醒下一节点。因为不会是取消节点,所以等价于<0 */
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }


    /** 唤醒下一个线程 */
    private void unparkSuccessor(Node node) {

        /** 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁 */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        /** 下一个节点为空或者等待状态为已取消,则从tail节点开始向前找到最近的非取消节点 */
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        /** 下一个节点不为空且等待状态不是已取消,则唤醒该线程 */
        if (s != null)
            LockSupport.unpark(s.thread);
    }

   核心方法-共享式获取同步状态:

    /** 共享式获取同步状态 */
    public final void acquireShared(int arg) {
        /** 先尝试一次获取同步状态,子类实现时,返回负数表示获取失败,如果失败则自旋获取同步状态;
            返回0表示成功,但是后继争用线程不会成功;
            返回正数表示获取成功,并且后继争用线程也可能成功 */
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }


    /** 自旋获取同步状态 */
    private void doAcquireShared(int arg) {
        /** 把Node节点加入同步队列队尾 */
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                /** 如果前置节点是head节点,并且获取同步状态成功,则设置head节点,并向后传播 */
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null;
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


    /** 设置head节点,向后传播 */
    private void setHeadAndPropagate(Node node, int propagate) {
        /** 保存原head节点 */
        Node h = head;
        /** 设置新head节点 */
        setHead(node);

        /** 如果返回的propagate>0或者原head节点等待状态<0或者新head节点的等待状态<0,则唤醒下一个线程 */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }


    /** 设置head节点 */
    private void setHead(Node node) {
        head = node;
        /** 关联线程和前驱节点都没有意义了,设置为null,便于后续GC */
        node.thread = null;
        node.prev = null;
    }


    /** 唤醒下一个线程 */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                /** 如果head节点的等待状态为SIGNAL,则cas更新为0后,唤醒下一个线程 */
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                }
                /** 如果head节点的等待状态已经被别的线程更新为0,则cas更新状态为PROPAGATE,表明需要传播唤醒 */
                else if (ws == 0 &&
                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            /** 检查h是否仍然是head,如果不是的话需要再进行循环 */
            if (h == head)
                break;
        }
    }

   释放共享式同步状态:

    /** 释放同步状态 */
    public final boolean releaseShared(int arg) {
        /** 因为线程已经获取了同步状态,所以释放同步状态应该返回true */
        if (tryReleaseShared(arg)) {
            /** 唤醒下一个线程 */
            doReleaseShared();
            return true;
        }
        return false;
    }

   独占锁流程图:

  共享锁流程图:

  Condition等待与唤醒流程图:

 

三、运用示例

  1、自定义一个独占锁,测试使用情况,因为condition只能在独占锁情景下使用,所以结合一起测试,示例:

public class MyExclusiveLock implements Lock {

    public MyExclusiveLock() {
        myAQS = new MyAbstractQueuedSynchronize();
    }

    private final MyAbstractQueuedSynchronize myAQS;

    static class MyAbstractQueuedSynchronize extends AbstractQueuedSynchronizer {

        @Override
        protected final boolean tryAcquire(int arg) {
            int state = getState();
            if (state == 0 && compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            } else {
                return false;
            }
        }

        @Override
        protected final boolean tryRelease(int arg) {
            setState(0);
            setExclusiveOwnerThread(null);
            return true;
        }

        @Override
        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
    }

    @Override
    public void lock() {
        myAQS.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {

    }

    @Override
    public boolean tryLock() {
        return myAQS.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    @Override
    public void unlock() {
        myAQS.release(1);
    }

    @Override
    public Condition newCondition() {
        return myAQS.new ConditionObject();
    }

    public static void main(String[] args) throws InterruptedException {
        MyExclusiveLock myExclusiveLock = new MyExclusiveLock();
        Condition condition = myExclusiveLock.newCondition();
        Runnable runnableAwait = () -> {
            System.out.println(Thread.currentThread().getName() + " : 尝试获取锁");
            myExclusiveLock.lock();
            System.out.println(Thread.currentThread().getName() + " : 获取锁成功");
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName() + " : 进入等待,并释放锁");
                condition.await();
                System.out.println(Thread.currentThread().getName() + " : 被唤醒了!并获取锁");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                myExclusiveLock.unlock();
                System.out.println(Thread.currentThread().getName() + " : 释放锁");
            }
        };
        Runnable runnableSignal = () -> {
            System.out.println(Thread.currentThread().getName() + " : 尝试获取锁");
            myExclusiveLock.lock();
            System.out.println(Thread.currentThread().getName() + " : 获取锁成功");
            condition.signal();
            System.out.println(Thread.currentThread().getName() + " : 唤醒等待队列首节点线程");
            myExclusiveLock.unlock();
            System.out.println(Thread.currentThread().getName() + " : 释放锁");
        };
        new Thread(runnableAwait).start();
        new Thread(runnableAwait).start();
        new Thread(runnableSignal).start();
        new Thread(runnableSignal).start();
    }

}


 Result:
 Thread-0 : 尝试获取锁
 Thread-2 : 尝试获取锁
 Thread-1 : 尝试获取锁
 Thread-3 : 尝试获取锁
 Thread-0 : 获取锁成功        // 4个线程竞争锁,只有一个线程能获取成功,符合锁的语义
 Thread-0 : 进入等待,并释放锁    // condition的await()会释放独占锁,并挂起线程
 Thread-2 : 获取锁成功        // 线程2第一个入队,入队时发现同步队列为空,于是初始化了一个队列,head节点与tail节点相同,都指向一个new Node(),然后设置线程2为tail节点,前驱节点为head节点,被唤醒后,获得锁
 Thread-2 : 唤醒等待队列首节点线程  // 唤醒等待队列首节点的线程,转移到同步队列中
 Thread-2 : 释放锁          // 手动释放锁
Thread
-1 : 获取锁成功        // 线程2获取锁之后设置自己为head节点,线程1在同步队列中是线程2的后继节点,所以线程2释放锁后会唤醒后继线程1,线程1检测到前驱节点是head节点,尝试获取锁,并成功获取 Thread-1 : 进入等待,并释放锁    // 后面的线程逻辑与上述分析基本类似,由后面的锁的释放与获取可以看出,自定义类确实实现了锁的语义 Thread-3 : 获取锁成功 Thread-3 : 唤醒等待队列首节点线程 Thread-3 : 释放锁 Thread-0 : 被唤醒了!并获取锁 Thread-0 : 释放锁 Thread-1 : 被唤醒了!并获取锁 Thread-1 : 释放锁

  2、自定义一个共享锁,测试使用情况,示例:

public class MySharedLock {

    public MySharedLock(int permits) {
        myAQS = new MyAbstractQueuedSynchronize(permits);
    }

    private final MyAbstractQueuedSynchronize myAQS;

    static class MyAbstractQueuedSynchronize extends AbstractQueuedSynchronizer {

        public MyAbstractQueuedSynchronize(int permits) {
            setState(permits);
        }

        @Override
        protected final int tryAcquireShared(int arg) {
        //这里要自旋是因为共享锁的获取与释放可能出现并发情况,如果是由于cas更新失败,则需要重试
for (; ; ) { int state = getState(); int count = state - arg; if (count >= 0 && compareAndSetState(state, count)) { return count; } if (count < 0) { return count; } } } @Override protected final boolean tryReleaseShared(int arg) {
        //这里要自旋是因为共享锁的获取与释放可能出现并发情况,如果是由于cas更新失败,则需要重试
for (; ; ) { int state = getState(); int count = state + arg; if (count >= 0 && compareAndSetState(state, count)) { return true; } if (count < 0) { return false; } } } @Override protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } } public void acquire() { myAQS.acquireShared(1); } public void release() { myAQS.releaseShared(1); } public static void main(String[] args) { MySharedLock mySharedLock = new MySharedLock(2); Runnable runnable = () -> { System.out.println(Thread.currentThread().getName() + " : 尝试获取凭证"); mySharedLock.acquire(); System.out.println(Thread.currentThread().getName() + " : 成功获取"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " : 处理完事情,释放凭证"); mySharedLock.release(); }; for (int i = 0; i < 5; i++) { new Thread(runnable).start(); } } } Result: Thread-0 : 尝试获取凭证        //凭证初始数量为2 Thread-0 : 成功获取          //凭证可用数量为1 Thread-2 : 尝试获取凭证 Thread-2 : 成功获取          //凭证可用数量为0 Thread-1 : 尝试获取凭证 Thread-4 : 尝试获取凭证 Thread-3 : 尝试获取凭证        //凭证数量不足,线程都需等待 Thread-2 : 处理完事情,释放凭证    //凭证可用数量为1 Thread-0 : 处理完事情,释放凭证    //凭证可用数量为2 Thread-1 : 成功获取          //后面的凭证逻辑与上面分析的类似,而且可以看出同时可以有2个线程获取凭证,实现了共享锁的语义 Thread-4 : 成功获取 Thread-4 : 处理完事情,释放凭证 Thread-1 : 处理完事情,释放凭证 Thread-3 : 成功获取 Thread-3 : 处理完事情,释放凭证

  AQS的设计精妙绝伦,令人叹服!

  如有疑问欢迎提出,如有错误欢迎指正。

  转载请注明本文地址:https://www.cnblogs.com/yqxx1116/p/11668674.html

原文地址:https://www.cnblogs.com/yqxx1116/p/11668674.html