工作中的点点滴滴对于锁的一点儿认识(ReentrantLock)

  前一段时间,我们在了解了synchronized之后,我们在来看ReentrantLock。

 1 private Lock lock = new ReentrantLock();
 2 public void test(){
 3     lock.lock();
 4     try{
 5         //doSomeThing();
 6     }catch (Exception e){
 7     
 8     }finally {
 9         lock.unlock();
10     }
11 }    
ReentrantLock是实现Lock接口,在类的内部有一个sync内部类,sync集成AbstractQueuedSynchronizer ,而AbstractQueuedSynchronizer 是继承AbstractOwnableSynchronizer的。
另外
ReentrantLock是支持公平锁和非公平锁的,分别是通过sync的两个子类FairSync和NonfairSync来实现的。我们先看下类图。



 从类图代码的结构上,我们了解了ReentrantLock的数据结构。那么它是如何实现加锁和解锁以及公平锁和非公平锁的呢?

我们首先理顺它的方法调用关系,这里先以公平锁为例:

1)ReentrantLock:lock()。
2)FairSync:lock()。
3)AbstractQueuedSynchronizer:acquire(int arg)。
4)ReentrantLock:tryAcquire(int acquires)。
在第4步真正开始加锁,还是老办法,通过查看源码,我们看下lock方法到底做了什么事情。
1 final void lock() {
2     if (compareAndSetState(0, 1))
3         setExclusiveOwnerThread(Thread.currentThread());
4     else
5         acquire(1);
6 }

首先使用CAS把AQS的state从0设置成1。如果设置成功了,则表示当前线程为该锁的独占线程,获取锁成功。当多个线程同时尝试占用同一个锁时,CAS操作只能保证一个线程操作成功,剩下的只能乖乖的去排队啦。其实公平锁和非公平锁的差别也是在这里了。非公平锁说,如果前一个占据锁的线程正好刚刚释放,在排队的线程还没有来得及唤醒,新来的线程就可能会直接抢占该锁,直接插队了。相反公平锁对于新来的线程就是乖乖的排队去。

第二步,我们接着看,如果从0修改到1失败了怎么办呢, 看下acquire.

1 public final void acquire(int arg) {
2     if (!tryAcquire(arg) &&
3         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4         selfInterrupt();
5 }
 1 protected final boolean tryAcquire(int acquires) {
 2             //获取当前线程
 3             final Thread current = Thread.currentThread();
 4             //获取state变量值
 5             int c = getState();
 6             if (c == 0) 
 7              //没有线程占用锁
 8                 if (!hasQueuedPredecessors() &&
 9                     compareAndSetState(0, acquires)) {
10                     //占用锁成功,设置独占线程为当前线程
11                     setExclusiveOwnerThread(current);
12                     return true;
13                 }
14             }
15             else if (current == getExclusiveOwnerThread()) {
16             //当前线程已经占用该锁
17                 int nextc = c + acquires;
18                 if (nextc < 0)
19                     throw new Error("Maximum lock count exceeded");
20                 // 更新state值为新的重入次数
21                 setState(nextc);
22                 return true;
23             }
24             return false;
25         }

在acquire里面,第一步还是检查state字段,若为0,表示锁未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若被自己占用,则更新state字段,表示重入锁的次数。如果以上两点都没有成功,则获取锁失败,返回false。公平锁的tryAcquire和非公平锁的nonfairTryAcquire的区别就在于第一个判断当前锁有没有被占用的时候,公平锁多了一步判断,如果当前线程之前有一个排队的线程,后者当前线程在队列的头部或队列为空, 

在三步,在第二步中,如果获取锁失败了,那么就进入等待队列,也就是addWaiter。

 1 private Node addWaiter(Node mode) {
 2     //初始化节点,设置关联线程和模式(独占 or 共享)
 3     Node node = new Node(Thread.currentThread(), mode);
 4     // 获取尾节点引用
 5     Node pred = tail;
 6     // 尾节点不为空,说明队列已经初始化过
 7     if (pred != null) {
 8         node.prev = pred;
 9         // 设置新节点为尾节点
10         if (compareAndSetTail(pred, node)) {
11             pred.next = node;
12             return node;
13         }
14     }
15     // 尾节点为空,说明队列还未初始化,需要初始化head节点并入队新节点
16     enq(node);
17     return node;
18 }

