JUC锁框架源码阅读-ReentrantReadWriteLock

说明

我们都知道ArrayList不是线程安全的,在读的时候同时并发在写,在写的时候同时在读,会出现索引越界问题,解决这个问题醉简单的方式在写和读的地方都加上锁。都加上锁的话并发读也会产生互斥

但是为了性能 读的频繁写的时候比较少。我们都会允许读读共享,读写互斥,写写互斥,ReentrantReadWriteLock 就是读写锁 读读共享,读写互斥,写写互

为什么ArrayList线程不安全可以参考《ArrayList在多线程的线程不安全的几种体现》

ReentrantReadWriteLock 是基于AQS实现的 AQS源码阅读参考《JUC锁框架源码阅读-AbstractQueuedSynchronizer》

关于位运算可以参考:《java位运算符,&,|,>>>,>>,<<,<<<的区别》

初始化

main

 //<1>
 ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

1.通过位运算符& 通过AQS state&EXCLUSIVE_MASK 进行运算,如果大于0则表示写锁以及被持有,则获取读锁(AQS共享锁)失败 返回-1加入CLH队列

2.如果小于0则表示没有写锁。读读共享(共享锁可以多线程持有)则尝试获取,通过state从高位右移可以获取到写锁数量。

3.先因为int是32位 16位是临界,则65536是最大支持个线程可获取读锁数量(一般够用了),需要校验是否超过65536如果超过了直接获取失败 加入CLH队列

4.没有超过如果限制没有任何线程获取读锁,则直接CAS尝试获取

5.获取失败则自旋获取,每次获取都需要判断是否写锁被持有了,自旋直到获取成功或者因为写锁被持有,或者超出最大可获取数量返回对应结果(注获取失败都是返回-1 AQS将阻塞线程并加入CLH队列)

6.获取成功 因为可重入将更新ThreadLocal的HoldCounter count+1 

<1>

    public ReentrantReadWriteLock() {
        this(false);
    }
    public ReentrantReadWriteLock(boolean fair) {
        //可以sync是ReentrantReadWriteLock 的2个内部类。可以发现ReentrantReadWriteLock 也是支持公平和非公平的
        sync = fair ? new ReentrantReadWriteLock.FairSync() : new ReentrantReadWriteLock.NonfairSync();
        //读锁也是内部内 内部保存了ReentrantReadWriteLock的引用 意味着可以拿到sync
        readerLock = new ReentrantReadWriteLock.ReadLock(this);
        //写锁也是内部内 内部保存了ReentrantReadWriteLock的引用 意味着可以拿到sync
        writerLock = new ReentrantReadWriteLock.WriteLock(this);
    }

Sync

ReentrantReadWriteLock 的公平锁和非公平锁都继承了Sync

继续看类结构 他们只重写了部分方法 主要是为了实现公平和非公平锁,具体加锁都在父类Sync

 /**
     * Nonfair version of Sync
     */
    static final class NonfairSync extends ReentrantReadWriteLock.Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        //重写了writerShouldBlock方法
        final boolean writerShouldBlock() {
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            return apparentlyFirstQueuedIsExclusive();
        }
    }

    /**
     * Fair version of Sync
     */
    static final class FairSync extends ReentrantReadWriteLock.Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        //重写了writerShouldBlock
        final boolean writerShouldBlock() {
            return hasQueuedPredecessors();
        }
        //重写了readerShouldBlock
        final boolean readerShouldBlock() {
            return hasQueuedPredecessors();
        }
    }
View Code

ReentrantReadWriteLock 是通过将AQS state int 32转2进制通过高16位表示读锁 低16位表示写锁

 

    //因为int 是32位,通过16将他们一分为二 高位加读锁 低位加写锁
    static final int SHARED_SHIFT   = 16;
    //1转为二进制向左边移动16位
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    //1转为二进制向左边移动16位 -1表示高的最小值
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    //1转为二进制向左边移动16位 -1表示低的最大值
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //将state向右移动16位 则可以得到持有锁的数量 我的理解 就是高位的移动到后面当成16进制处理
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

    /**
     * 位运算 将&左右2边转为二进制 同时为1 则为1否则为0 得出写锁数量
     * 00000000 00000001 00000000 00000000  高位读锁与EXCLUSIVE_MASK计算后 =0
     * 00000000 00000001 00000000 00000001  低位写锁与EXCLUSIVE_MASK计算后  =1
     * 00000000 00000000 11111111 11111111 EXCLUSIVE_MASK 
     * @param c
     * @return
     */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

