ReentrantReadWriteLock详解

ReentrantReadWriteLock详解

简介

特点:

  • ReentrantReadWriteLock允许多个读线程同时访问,不允许写线程和读线程,写线程和写线程同时访问.
  • 一般情况下,共享数据的读操作远多于写操作,比ReentrantLock提供更好的并发性和吞吐量.
  • 读写锁内部维护两个锁:
    • 读操作
    • 写操作
      必须保证获取读锁的线程可以看到写入锁之前版本所做的更新.

功能:

  • 支持公平和非公平的获取锁的方式.
  • 支持可重入.读线程在获取读锁后还可以获得读锁.写线程获取写锁后可以再次获得写锁或者读锁.
  • 允许从写入锁降级为读取锁:先获取写锁,再获取读锁,最后释放写锁.不允许从读锁升级到写锁.
  • 读锁和写锁都支持锁获取期间的中断.
  • Condition支持.写入锁提供一个Condition实现,读取锁不支持Condition(抛出UnsupportedOperationException).

使用示例

在更新缓存后进行锁降级操作

class CachedData{
    Object data; // 缓存的数据
    volatile boolean cacheValid; // 缓存有效性标识
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

    void processCachedData(){
        rwl.readLock().lock(); // 先获取读锁
        if(!cacheValid){ // 若缓存过期,释放读锁,获取写锁
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try{
                if(!cacheValid){ // 对缓存进行更新
                    // 数据更新操作
                    cacheValid = true;
                }
                rwl.readLock().lock(); // 获得读锁
            } finally{
                rwl.writeLock().unlock(); // 释放写锁
            }
        }
        try{
            // 使用缓存数据
        } finally{
            rwl.readLock().unlock(); // 获取读锁
        }
    }
}

用来提高并发性能

在使用集合的情况下,当集合很大,并且读线程远多于写线程时.

class RWDictoary{
    private final Map<String,Data> m = new TreeMap<String,Data>();
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final Lock r = rwl.readLock();
    private final Lock w = rwl.writeLock();

    public Data get(String key){
        r.lock();
        try{ return m.get(key);}
        finally{r.unlock();}
    }

    public String[] allKeys(){
        r.lock();
        try{ return m.keySet().toArray();}
        finally { r.unlock();}
    }

    public Data put(String key, Data value){
        w.lock();
        try{return m.put(key,value);}
        finally{w.unlock();}
    }

    public void clear(){
        w.lock();
        try{m.clear();}
        finally{w.unlock();}
    }
}

实现原理

  • 基于AQS实现.
  • 自定义同步器使用同步状态(整型变量state)维护多个读线程和一个写线程的状态.
  • 使用按位切割使用状态变量:state的高16位表示,低16位表示写.

包含两把锁:

  • readerLock:读锁
  • writerLock:写锁

写锁的获取与释放

获取

// ReentrantReadWriteLock中的内部类中的lock()
public void lock() {
    sync.acquire(1);
}

// AbstractQueuedSynchronizer中的acquire()
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

// ReentrantReadWriteLock中内部类Sync的tryAcquire()
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

流程:

  1. 若当前有线程获取读锁或写锁,则检查:
    1. 若没有线程获取写锁(即:有线程已获取读锁)或者当前线程非独占写锁 == > 获取写锁失败
    2. 否则检查获取写锁后写锁总量有没有超过限定,超过抛出异常.
    3. 若没有超过,则更新锁的状态,获取锁成功.
  2. 若当前没有线程获取读锁或写锁,则:
    1. 检查写线程是否需要阻塞,若要阻塞,则阻塞写线程,获取写锁失败.
    2. 否则将当前线程设为独占写锁 == > 获取写锁成功.

获取写锁:
Write lock

注:
对于writeShouldBlock()表示是否需要阻塞,NofairSyncFairSync的实现不同:

  • NofairSync:直接返回false,不需要阻塞,可以插队.
  • FairSync:若有前驱节点,则返回true,需要阻塞,否则返回false.

释放

// ReentrantReadWriteLock中WriteLock的unlock()
public void unlock() {
    sync.release(1);
}

// AbstractQueuedSynchronizer中的release()
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// ReentrantReadWriteLock中的内部类Sync中的tryRelease()
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}

// ReentrantReadWriteLock中内部类Sync中的unparkSuccessor()
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

流程:
尝试释放锁:

  1. 释放成功 == > 唤醒后继线程 == > 释放锁成功
  2. 释放失败

尝试释放锁:

  1. 当前线程是否为独占线程:
    1. 否 == > 释放失败
    2. 是 == > 检查队列状态state:
  2. 队列状态为0:
    1. 为0(无线程持有锁) == > 设独占线程为null == > 释放成功
    2. 非0 == > 释放成功

唤醒后继线程:

  1. head线程是否可运行
    • 可运行 == > 设为待运行
  2. head线程的后继节点为null或被取消请求:
    • 从后先前寻找最靠前的非null节点,作为后继节点.
  3. 若后继节点非null,则唤醒该节点

释放写锁:
Write unlock


读锁的获取和释放

获取

// ReentrantReadWriteLock中内部类ReadLock中的lock()
public void lock() {
    sync.acquireShared(1);
}

// AQS的acquireShared()
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

// ReentrantReadWriteLock的内部类Sync的tryAcquireShared()
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}

final int fullTryAcquireShared(Thread current) {
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                        rh = readHolds.get();
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
                if (rh.count == 0)
                    return -1;
            }
        }
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

// AQS的doAcquireShared()
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

流程:
首先试图获取读锁:

  1. 若存在写锁并且当前线程为持有写锁 == > 获取读锁失败
  2. 若当前线程无需阻塞并且持有的读锁小于最大值并且CAS更新状态:
    1. 若为第一个读线程,设置其为firstReader.
    2. 若当前线程已获取读锁,则更新线程状态(+1).
    3. 若非第一个读线程并且首次获取读锁,则更新读线程缓存和线程状态.
  3. 若获取锁失败,则:循环上述步骤尝试获取读锁。

若读锁获取失败,则:

  1. 检查前驱节点是否是head,若不是则,阻塞自己,等待唤醒后重试。若被中断,则取消请求锁。
  2. 若前驱节点是head,则尝试获取锁。若获取成功,则设置下一节点为待处理的线程。若获取失败,则循环重试。

释放

// ReentrantReadWriteLock的内部类ReadLock中的unlock()
public void unlock() {
    sync.releaseShared(1);
}

// AQS中的releaseShared()
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

// ReentrantReadWriteLock的内部类Sync中的tryReleaseShared()
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // 释放读锁对于读线程没有影响,但是若读写锁均被释放,则写线程可以得到处理
            return nextc == 0;
    }
}

// AQS的doReleaseShared()
private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

流程:

  1. 若当前线程为第一个读线程,则
    1. 若读线程数为1,则设置第一个读线程为null
    2. 否则第一个读线程数减一
  2. 若非第一个读线程,则更新缓存中的信息
  3. 循环尝试:更新锁状态state,更新成功则返回。

锁升降级

锁降级:写锁降级为读锁。

流程:线程持有写锁的同时,获取到读锁,获取成功后释放写锁。

必要性:为了保证数据的可见性。若当前线程不获取读锁而是直接释放写锁,则其他线程获取写锁并修改数据时,当前线程无法感知到数据的修改。而使用锁降级,则其他线程会被阻塞,直到当前线程已经获取读锁后才能有可能获得写锁。

注:ReentrantReadWriteLock不支持锁升级(读锁==>写锁),为了保证数据的可见性。

参考:

原文地址:https://www.cnblogs.com/truestoriesavici01/p/13235964.html