在addWaiter里面如果队列不是空的,那么这个时候有两个线程同时进入enq初始化新的队列。

 1 private Node enq(final Node node) {
 2     //开始自旋
 3     for (;;) {
 4         Node t = tail;
 5         if (t == null) { // Must initialize
 6             // 如果tail为空,则新建一个head节点,并且tail指向head
 7             if (compareAndSetHead(new Node()))
 8                 tail = head;
 9         } else {
10             node.prev = t;
11             // tail不为空,将新节点入队
12             if (compareAndSetTail(t, node)) {
13                 t.next = node;
14                 return t;
15             }
16         }
17     }
18 }

这里体现了经典的自旋+CAS组合来实现非阻塞的原子操作。由于compareAndSetHead的实现使用了unsafe类提供的CAS操作,所以只有一个线程会创建head节点成功。假设线程B成功,之后B、C开始第二轮循环,此时tail已经不为空,两个线程都走到else里面。假设B线程compareAndSetTail成功,那么B就可以返回了,C由于入队失败还需要第三轮循环。最终所有线程都可以成功入队。

然后我们在会到acquire这个方法就是这一行。

看一下acquireQueued的这个方法。

 1 final boolean acquireQueued(final Node node, int arg) {
 2     boolean failed = true; //标记是否成功获取锁
 3     try {
 4         boolean interrupted = false; //标记线程是否被中断过
 5         for (;;) {
 6             final Node p = node.predecessor(); //获取前驱节点
 7             //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取锁
 8             if (p == head && tryAcquire(arg)) {
 9                 setHead(node); // 获取成功,将当前节点设置为head节点
10                 p.next = null; // 原head节点出队,在某个时间点被GC回收
11                 failed = false; //获取成功
12                 return interrupted; //返回是否被中断过
13             }
14             // 判断获取失败后是否可以挂起,若可以则挂起
15             if (shouldParkAfterFailedAcquire(p, node) &&
16                     parkAndCheckInterrupt())
17                 // 线程若被中断,设置interrupted为true
18                 interrupted = true;
19         }
20     } finally {
21         if (failed)
22             cancelAcquire(node);
23     }
24 }

假设B和C在竞争锁的过程中A一直持有锁,那么它们的tryAcquire操作都会失败,因此会走到第2个if语句中就是上面代码的第15行。我们再看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt都做了什么事情。

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     //前驱节点的状态
 3     int ws = pred.waitStatus;
 4     if (ws == Node.SIGNAL)
 5         // 前驱节点状态为signal,返回true
 6         return true;
 7     // 前驱节点状态为CANCELLED
 8     if (ws > 0) {
 9         // 从队尾向前寻找第一个状态不为CANCELLED的节点
10         do {
11             node.prev = pred = pred.prev;
12         } while (pred.waitStatus > 0);
13         pred.next = node;
14     } else {
15         // 将前驱节点的状态设置为SIGNAL
16         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
17     }
18     return false;
19 }
20   
21 /**
22  * 挂起当前线程,返回线程中断状态并重置
23  */
24 private final boolean parkAndCheckInterrupt() {
25     LockSupport.park(this);
26     return Thread.interrupted();
27 }

判断当前线程获取锁失败之后是否需要挂起.,线程入队后能够挂起的前提是,它的前驱节点的状态为SIGNAL,它的含义是“Hi,前面的兄弟,如果你获取锁并且出队后,记得把我唤醒!”。所以shouldParkAfterFailedAcquire会先判断当前节点的前驱是否状态符合要求,若符合则返回true,然后调用parkAndCheckInterrupt,将自己挂起。如果不符合,再看前驱节点是否>0(CANCELLED),若是那么向前遍历直到找到第一个符合要求的前驱,若不是则将前驱节点的状态设置为SIGNAL。

