ReentrantLock源码解析

ReentrantLock

1 数据结构

从上图可以看出,ReentrantLock的功能都是通过sync这个对象提供的。

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    
    public void lock() {
        sync.lock();
    }
    
    public void unlock() {
        sync.release(1);
    }
}

2 获取锁是否要走公平非公平逻辑

  • 区分公平非公平锁,公平锁老实排队
    • lock
    • lockInterruptibly
    • tryLock(long timeout, TimeUnit unit)
  • 不区分是否公平,如果锁可获取就去竞争
    • tryLock()

3 ReentrantLock源码解析

3.1 AQS在ReentrantLock中的抽象实现Sync

/**
 * 锁的同步控制的基础,抽象类提供了基本处理框架,具体逻辑由子类实现
 * 使用AQS的状态作为持有锁的数目
 */
abstract static class Sync extends AbstractQueuedSynchronizer {

    abstract void lock();

    // 执行非公平trylock
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 获取AQS的status
        int c = getState();
        // 无线程持有当前锁
        if (c == 0) {
            // 将state更新为acquires
            if (compareAndSetState(0, acquires)) {
                // 如果成功CAS了,那么将AQS的持有线程设置为当前线程,成功获取
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果有线程持有锁且刚好为当前线程,那么将state+1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

    // 检查当前线程是不是持有锁
    protected final boolean isHeldExclusively() {
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    // 条件
    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // 获取持有锁的线程
    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    // 获取持有锁的次数
    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    // 是否加锁了
    final boolean isLocked() {
        return getState() != 0;
    }
}

3.2 AQS在ReentrantLock中的具体实现NonFairSync

static final class NonfairSync extends Sync {
    // 上来就获取锁,尝试修改当前AQS的state,成功了就将持有线程设置为当前线程
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 失败了就老实使用AQS获取锁
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

3.3 AQS在ReentrantLock中的具体实现FairSync

static final class FairSync extends Sync {

    // 公平锁老实使用AQS获取锁
    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don't grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // 空闲
        if (c == 0) {
            // 如果当前线程没有前驱节点,就去获取锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 当前线程持有锁
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

4 AQS在ReentrantLock中的使用

4.1 查询有没有排在当前线程前面的线程

    public final boolean hasQueuedPredecessors() {
        // 方法准确性依赖于头节点在尾节点前初始化以及如果当前线程在队列中的首个节点时head.next可以保证准确
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        // 当头节点和尾节点不相等且头节点的下个节点为空且不是当前线程,就代表当前线程前还有个线程在排队啦
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

4.2 在给定模式下给当前线程创建节点并入队

    private Node addWaiter(Node mode) {
        // 创建节点【线程+模式】
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        // 如果尾节点不为空
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

4.3 AQS独占模式下获取锁

    /**
     * AQS独占模式下获取锁,忽略中断,至少会调用一次tryAcquire
     * 这里的tryAcquire是子类实现的
     * 成功获取到会直接返回,否则线程会入队列,可能会重复的在阻塞和非阻塞间切换
     * 并一直调用tryAcruire直到成功
     */
    public final void acquire(int arg) {
        // 如果获取锁成功,那么直接返回了
        if (!tryAcquire(arg) &&
            // 如果失败了那么入队,看当前线程是被中断唤醒还是unpark()唤醒
            // 如果被中断了,自行中断自己,因为之前的中断标志被清除了
            // 如果被unpark(),没了
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    
    // 如果当前线程没获取到锁入队了,之后unpark的时候是被中断了,再中断自己???
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
    
    // 在给定模式下给当前线程创建节点并入列,添加等待者,并返回新增节点
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        
        // 前驱节点设置为tail
        Node pred = tail;
        // 尾节点不为空的话,将新节点的前驱节点设置为尾节点
        if (pred != null) {
            // 将这句放到这里而不是if里面可以避免某个时刻tail.prev=null的情况,好好体会
            node.prev = pred;
            // 原子性更改tail
            if (compareAndSetTail(pred, node)) {
                // tail修改之后才会修改前驱节点的后继节点
                pred.next = node;
                return node;
            }
        }
        // 前驱节点为空,那么当前节点一定是首个节点
        enq(node);
        return node;
    }
    
    // 将给定节点入列
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            // tail为空,那么需要初始化head,然后将尾节点指向默认的初始化节点
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 如果尾节点不为空,新入节点的前置节点设置为尾节点,有可能其他线程节点插进来了,排队
                node.prev = t;
                // 原子性设置尾节点为当前节点,设置老的尾节点的下个节点为当前节点
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    // 队列获取锁
    // 返回在获取锁的时候是否被打断了
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 死循环
            for (;;) {
                final Node p = node.predecessor();
                // 获取锁的资格,前置节点是head
                // 跳出时机,前驱节点为head哨兵且获取锁成功
                if (p == head && tryAcquire(arg)) {
                    // 将当前节点设置为头节点,清空线程和prev
                    // 此时头节点的waitStatus并没有清空
                    setHead(node);
                    p.next = null; // help GC
                    // 获取成功
                    failed = false;
                    // 只有成功获取到锁了才会返回中断状态
                    return interrupted;
                }
                
                
                // 如果当前节点的前驱节点并非是头节点,即当前线程不是排在最前面
                // 或者获取锁失败了,都会走到这里
                
                // 此时需要根据前驱节点来判断当前线程是否要park
                
                // 如果需要park就去park,等待唤醒
                // 不需要park则继续循环,
                // 被唤醒后会返回中断状态,知道是怎么被唤醒的
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 将获锁节点设置为头节点,其实就是移出队列
    // 这里并没有清空waitStatus状态
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
    // 前驱节点状态不为SIGNAL的都不会park,而是重新循环
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        // 1)-1 SINGAL 返回true 安心park
        if (ws == Node.SIGNAL)
            // 这个值代表前驱线程早就已经设置了节点状态,释放锁的时候去通知当前线程,所以当前线程可以安全park,
            // 不继续循环,等待前驱节点获取锁后unpark
            return true;
        if (ws > 0) {
            // 2)1 CANCEL 取消 删除所有的前驱取消节点,自旋
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 3)0 -2 -3 
            // 将前驱节点状态设置为SIGNAL,自旋
            // 如果pred是head,head的waitStatus是0,设置成Node.Signal < 0
            // 获锁失败且前驱节点不为SIGNAL,设置前驱节点ws为SIGNAL
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    
    private void cancelAcquire(Node node) {
        // 无效节点过滤
        if (node == null)
            return;
        // 设置该节点不关联任何线程
        node.thread = null;

        // 将当前节点的前驱节点设置为向前第一个非取消节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 获取第一个非取消节点的后继节点,CAS用
        Node predNext = pred.next;

        // 将当前节点的状态设置为CANCELLED
        node.waitStatus = Node.CANCELLED;

        // 如果当前节点是尾节点,将向前的第一个非取消的节点设置为尾节点
        // 更新成功将其next设置为null
        // 如果当前节点不是尾节点或者更新next指针为null失败则进入else
        if (node == tail && compareAndSetTail(node, pred)) {
            // 这里即使更新失败了也无所谓,代表其他的线程更新好了
            compareAndSetNext(pred, predNext, null);
        } else {
            // 如果node还有后继节点,那么将pred和后继节点拼装起来
            int ws;
            // 1)当前节点不是head的后继节点
            // 2)前驱节点的ws为SINGNAL或【ws=0,-2,-3】且成功设置前驱节点的状态为SIGNAL【保证前驱节点是SIGNAL】 且
            // 3)前驱节点不是虚节点,是有关联线程的
            // 如果上面三个条件都满足
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                
                // 如果上面的条件不满足。即
                // 1)前驱节点为头节点
                // 2)前驱节点被取消
                // 3)前驱节点是个虚节点,代表已经被清空thread了 ??? 不知道什么情况下pred = head,但是前驱节点thread被清空, ===> CAS过程中执行完了setHead
                // 为了保证队列的活跃性,需要直接唤醒当前节点的后置节点
                // 唤起后继节点
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    // unpark后继节点
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 该节点如果waitStatus<0,设置成0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 如果后继节点被取消或者是null,从后向前遍历,找到等待线程并unpark
        // 为什么要从后往前呢?一直有线程在入队,所以从后往前,不断替换s
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    
    // 什么情况下会被唤醒呢?
    // 1) 其他线程调用了unpark()来唤醒该线程
    // 2)其他线程中断了当前线程
    // 3)莫名其妙被唤醒
    // 返回的时候并不会返回被唤醒的原因,所以还是需要去检查中断状态
    private final boolean parkAndCheckInterrupt() {
        // 不参与线程调度了,即等待
        LockSupport.park(this);
        // 返回当前线程是否被中断了
        return Thread.interrupted();
    }

4.4 ReentrantLock释放锁

public class ReentrantLock implements Lock, java.io.Serializable {
    private final Sync sync;
    
    public void unlock() {
        // 解锁,直接调用AQS定义的释放锁主流程
        sync.release(1);
    }
}

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer 
    implements java.io.Serializable {
   // 这里是final,定义了释放的主流程
   public final boolean release(int arg) {
        // 调用同步器自定义的释放锁方法,在完全释放掉独占锁后,这时后继线程就可以获取到独占锁了
        if (tryRelease(arg)) {
            // 如果释放成功了,拿到头节点
            Node h = head;
            // head的三种状态
            // head == null 但是head什么情况下头节点为空呢? 第一个线程进来不需要排队,直接获取到了锁,此时head = null
            // 线程在获取锁后,通过setHead设为了当前节点,并清除了thread信息,但是waitStatus没有变化
            // 由于已经通过tryRelease释放了锁,有新的线程获取到了锁并设置了head
            
            // 如果头节点不为空且头节点的waitStatus不为0,代表有后继节点在等待,唤醒后继节点
            // head节点的状态不可能是CANCELLED,这里等价 < 0
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
    // 定义了设置state的final方法
    protected final void setState(int newState) {
        state = newState;
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 如果当前锁完全释放了,任何的等待线程可以尝试获取,会返回true
    // ReentrantLock定义的同步器锁释放方法,这里会传入1,因为非共享
    protected final boolean tryRelease(int releases) {
        // 将AQS的锁状态state-releases,拿到释放后的状态,即0
        int c = getState() - releases;
        // 仅当当前线程是持有锁的线程时,才能释放锁
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        
        // 锁是否是空闲的
        boolean free = false;
        // 仅当c=0时才代表锁被释放了,并将当前持有锁线程设置为null
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }
}


原文地址:https://www.cnblogs.com/zerodsLearnJava/p/12744370.html