ReentrantLock 源码分析

ReentrantLock是一把重入锁,可中断,可以限时,支持公平锁和非公平锁。

下面举一个生活中的例子,帮助大家来更好的理解ReentrantLock这些特性。

火车购票。买票人争先恐后的跑到G1020检票口检票回家,谁先跑到检票口,谁先验票,谁先回家。那些跑的慢没有抢到检票权的,一个一个的在后面排成一队,先到的排在前面。在最前面获得检票权的人通过检票机验票走人,检票机通知下面一个人来验票。这样每次最前面的人验票走人,然后依次下一个。这样一来大家都井然有序的回家,心里面也比较平衡,谁让自己腿短跑的慢呢,排在跑的快的人后面也心安理得。这就是公平锁的基本思路。

在看G1028检票口的人,这批人就比较聪明了,看着前面一眼望不到头的长长的队伍。把自己的行李箱放在队伍中代替自己,自己则在旁边的椅子上歇着。没办法谁让做这趟车的人聪明呢,这下可把检票机给累坏了,每次检查到行李箱到就大声喊,黑色行李箱的来安检了,然后听到的人慢悠悠的在过去验票走人。后来智能的检票机通过智能学习也变聪明了,如果在检查到放行李箱占位置的,如果此时刚好有人过来检票,则直接让此人检票,不需要在队伍尾部排队等候了。这样虽然对抢到前面的人不公平,但是却加快了进站效率。这就是非公平锁的基本思路.

现在检票的规则变了以家庭为单位检票,只要是家庭中的一员抢到检票口,其余人就可以跟着过检票口。由于小明跑的快,先跑到了检票口。他的父母年龄比较大,跑的比较慢。等跑到检票口的时候,后面已经排起来了长队。可是人家有一个跑的快的儿子呀,现在的规则又是以家庭为单位,都是一家人,于是小明的父母也可以直接检票走人,不需要去队伍里面排队。这就是可重入锁,同一个线程可以重复拿锁。

快看G1028检票口的人吵起来了,怎么回事呢。原来是秃头和长毛两个人刚才跑的快撞在了一起,身份ID都掉在了地上,2个人匆忙捡起来就跑到了检票口。秃头跑的快一点,跑到了长毛的前面。这时候秃头拿身份id验证的时候发现身份不对,上面写的是产品经理。后面那个人拿的是秃头的身份ID,上面写的程序员。于是秃头说,我把你的身份ID给你,你把我的身份ID给我。但是长毛平时提需求提的习惯了,对秃头说给你可以,但是让我排到你前面。秃头一听这无脑需求,火冒三丈,就和长毛干了起来。这下好了,长毛不给秃头身份ID,秃头也不给长毛身份ID,两个人就互相僵持着。这导致后面排队的人也没发进站了。这时候长毛手机发生了异常,一个电话打了过来,原来老板让长毛回去改需求,没办法最后中断了2人的争执。长毛灰溜溜的走了,秃头打赢了这场仗,脸上露出了阳光般的笑容。这就是可中断的,当2个线程互相占有锁,不释放导致死锁的时候,ReentrantLock可以用锁中断解决。

在看另一边一个人想要插队,在耐心的说服前面的人,让我先进去吧,让我先进去吧。可是说服了几分钟也没说服成功,于是放弃了不在插队抢先检票了。这个就是在一定时间内锁尝试,尝试着去获取锁,如果没有获取到就结束。

以上就是重入锁,锁中断,锁限时,公平锁和非公平锁的大致概念,相信大家应该会有所理解。下面对ReentrantLock的特性进行详细的讲解。

ReentrantLock非公平锁




ReentrantLock默认实现的是非公平锁,我具体看一下代码:

 ReentrantLock reentrantLock=new ReentrantLock();
 reentrantLock.lock();

我们看一下构造函数

 public ReentrantLock() {
    sync = new NonfairSync();
}

从这里可以看出ReentrantLock默认实现的是非公平锁,我们在看一下非公平锁是怎么具体实现的。

lock是一个接口,构造器默认实现的是NonfairSync,所以reentrantLock.lock() 调用的是NonfairSync的lock接口。

