java.util.concurrent.locks源码剖析

java.util.concurrent.locks源码剖析

参考

英文文档

英文 jenkov

中文 defonds

Lock接口

接口定义:

package java.util.concurrent.locks;
import java.util.concurrent.TimeUnit;

public interface Lock {
    
    void lock();
    
    void lockInterruptibly() throws InterruptedException;
    
    boolean tryLock();
    
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    
    void unlock();

	// 依赖于Condition接口
    Condition newCondition();
}

Condition接口

public interface Condition {

    void await() throws InterruptedException;

    void awaitUninterruptibly();
    
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    
    boolean awaitUntil(Date deadline) throws InterruptedException;
    
    void signal();
    
    void signalAll();
}


ReentrantLock

参考这篇《ReentrantLock源码分析》

核心成员

ReentrantLock只有一个成员sync

	// 提供所有实现机制的同步器
    private final Sync sync;

核心方法

可以看出ReentrantLock只是Sync的一个代理

public class ReentrantLock implements Lock, java.io.Serializable {

	// 默认是非公平锁
	public ReentrantLock() {
        sync = new NonfairSync();
    }
	
	public void lock() {
        sync.lock();
    }

	public void unlock() {
        sync.release(1);
    }

	public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

	// 要求状态值减释放数,如果当前线程不是独自模式下的拥有者,那么它无权修改状态值,抛异常,可见ReentrantLock是独占锁
	protected final boolean tryRelease(int releases) {
			// 计算新的状态值
            int c = getState() - releases;
            // 如果线程不是独自模式下的拥有者,那么它就无权要求修改状态值,就抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

	public Condition newCondition() {
        return sync.newCondition();
    }
}

数据结构

我们主要关心这些数据结构的成员以及核心方法

Sync

从定义可以看出Sync是一个抽象类,继承了AbstractQueuedSynchronizer,它的实现类有FairSync和NonfairSync.

 abstract static class Sync extends AbstractQueuedSynchronizer {
		// 没有定义成员

		// 子类需要去实现lock
        abstract void lock();
            
        final ConditionObject newCondition() {
	        // ConditionObject继承自AbstractQueuedSynchronizer
            return new ConditionObject();
        }
 }

NonfairSync

默认的同步机制,发现这个类非常简单,主要调用AbstractQueuedSynchronizer,因此,要了解ReentrantLock,最重要的是了解AbstractQueuedSynchronizer,同时注意对比FairSync和NonfairSync。

非公平锁(Nonfair):获取锁时不考虑排队等待问题,直接尝试获取锁,获取不到自动到队尾等待

    static final class NonfairSync extends Sync {

        final void lock() {
	        // 如果sync中的state没有被任何线程占有,则设定当前线程为锁的拥有者
            if (compareAndSetState(0, 1))
	            // AbstractOwnableSynchronizer中的方法,设置当前线程为锁的拥有者
                setExclusiveOwnerThread(Thread.currentThread());
            // 否则当前线程需要和sync队列中的其它线程竞争state的占有权
            else
                acquire(1);
        }
		
		  
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
		
		//非公平锁的获取方法,值得注意的是:这个方法实际是在NonfairSync的父类Sync中
	final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取state的值
            int c = getState();
            // 如果state没被占有,就独占它
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 如果state已经被占有,并且是被当前线程占有的
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                // 如果引用次数溢出,抛出异常而不是返回false
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 更新引用次数
                setState(nextc);
                return true;
            }
            // 如果已被其他线程占有,那么不能修改state的值
            return false;
        }
        
    }

FairSync