HoldCounter

    //Sync内部类 锁是可重入的,保存每个线程持有数量
    static final class HoldCounter {
        //计数器
        int count = 0;
        //线程id
        // Use id, not reference, to avoid garbage retention
        final long tid = getThreadId(Thread.currentThread());
    }

    /**
     *Sync内部类 重新了ThreadLocal的 initialValue方法,为ThreadLocal设置默认值
     */
    static final class ThreadLocalHoldCounter
            extends ThreadLocal<ReentrantReadWriteLock.Sync.HoldCounter> {
        public ReentrantReadWriteLock.Sync.HoldCounter initialValue() {
            return new ReentrantReadWriteLock.Sync.HoldCounter();
        }
    }
    //sync成员变量
    private transient ReentrantReadWriteLock.Sync.ThreadLocalHoldCounter readHolds;

读锁加锁

main

 public static void main(String[] args)  {
        //<L>初始化
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        //获得读锁 内部return readLock
        Lock rlock = readWriteLock.readLock();
        //<1>加锁
        rlock.lock();
    }

<1>lock

public void lock() {
        //<2>调用的继承父类的AQS方法
        sync.acquireShared(1);
    }

<2>acquireShared

可以参考:《AQS源码阅读》

// 获取共享锁
    public final void acquireShared(int arg) {
        //<3>模板模式 抽象方法 由子类实现 尝试去获取共享锁,如果返回值小于0表示获取共享锁失败 这里调用的是Snyc
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

<3>tryAcquireShared

java.util.concurrent.locks.ReentrantReadWriteLock.Sync#tryAcquireShared

 protected final int tryAcquireShared(int unused) {
        //获得当前线程
        Thread current = Thread.currentThread();
        //获得AQS状态
        int c = getState();
        /**
         * exclusiveCount判断是否已经持有了写锁
         * getExclusiveOwnerThread 获得持有写锁的线程,如果不是当前线程持有直接返回-1 获取读锁失败 这里说明 同一个线程加了写锁是可以继续获取读锁的
         */
        if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
            return -1;
        //获取获取读锁的数量
        int r = sharedCount(c);
        /**
         * readerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法
         * FairSync(公平锁):直接如果队列不为空就返回false
         * NonfairSync(非公平锁)遍历CLH队列如果已经在队列中表示轮到他获取锁了返回true否则返回false 需要重新加入CLH队列
         * r < MAX_COUNT是否超了 读锁的获取临界,因为state划分了高位和低位所以不能超过最大值 1<<16-1=65536 所以同时支持65536 获取这么多度锁。。理论上够用了
         * compareAndSetState(c, c + SHARED_UNIT)
         */
        if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
            //没有持有任何锁的情况
            if (r == 0) {
                //记录第一个获读锁的线程
                firstReader = current;
                //第一个获取获取读锁线程+1
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                //如果当前线程是第一获取读锁的线程 计数器加1
                firstReaderHoldCount++;
            } else {
                //获取最近一个获取成功获取读锁的计数器 这省却了ThreadLocal查找,
                ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
                //判断是否是当前线程的 如果不是则从ThreadLocal查找 并设置到最近一个获取
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                //计数器+1
                rh.count++;
            }
            return 1;
        }
        //<4>并发获取锁的情况 尝试获取读锁失败,自旋重试。
        return fullTryAcquireShared(current);
    }