看一下这块的具体代码

 static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

NonfairSync 继承了Sync,Sync继承了AbstractQueuedSynchronizer,到这里大家应该已经明白ReentrantLock是基于AQS实现的,所以只要你搞懂AQS,很多并发类你都会很容易的理解。

reentrantLock.lock(),我们看一下lock方法做了哪些操作.首先通过CAS获取锁,如果获取到锁,把当前线程设置为独占线程。如果获取失败,则调用acquire方法,而此方法为AQS内部方法。

 public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这里有一个知识点,我们看一下NonfairSync类中tryAcquire方法,此方法也是AQS中的方法,也就是子类NonfairSync重写了父类AQS的tryAcquire方法。

当子类NonfairSync调用acquire方法的时候,执行的是AQS提供的acquire方法,然后从上面代码中可以看出来父类AQS在此方法中执行了tryAcquire方法,而tryAcquire方法在子类中已经重写,那么就会执行子类NonfairSync实现的tryAcquire方法。这就是多态。

这也是AQS的好处,对外提供API,子类继承AQS,按照自己的业务逻辑重写提供的API。而AQS只管线程怎么进行入队列,怎么插入节点,怎么唤醒节点这些底层的方法,对外层提供调用的 API,然后子类只需要继承AQS,实现独有的业务方法即可,从而大大降低了耦合度。

我们在看一下nonfairTryAcquire方法做了哪写操作

  final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();//获取当前线程
      int c = getState();//获取当前线程的状态
      if (c == 0) {//如果当前线程处于初始状态
          if (compareAndSetState(0, acquires)) {//cas竞争锁
              setExclusiveOwnerThread(current);//竞争到锁把当前线程设置为独占线程
              return true;
          }
      }
      else if (current == getExclusiveOwnerThread()) {//如果当前线程是独占线程,注意此方法也是实现重入的地方。
          int nextc = c + acquires;//当前线程状态值+传入的值
          if (nextc < 0) // overflow
              throw new Error("Maximum lock count exceeded");
          setState(nextc);//设置最新的状态值
          return true;
      }
      return false;
  }

首先取到当前线程和当前线程的值,如果当前线程是初始状态那么就去竞争锁,如果竞争到锁,把当前线程设置为独占线程。如果进来的线程是独占线程,那么更新此线程进入的次数,同时也可以获取锁。如果没有竞争到锁,也不是当前的独占线程,那么就返回false。

从此方法中可以看出来,只要有线程进来,就让他获取锁,而不是排队到尾部。只要是独占线程,就可以重复进来.正是通过此方法可以看出来ReentrantLock是可以进行重入的也是可以是实现非公平锁的。

ReentrantLock公平锁



我们在看一下ReentrantLock实现的公平锁的源代码

 ReentrantLock reentrantLock=new ReentrantLock(true);
 reentrantLock.lock();

我们点击构造器看一下

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

下面具体看一下FairSync类的源代码

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
          //没有线程在等队列里面等待同时获取到锁,则设置当前线程为独占线程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

FairSync也是继承了Sync,也就是也是继承了AQS,调用lock方法,lock方法调用了父类acquire,此方法会调用子类重写的tryAcquire方法。它的实现方式和上面讲的非公平锁实现方式大致一样,业务逻辑都是在重写的tryAcquire里面。

我们看一下hasQueuedPredecessors

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

此方法主要是查看是否有线程在等待队列里面等待。

FairSync类tryAcquire方法业务逻辑就是获取到当前线程和当前线程的状态,如果当前线程是初始状态,会去判断当前队列里面是否有等待的线程,如果队列中没有等待的线程,同时获取到锁,那么就把当前线程设置为独占线程。如果是相同的独占线程进来则,则更新独占线程进来的次数。同时返回true,否则返回false。从这里可以很容易的看出来,这是一个公平锁,进来的线程需要排队,队列中没有了线程才能轮到进来的线程。同时在else if 这个条件中可以看出来也是可重入的。

ReentrantLock锁中断




我们看一下可中断锁的源代码

