ReentrantReadWriteLock实现原理

  在java并发包java.util.concurrent中,除了重入锁ReentrantLock外,读写锁ReentrantReadWriteLock也很常用。在实际开发场景中,在使用共享资源时,可能读操作远远多于写操作。这种情况下,如果对这部分共享资源能够让多个线程读的时候不受阻塞,仅仅在写的时候保证安全性,这样效率会得到显著提升。读写锁ReentrantReadWriteLock便适用于这种场景。

  再描述一下进入读锁和写锁的条件。

  进入读锁: 

      1.没有其他线程的写锁

      2.有写请求且请求线程就是持有锁的线程

  进入写锁:

      1.没有其他线程读锁

      2.没有其他线程写锁

  本篇从源码方面,简要分析ReentrantReadWriteLock的实现原理,以及展示一下它的使用效果。

源码

  这是ReentrantReadWriteLock维护的一对锁

/** Inner class providing readlock */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** Inner class providing writelock */
    private final ReentrantReadWriteLock.WriteLock writerLock;

  

  ReentrantReadWriteLock的构造器中,同时实例化读写锁,同时与ReentrantLock相同,也有公平锁和非公平锁之分

public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

写锁

  获取锁

public void lock() {
            sync.acquire(1);
        }
//这里与ReentrantLock相同
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

protected final boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

  这里解析tryAcquire()方法。

  • 获取当前线程
  • 获取状态
  • 获取写线程数
  • 若state不为0,表示锁已被持有。再判断,如果写线程数为0,则读锁被占用,返回false;如果写线程数不为0,且独占线程不是当前线程,表示写锁被其他线程占用没返回false
  • 如果写锁重入数大于最大值MAX_COUNT,抛错
  • 写锁重入,返回true
  • state为0,根据公平锁还是非公平锁判断是否阻塞线程。不需要阻塞就CAS更新state
  • 当前线程设为独占线程,获取写锁,返回true

  释放锁

public void unlock() {
            sync.release(1);
        }

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

  分析tryRelease()方法

  • 判断持有写锁的线程是否当前线程,不是则抛错
  • state减1
  • 以新state计算写锁数量,如果为0,表示完全释放;
  • 完全释放就设置独占线程为null
  • 如果独占线程数量不是0,还是更新state,这里就表示多次重入写锁后,释放了一次

读锁

  获取锁

public void lock() {
            sync.acquireShared(1);
        }

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

protected final int tryAcquireShared(int unused) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

  这里分析tryAcquireShared()方法

  • 获取当前线程
  • 获取state
  • 如果写锁数量不为0,且独占线程不是本线程,获得读锁失败。因为写锁被其他线程占用
  • 获取读锁数量
  • 根据公平锁或者非公平锁判断是否应该被阻塞,判断读锁数量是否小于最大值MAX_COUNT,再尝试CAS更新state
  • 以上判断都通过且更新state也成功后,如果读锁为0,记录第一个读线程和此线程占用读锁数量
  • 如果第一个读线程是本线程,表示此时是读锁的重入,则把此线程占用读锁数量+1
  • 如果读锁数量不为0,且此线程也不是第一个读线程,则找到当前线程的计数器,并计数+1
  • 如果在阻塞判断,读锁数量判断和CAS更新是否成功这部分没有通过,则进入fullTryAcquireShared()方法,逻辑与上面的获取类似,以无限循环方式保证操作成功,不赘述。

释放锁

  

public void unlock() {
            sync.releaseShared(1);
        }
public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

  分析tryReleaseShared()方法

  • 获取当前线程
  • 如果当前线程是第一个读线程,则释放firstReader或者第一个读线程的锁计数-1
  • 不是就获得当前线程的计数器。根据计数选择删除此计数器或者减少计数
  • 无限循环更新state  

获取锁和释放锁的源码部分代码就分析放到这里,接下来用代码时间看看ReentrantReadWriteLock的使用效果测试。

public class ReadWriteLockTest {
    private static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    private static ExecutorService executorService = Executors.newCachedThreadPool();
    //读操作
    public static void read(){
        try {
       //加读锁 readWriteLock.readLock().
lock(); System.out.println(Thread.currentThread().getName() + " is reading " + System.currentTimeMillis()); Thread.sleep(1000); } catch (InterruptedException e){ }finally { readWriteLock.readLock().unlock(); } } //写操作 public static void write() { try {
       //加写锁 readWriteLock.writeLock().
lock(); System.out.println(Thread.currentThread().getName() + " is writing "+ System.currentTimeMillis()); Thread.sleep(1000); } catch (InterruptedException e){ }finally { readWriteLock.writeLock().unlock(); } } public static void main(String[] args) { for (int i = 0; i < 3; i++) { executorService.execute(new Runnable() { @Override public void run() { ReadWriteLockTest.read(); } }); } for (int i = 0; i < 3; i++) { executorService.execute(new Runnable() { @Override public void run() { ReadWriteLockTest.write(); } }); } } }

  执行结果如下:

pool-1-thread-2 is reading 1549002279198
pool-1-thread-1 is reading 1549002279198
pool-1-thread-3 is reading 1549002279198
pool-1-thread-4 is writing 1549002280208
pool-1-thread-5 is writing 1549002281214
pool-1-thread-6 is writing 1549002282224

  可以看到,thread1,2,3在读时,是同时执行。thread4,5,6在写操作是,都差不多间隔1000毫秒。

原文地址:https://www.cnblogs.com/sunshine-ground-poems/p/10345723.html