<4>fullTryAcquireShared

 final int fullTryAcquireShared(Thread current) {
        ReentrantReadWriteLock.Sync.HoldCounter rh = null;
        //自旋
        for (;;) {
            //获取AQS 锁状态
            int c = getState();
            /**
             * exclusiveCount判断是否已经持有了写锁
             * getExclusiveOwnerThread 获得持有写锁的线程,如果不是当前线程持有直接返回-1 获取读锁失败 这里说明 同一个线程加了写锁是可以继续获取读锁的
             */
            if (exclusiveCount(c) != 0) {
                //如果写锁以及被获取 同时不是当前线程直接返回-1 不自旋
                if (getExclusiveOwnerThread() != current)
                    return -1;
                /**
                 *          * readerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法
                 *          * FairSync(公平锁):直接如果队列不为空就返回false
                 *          * NonfairSync(非公平锁)遍历CLH队列如果已经在队列中表示轮到他获取锁了返回true否则返回false 需要重新加入CLH队列
                 */
            } else if (readerShouldBlock()) {
                // 第一个获取读锁的是当前线程不做处理
                if (firstReader == current) {
                    // assert firstReaderHoldCount > 0;
                } else {
                    //更新锁计数(可重入的体现)
                    if (rh == null) {
                        //获取最近一个获取读锁线程的Counter
                        rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current)) {
                            //如果不是最近的从ThreadLocal获取
                            rh = readHolds.get();
                            if (rh.count == 0)
                                //如果当前线程的持有读锁数为0,那么就没必要使用计数器,直接移除
                                readHolds.remove();
                        }
                    }
                    if (rh.count == 0)
                        //结束自旋
                        return -1;
                }
            }
            //超过最大可获取数量
            if (sharedCount(c) == MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //自旋获取共享锁 获取成功Countter++
            if (compareAndSetState(c, c + SHARED_UNIT)) {
                if (sharedCount(c) == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    if (rh == null)
                        rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                    cachedHoldCounter = rh; // cache for release
                }
                return 1;
            }
        }
    }

读锁释放锁

1.每次释放通过HoldCounter 的count -1直到减为0则返回AQS父类释放成功,将AQS的state -1,并将计数器从ThreadLcal清除

2.AQS则会唤醒CLH阻塞队列重新尝试重新获取锁

main

   public static void main(String[] args)  {
        //<L>初始化
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        //获得读锁 内部return readLock
        Lock rlock = readWriteLock.readLock();
        rlock.lock();
        //<1>
        rlock.unlock();

    }

<1>unlock

   public void unlock() {
        //<2>调用Sync继承AQC方法
        sync.release(1);
    }

<2>releaseShared

可以参考《AQS源码》

// 释放共享锁
    public final boolean releaseShared(int arg) {
        //<3>模板模式 抽象方法尝试释放共享锁
        if (tryReleaseShared(arg)) {
            // 唤醒等待共享锁的线程
            doReleaseShared();
            return true;
        }
        return false;
    }

<3>tryReleaseShared

 protected final boolean tryReleaseShared(int unused) {
        Thread current = Thread.currentThread();
        //是不第一个获取锁
        if (firstReader == current) {
            //可重入的特点 直接置空
            if (firstReaderHoldCount == 1)
                firstReader = null;
            else
                firstReaderHoldCount--;//计数器-1
        } else {
            //判断是不是最近获取读锁的线程
            ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                rh = readHolds.get();//如果不是 从ThreadLocal获取
            int count = rh.count;
            //可重入特点 判断数量 觉得
            if (count <= 1) {
                readHolds.remove();
                if (count <= 0)
                    throw unmatchedUnlockException();
            }
            --rh.count;
        }
        //自旋释放释放锁
        for (;;) {
            int c = getState();
            int nextc = c - SHARED_UNIT;
            if (compareAndSetState(c, nextc))
                // Releasing the read lock has no effect on readers,
                // but it may allow waiting writers to proceed if
                // both read and write locks are now free.
                return nextc == 0;
        }
    }

写锁加锁

main

 //<L>初始化
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        //获得读锁 内部return readLock
        Lock writeLock = readWriteLock.writeLock();
        //<1>加锁
        writeLock.lock();

1.如果AQS state不等于0,表示有可能获取的写锁或者读锁,根据位运算符&获取写锁数量,如果数量=0那么表示是读锁。直接获取失败(加入CLH队列阻塞)

2.如果获取写锁数量大于0 根据getExclusiveOwnerThread() 判断是否不是当前线程持有,如果是的话直接获取成功(可重入) state+1并通过CAS修改

<1>lock

public void lock() {
        //<2>调用继承自AQS加锁方法
        sync.acquire(1);
    }