ReentrantLock reentrantLock=new ReentrantLock();
reentrantLock.lockInterruptibly();
public void lockInterruptibly() throws InterruptedException {
    sync.acquireInterruptibly(1);//调用AQS中方法
}
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())//线程中断抛出异常
        throw new InterruptedException();
    if (!tryAcquire(arg))//此处还是调用创建对象子类的方法,获取不到锁执行下面的方法
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);//封装线程节点,并且添加到尾部。前面文章已经详细讲解过,此处不在详细展开。
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();//获取当前节点的前驱节点
            if (p == head && tryAcquire(arg)) {//前驱节点是头节点并且获取到锁
                setHead(node);//设置当前节点为头节点
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//线程如果是阻塞的并且被中断,则直接抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);//如果线程抛出了异常,那么就把线程状态设置为取消状态同时清除节点.
    }
}

此处讲解一下,为什么ReentrantLock可以进行锁中断,为什么可以在产生死锁的时候,可以通过锁中断技术解决死锁。看过源码其实已经明白,首先在当前线程如果调用了interrupted,那么直接抛出异常出来。如果线程被阻塞并且被中断了那么也是抛出异常。也就是他是通过线程调用中断方法抛出异常来打破持有锁的。如果前面的文章看过,你会发在AQS中doAcquireInterruptibly方法和acquireQueued方法很相似,区别就是一个是返回boolean类型的值,让上层做判断,一个是在返回boolean类型值的地方直接抛出了异常。

ReentrantLock锁限时



我们看一下可中断锁的源代码

ReentrantLock reentrantLock=new ReentrantLock();
reentrantLock.tryLock(300, TimeUnit.SECONDS);//300秒内持续获取锁,直到获取到锁或者时间截止
public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())//此处可以看出来支持锁中断
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);//首先获取一次锁,如果没有获取到执行独占计时模式
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)//时间小于0直接返回
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;//队列延迟时间为系统时间+设置的超时时间
    final Node node = addWaiter(Node.EXCLUSIVE);//把当前线程封装为Node并添加到队列
    boolean failed = true;
    try {
        for (;;) {//自旋
            final Node p = node.predecessor();//获取当前节点的前驱节点
            if (p == head && tryAcquire(arg)) {//如果前驱节点是头节点并且获取到锁
                setHead(node);//设置当前节点为头节点
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();//超时间为延迟时间-当前系统的时间
            if (nanosTimeout <= 0L)//表示已经超过设置的尝试时间,直接返回
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)//如果当前线程阻塞,并且超时时间大于1000纳秒
                LockSupport.parkNanos(this, nanosTimeout);//阻塞当前线程并在超时时间内返回
            if (Thread.interrupted())//线程中断
                throw new InterruptedException();//抛出异常
        }
    } finally {
        if (failed)//线程发生异常
            cancelAcquire(node);//把当前线程设置为取消状态并清除该节点
    }
}

通过阅读源码我们发现锁限时获取的步骤

1 首先调用tryAcquire方法获取一次锁,如果没有获取到调用AQS中的doAcquireNanos。

2 System.nanoTime() 获取系统纳秒级时间+传递的延时时间为最后的时间。

3 调用addWaiter方法把当前线程封装为节点并添加到队列的尾部。

4 前驱节点是头节点并且获取到锁设置当前节点为头节点

5 如果当前线程被阻塞了并且超时时间大于1000纳秒,调用LockSupport.parkNanos方法阻塞当前线程并且在规定的超时间内返回

6 如果线程中断,直接抛出异常,这里可以看出支持锁中断

7 如果线程在自旋的过程中发生了异常,那么调用cancelAcquire方法把当前线程设置为取消状态并且清除该节点。

其实此方法和上一篇讲解的独占锁模式调用acquireQueued方法差不多。不同点在于这里增加了超时时间,如果超时时间大于spinForTimeoutThreshold,此值是一个常量为1000的值。也就是如果超时时间大于1000纳秒,那么就调用 LockSupport.parkNanos方法让该线程阻塞,最长阻塞的时间不会超过超时的时间。同时增加了线程中断的判断,发生线程中断则抛出异常,其余和acquireQueued实现都一样。

原文地址:https://www.cnblogs.com/fengyun2050/p/12384569.html