这就是整个lock加锁的过程。他充分的使用到了aqs里面的一个Node这个数据结构,下面结构图和代码中,我尽量把注释写的全面一些,这样在看源码的时候可以很轻松的明白含义。

 1 static final class Node {
 2     //表示线程以共享的模式等待锁
 3     static final Node SHARED = new Node();
 4     //表示线程正在以独占的方式等待锁
 5     static final Node EXCLUSIVE = null;
 6 
 7     //当前节点由于超时或中断被取消
 8     static final int CANCELLED =  1;
 9     //表示当前节点的前节点被阻塞
10     static final int SIGNAL    = -1;
11     //当前节点在等待condition
12     static final int CONDITION = -2;
13     //状态需要向后传播
14     static final int PROPAGATE = -3;
15 
16     /**
17      * 当前节点在队列中的状态,他有五个枚举值:
18      * 0    当一个Node被初始化的时候的默认值
19      * CANCELLED    为1,表示线程获取锁的请求已经取消了
20      * CONDITION    为-2,表示节点在等待队列中,节点线程等待唤醒
21      * PROPAGATE    为-3,当前线程处在SHARED情况下,该字段才会使用
22      * SIGNAL    为-1,表示线程已经准备好了,就等资源释放了
23      */
24     volatile int waitStatus;
25     //前驱节点
26     volatile Node prev;
27     //后继节点
28     volatile Node next;
29     //表示处于该节点的线程
30     volatile Thread thread;
31     //指向下一个处于CONDITION状态的节点
32     Node nextWaiter;
33 
34     final boolean isShared() {
35         return nextWaiter == SHARED;
36     }
37     //返回前驱节点,没有的话抛出npe
38     final Node predecessor() throws NullPointerException {
39         Node p = prev;
40         if (p == null)
41             throw new NullPointerException();
42         else
43             return p;
44     }
45 
46     Node() {
47     }
48 
49     Node(Thread thread, Node mode) {
50         this.nextWaiter = mode;
51         this.thread = thread;
52     }
53 
54     Node(Thread thread, int waitStatus) {
55         this.waitStatus = waitStatus;
56         this.thread = thread;
57     }
58 }

下面花了一个交互图,能够更清晰的理解整个加锁的流程:

上面讲完了加锁的过程,我们在来看一下unlock结果的过程。在unlock的里面,我们直接看tryRelease的这个方法。

 1 protected final boolean tryRelease(int releases) {
 2     // 计算释放后state值
 3     int c = getState() - releases;
 4     // 如果不是当前线程占用锁,那么抛出异常
 5     if (Thread.currentThread() != getExclusiveOwnerThread())
 6         throw new IllegalMonitorStateException();
 7     boolean free = false;
 8     if (c == 0) {
 9         // 锁被重入次数为0,表示释放成功
10         free = true;
11         // 清空独占线程
12         setExclusiveOwnerThread(null);
13     }
14     // 更新state值
15     setState(c);
16     return free;
17 }

这里入参为1。tryRelease的过程为:当前释放锁的线程若不持有锁,则抛出异常。若持有锁,计算释放后的state值是否为0,若为0表示锁已经被成功释放,并且则清空独占线程,最后更新state值,返回free。然后在release中,也就是调用tryRelease的方法中,如果release返回了true,释放成功,那么查看头结点的状态是否为空(其实是会等于SIGNAL,看shouldParkAfterFailedAcquire方法),如果是则唤醒头结点的下个节点关联的线程,如果释放失败那么返回false表示解锁失败。这里我们也发现了,每次都只唤起头结点的下一个节点关联的线程。

结尾

  从上面篇幅中,我们大致的了解到了java中两种常用的锁的实现,那么这样我们在选择使用锁的时候,也更清楚了在什么场景下怎么选择的去使用。比如在执行同步块中,你突然想中断锁,或者想使用公平锁等可以自由操作锁那么就用lock。如果业务本来就很简单,比如想在map.add()加锁就可以直接用synchronize的。

原文地址:https://www.cnblogs.com/yangkangIT/p/7910196.html