<2>acquire

可以参考《AQS源码阅读》

public final void acquire(int arg) {
        /**
         *<3>tryAcquire  模板模式 是抽象方法由子类实现获取锁步骤
         *addWaiter   如果加锁失败 创建节点并放入队列尾
         *<acquireQueued 通过新创建的节点 判断是重试获取锁。还是阻塞线程
         */
        if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(node.EXCLUSIVE), arg))
            selfInterrupt();
    }

<3>tryAcquire

 protected final boolean tryAcquire(int acquires) {
        Thread current = Thread.currentThread();
        //获得AQS锁状态
        int c = getState();
        //获得写锁数量
        int w = exclusiveCount(c);
        //>0表示写锁或者读锁被获取
        if (c != 0) {
            //==0表示读锁被持有 直接返回false 加入CLH队列 如果是写锁判断是是否是当前线程持有(可重入)
            if (w == 0 || current != getExclusiveOwnerThread())
                return false;
            //判断是否有超过可获取写锁数量
            if (w + exclusiveCount(acquires) > MAX_COUNT)
                throw new Error("Maximum lock count exceeded");
            //则state++ 可重入
            setState(c + acquires);
            return true;
        }
        /**
         * writerShouldBlock 公平锁的实现 NonfairSync FairSync都重新了此方法
         * FairSync(公平锁): 不在队列返回true 在队列返回false
         * NonfairSync(非公平锁)直接返回false 尝试直接获取锁
         * r < MAX_COUNT是否超了 读锁的获取临界,因为state划分了高位和低位所以不能超过最大值 1<<16-1=65536 所以同时支持65536 获取这么多度锁。。理论上够用了
         * compareAndSetState(c, c + SHARED_UNIT)
         */
        if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
            return false;
        //设置当前持有锁线程
        setExclusiveOwnerThread(current);
        return true;
    }

写锁释放锁

main 

  public static void main(String[] args)  {
        //<L>初始化
        ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
        //获得读锁 内部return readLock
        Lock writeLock = readWriteLock.writeLock();
        //<1>释放锁
        writeLock.unlock();

    }

1.getExclusiveOwnerThread() == Thread.currentThread() 判断是否是当前线程持有锁 如不过不是 直接抛错

2.AQS sate-- 更新到AQS state 如果大于0返回释放失败 如果==0  清空exclusiveOwnerThread 返回释放成功 后续交给AQS处理 唤醒CLH等待线程重新获取锁

<1>unlock

    public void unlock() {
        //<2>调用的继承AQS的共享锁释放方法
        sync.release(1);
    }

<2>relase

可以参考 《AQS源码阅读》

// 在独占锁模式下,释放锁的操作
    public final boolean release(int arg) {
        // <3>调用tryRelease子类方法,尝试去释放锁,由子类具体实现
        if (tryRelease(arg)) {
            Node h = head;
            // <如果队列头节点的状态不是0,那么队列中就可能存在需要唤醒的等待节点。
            // 还记得我们在acquireQueued(final Node node, int arg)获取锁的方法中,如果节点node没有获取到锁,
            // 那么我们会将节点node的前一个节点状态设置为Node.SIGNAL,然后调用parkAndCheckInterrupt方法
            // 将节点node所在线程阻塞。
            // 在这里就是通过unparkSuccessor方法,进而调用LockSupport.unpark(s.thread)方法,唤醒被阻塞的线程
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

<3>tryRelease

 protected final boolean tryRelease(int releases) {
        //判断是不是当前线程持有getExclusiveOwnerThread() == Thread.currentThread(); 如果不是则抛异常 避免别的线程调用此方法释放了锁
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        //获取状态--
        int nextc = getState() - releases;
        //判断是否释放完了(可重入) 如果调用2次lock 则需要调用2次unlock才会真正释放锁
        boolean free = exclusiveCount(nextc) == 0;
        if (free)
            setExclusiveOwnerThread(null);//置空持有线程
        //设置新的锁状态 这里不用CAS是 读锁 只会有有一个线程持有
        setState(nextc);
        return free;
    }
原文地址:https://www.cnblogs.com/LQBlog/p/15219124.html