Semaphore 源码解读

之前一篇ReentrantLock的文章如果看过,并且对AQS的代码比较熟知的话,Semaphore的代码阅读起来就相对会轻松很多,如果不熟知的话,可以参考那篇文章或者自行学习下AQS的代码。

这里我们来分析下Semaphore的源码。

使用代码举例:

最早接触Semaphore是在前边翻阅Flume的代码的时候,看到的,后来借鉴这个做了一个业务上的提交通道。挺好用的就是 Semaphore + LinkedBlockingDeque实现的一个常用消费者。

大致代码:

1):初始化 许可资源池及存储队列

    private void init(){
        if(this.capacity == 0 || this.capacity == null){
            capacity = defaultCapacity;
        }
        
        queue = new LinkedBlockingDeque<T>(capacity);
                queueStored = new Semaphore(0);
        
    }

2):往队列里塞进资源,每塞进一个资源就调用queueStore.release()增加一个许可

public boolean put(T obj) throws Exception{
        if (!queue.offer(obj)) {
            throw new ChannelException(
                    "Put queue for Memory of capacity " + queue.size() + " full, consider committing more frequently, "
                            + "increasing capacity or increasing thread count");
        }
        queueStored.release();
        return false;
    }

3):从队列中消费资源,获取相应量的许可成功,才能获取成功,否则线程失败阻塞

public List<T> take(int n) throws InterruptedException{
        if(!queueStored.tryAcquire(n, defaultKeepAlive, TimeUnit.MILLISECONDS)){
            return null;
        }
        List<T> ret = new ArrayList<T>();
        int j = n;
        synchronized (queueLock) {
            while(j>0){
                ret.add(queue.poll());
                j--;
            }
        }
        return ret;
    }

以上例子,是一个常用的使用Semaphore的场景。

Semaphore整理上思路可以理解为一个资源池,资源池中有相应数量的资源。这样当多线程争抢资源时,就可以通过Semaphore来限定资源数量或者说明当前资源多少,来进行资源争抢,那么问题来了。。。

1:为毛不能用volatile的int变量直接说明资源数量就可以了,而是搞一个这个呢?

事实上,看过代码就知道,实际的资源数量确实是通过volatile 内存可见的整型数据来表示资源数量的,但是Semaphore不仅仅是用来表示资源数量的,它是一个管理资源及相关争抢线程的数据结构及处理逻辑的能力者,比如如果只是一个变量来表明大小,那么线程争抢的时候,争抢不到该怎么办,是不是要客户线程要自己来处理继续争抢还是怎么样,如果没有资源了呢,是不是又要自己来处理继续等待还是放弃,这些事情就是Semaphore为我们提供的内容。

OK,解除了上边的疑问我们就来继续翻代码。

由于采用的是AQS实现,所以来认识下AQS中的资源表示

  /**
     * The synchronization state.
   *
用来表示资源数量的就是state属性,类型当然是内存可见型:
   */
private volatile int state;
/**
     * Acquires in shared timed mode.
     *
     * @param arg the acquire argument
     * @param nanosTimeout max wait time
     * @return {@code true} if acquired
     */
    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)

构造器:

    public Semaphore(int permits) {
// 这里也区分公平锁、非公平锁,默认是非公平 sync
= new NonfairSync(permits); }

请求资源:

我们以例子中的用法来看

public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
// 请求资源数当然不能小于0
if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }

接上边的分析,这里的sync是非公平锁

public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
// 进入核心代码,同样是常见的模板模式
// 尝试获取要求数量的许可,tryAcquireShared(arg)的返回值,是资源减去请求arg数量后的剩余量,如果剩余量大于0,则表示成功获取指定数量的许可
// 如果失败,则调用doAcquireSharedNanos(arg, nanosTimeout)方法,我们先看tryAcquireShared(arg)方法
return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }

tryAcquireShared(arg) 模板方法的实现在,sync的实际类型NonfairSync中


/**
* NonFair version
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}
// 非公平Sync实现的模板方法
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }
        // 调用父类的方法实现
        final int nonfairTryAcquireShared(int acquires) {
// 这里比较简答,尝试获取资源,那么就去减少资源的数量
for (;;) { int available = getState(); int remaining = available - acquires;
// 如果remaining的值小于0,则表示当前资源数量小于需求量,那么直接返回负数值,如果不是小于0,则表示有可操作资源,则争抢资源
// 只要有满足数量的资源,就在这里循环争抢直到成功,反之直接返回负值,表示没有足够许可使用
if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
     ....


 

前文在模板方法

tryAcquireSharedNanos 中,如果获取资源成功,那么直接返回,如果失败则执行
doAcquireSharedNanos(arg, nanosTimeout),我们来看:

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
// 添加到等待者, 将代表当前线程的Node加入队列中
final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

我们继续模板方法中的代码

释放资源:

大于0,并且争抢成功    =>    直接返回,不做其他操作

大于0,并且争抢失败    =>    阻塞,继续争抢

小于0                         =>    直接返回,并进入等待序列,让出占用cpu

再来一篇,Semaphore VS ReentrantLock。

原文地址:https://www.cnblogs.com/aquariusm/p/9675878.html