公平锁(Fair):加锁前检查是否有排队等待的线程,优先排队等待的线程,先来先得

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            // 没有没线程占有
            if (c == 0) {
	            // 必须是队列中的第一个线程,才能获取state的使用权,修改state的值
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 当前线程已经获得state的使用权
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

AbstractQueuedSynchronizer

简称AQS,是一个非常核心的类,也是本文的重点关注对象。其实AQS主要就是维护了一个状态值,release对状态值做减法,acquire对状态值做加法。并且AQS提供了独占和共享两种模式。

内部类

AQS中有两个内部类:Node和ConditionObject。

Node

Node是等待队列中的节点类。
等待队列是"CLH" (Craig, Landin, and Hagersten)锁队列的一种变体。CLH锁一般用作自旋锁(spinlocks)。然而这里使用了相同的策略,把关于thread的控制信息保存在前一个节点中,只不过会阻塞。每个节点中的状态字段都记录了线程是否应该阻塞。节点会在前一个节点释放的时候收到信号被唤醒。因此队列中的每一个节点充当了一种特定通知风格的监视器,同时持有一个正在等待的线程。状态字段不控制线程是否授予锁等。如果线程在队列中的第一个的话,可能尝试获取锁。但是拍照第一个并不能保证获取锁成功;只是给予这个线程参与竞争的条件。所以当前被释放的竞争者可能还要等待。

为了让一个CLH锁入队,你可以让它自动拼接到新的tail中。离队只需要重新设置head字段。

      +------+  prev +-----+       +-----+
 head |      | <---- |     | <---- |     |  tail
      +------+       +-----+       +-----+

入队与离队分别是作用于tail和head的原子操作。然而,由于超时和中断可能导致线程被取消,node还需要确定它的下一个节点是谁。相比原来CLH锁的实现,增加了prev字段主要用来处理线程被取消这种情况。如果一个node中的线程被取消了,它的下一个节点就需要重新找一个没有被取消的节点来作为前继。想了解自旋锁的相似机制,可以看Scott和Scherer的论文

    static final class Node {
         /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus的值,表示thread被取消了 */
        static final int CANCELLED =  1;
        /** waitStatus的值,表示下一个节点的线程需要阻塞 */
        static final int SIGNAL    = -1;
        /** waitStatus的值,表示thread正在等待某个条件 */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        
        volatile int waitStatus;

        volatile Node prev;
        
        volatile Node next;
        
        volatile Thread thread;

        Node nextWaiter;
    }

ConditionObject

仅列出重要的成员以及方法。

/*
 * 维护了一个条件队列,注意区别于AQS中的同步队列。
 * 条件队列用来记录未满足条件的线程,每当一个线程需要等待条件满足的时候,就加入条件队列进行等待;
 * 当条件被满足的时候,线程就会把等待队列中所有的线程按照顺序加入到同步队列,
 * 并与同步队列竞争state的使用权。
 */
public class ConditionObject implements Condition, java.io.Serializable {

         // 条件队列的第一个节点
         private transient Node firstWaiter;
        
         // 条件队列的最后一个节点
         private transient Node lastWaiter;

		 public ConditionObject() { }

		 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 添加了一个当前线程节点到条件队列的尾部
            Node node = addConditionWaiter();
            // 完全释放当前线程对state的占有权,唤醒同步队列中第一个等待的线程,并记录当前线程占有的state的值
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            
            // 等待另一个其它线程将当前线程从条件队列加入到同步队列中(调用sigal函数)
            while (!isOnSyncQueue(node)) {
	            // 当前线程被挂起,等待被唤醒
                LockSupport.park(this);
                // 如果是因为被中断而醒过来,就把当前线程直接加入到同步队列中
                // 自己将自己加入同步队列,需要抛异常,如果等待其它线程将自己加入同步队列,不需要抛异常
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                 // 也有可能是由于未知原因而醒过来,这时候interruptMode不是0,所以需要一个循环来确保当前线程被加入到同步队列中
            }
			
			// 当前线程与同步队列中其它的线程进行竞争,直到当前线程获取到state的使用权
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            // 遍历条件队列,移除所有被取消的线程
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            // 如果被中断了,并且是通过其他线程将当前线程加入到同步队列中的
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

		
		private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

		// 遍历条件队列,移除所有被取消的线程
		 private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            // 从前向后遍历条件队列
            while (t != null) {
                Node next = t.nextWaiter;
                // 在条件队列中,如果waitStatus != CONDITION,表示线程被取消
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

		// 线程醒过来之后检查是否被中断,
		// 如果没有被中断,返回0;
		// 如果被中断了,且自己可以将当前线程加入到同步队列中,返回THROW_IE;
		// 如果被中断了,通过等待其它线程将当前线程加入到同步队列中,返回REINTERRUPT。
		private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

		// 将当前线程添加到同步队列中,返回是否自己可以将自己加入到同步队列中
        final boolean transferAfterCancelledWait(Node node) {
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        // 等待其他线程通过signal把当前线程加入到同步队列中
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }
    
	private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            // 中断之后抛异常
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            // 中断之后当前线程重新进入中断状态
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

		// 将条件队列中的所有线程加入到同步队列中
		public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
	            // 将条件队列中的所有线程加入到同步队列中
                doSignal(first);
        }

		private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

		final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         // 如果线程已被取消,就不放入同步队列中
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果同步队列中前一个线程已被取消或者将前一个线程状态设置成signal失败,就唤醒该线程与同步队列其它线程竞争
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
}

核心成员

	/**
     * 等待队列的头结点,它是一个虚拟节点,延迟初始化。
     * 如果head != null,可以保证head.waitStatus不是CANCELLED
     */
    private transient volatile Node head;

    // 等待队列尾节点,只能通过enq来加入新等待节点
    private transient volatile Node tail;

    // AQS对象的状态,初始值是0,表示state没有被任何线程占有
    // AQS最重要的成员,不同场景下具有不同的含义,一般指锁被引用的次数
    // AQS提供了竞争这个状态值占有权的框架
    private volatile int state;

核心方法

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

	// 在独占模式下获取state的占有权,并使state加arg
	public final void acquire(int arg) {
		/**
		 * tryAcquire在FairSync和NonfairSync等AQS的子类中被实现。
		 * 
		 * 首先调用tryAcquire方法来尝试独占并修改state,
		 * tryAcquire如果返回false,就说明已经有thread获得了state的占有权,当前线程无权修改state,
		 * 这时候(执行acquireQueued方法)把当前节点入队并参与竞争state的占有权,当前节点变为首节点的时候获得state的占有权,state加arg。
		 * 如果当前线程在竞争过程中被中断过,则把当前线程恢复到中断状态。
		 */
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

	// 添加当前线程节点到等待队列中
	private Node addWaiter(Node mode) {
		// 构造线程节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS操作,更新尾节点
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // pred是空或者CAS操作失败(尾节点已变),就入队
        enq(node);
        return node;
    }

	// 循环嵌套CAS,直到CAS成功为止
	// 将节点node加入到等待队列尾部
	 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // 队列为空
            if (t == null) { // Must initialize
	            // 注意这时候创建并设置虚拟的头结点,而不是在创建AQS对象的时候,属于延迟加载,创建完虚拟头结点仍然继续循环
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

     /*
      * 在独占不可中断模式下,当前线程与Sync队列中的其它线程竞争。
      * 当前线程成为队列中首节点的时,它获得state的占有权,并給state加arg;
      * 否则,Sync队列中存在有效(没有被取消)的线程,由于队列中前面的线程拥有更高的权利使用state,
      * 所以当前线程就需要阻塞,当前一个线程使用完state之后唤醒当前线程。
      * 线程处于阻塞状态时,也可以被中断而醒来,由于是不可中断模式,所以会记录并清除中断状态,
      * 将中断状态返回给调用方处理,例如acquire中会把当前线程恢复到中断状态。
      */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
	            // 等待队列是一个双向列表,这里p是node的前节点
                final Node p = node.predecessor();
                // 如果是队列中第一个线程,就获得state的占有权,并使用state
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果不是队列中第一个线程,那么竞争失败,它无权使用state,
                // 检查队列中前面的线程是否有效(没有被取消),如果存在有效的线程,
                // 当前线程就需要阻塞,在线程醒过来之后检查是否被中断,如果被中断了,
                // 就清除中断标志位继续竞争state的使用权,但是要记录当前线程在竞争过程被中断过
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
	
	// 头结点出等待队列
	private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

     // 如果队列中存在有效的线程(没有被取消的)排在当前线程前面,那么当前线程就需要被阻塞,因为前面的线程等了更长的时间,拥有更高的使用权
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
	    //waitStatus初始值为0
        int ws = pred.waitStatus;
        // 前一个线程未使用完state,当前线程就要被阻塞
        if (ws == Node.SIGNAL)
            // pred节点正在请求一个signal信号,所以它可以被安全挂起
            return true;
        // 在Sync队列中,如果大于0表示线程处于被取消的状态,
        // 被取消的线程没有必要获取state的使用权,所以直接从队列中删除取消的线程,不让它们参与竞争。暂时不挂起当前线程。
        if (ws > 0) {
             // 删除所有被取消的先继节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            // 前一个节点没有被取消,暂时不挂起当前线程,将前一个线程状态设置为需要sigal信号(未使用完state),
            // 下次竞争如果发现前一个线程仍没有使用完state再挂起
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

	  private final boolean parkAndCheckInterrupt() {
	  // 挂起当前线程,在被挂起的情况下,有三种情况会被唤醒,具体见park方法注释
        LockSupport.park(this);
        // 获取中断标记,如果中断标记是true,会清除中断标记
        return Thread.interrupted();
    }

	// 释放state的使用权
	final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

	public final boolean release(int arg) {
		// 如果当前线程释放state使用权成功(独占模式即有权利将state减arg成功),
		// 就唤醒队列中的等待的第一个线程。
        if (tryRelease(arg)) {
	        // head指向的是虚节点,没有记录线程id,head下一个节点才存了第一个线程
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

	// 唤醒node的下一个节点
	private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 找到下一个没有被取消的线程节点,在sync队列中,waitStatus > 0 表示当前节点中的线程被取消
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从后向前遍历,找到最前面的一个没有被取消的线程
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

AbstractOwnableSynchronizer

AQS继承了这个抽象类,这个类非常简单,所有成员及方法定义如下:

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    protected AbstractOwnableSynchronizer() { }

     // 独占独占模式下占有Synchronizer的线程对象
    private transient Thread exclusiveOwnerThread;

     // 赋予thread对锁的排他访问权限
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    // 获取独占锁的线程对象
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

原文地址:https://www.cnblogs.com/mcai/p/9627665.html