Semaphore信号量深度解析

1. 使用指南

package com.multthread;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    //由于信号量是计数器递增,初始值可以随便设置
    static volatile Semaphore sh = new Semaphore(2);

    public static void main(String[] args) throws InterruptedException {

        ExecutorService es = Executors.newFixedThreadPool(2);
        // 将任务A加入线程池
        es.submit(()->{
            try {
                System.out.println("t1...");
                sh.release();
            }catch (Exception e){}
        });
        // 将任务B加入线程池
        es.submit(()->{
            try {
                System.out.println("t2...");
                sh.release();
            }catch (Exception e){}
        });

        // 等待子线程执行完release方法返回,注意这里release可以是同一个线程执行,只要调用了两次就行
        // 此函数入参=当初始信号计数+调用次数时,才会放行,同时将计数器state重置为0
        sh.acquire(4);
        
        // 将任务C加入到线程池
        es.submit(()->{
            try {
                Thread.sleep(100);
                System.out.println("t1...");
                sh.release();
            }catch (Exception e){}
        });

        // 将任务D加入到线程池
        es.submit(()->{
            try {
                System.out.println("t2...");
                sh.release();
            }catch (Exception e){}
        });
        //由于state被重置为0了,所有所以这里入参写调用次数
        sh.acquire(2);
        System.out.println("main.....");
        es.shutdown();
    }
}

2.

  基于AQS实现,与CountDownLatch不同的是,Semaphore内部的计数器是递增的。初始化的时候可以执行一个计数器的值,但是需要在需要同步的地方调用acquire方法执行需要同步的线程数。并且,内部的AQS实现(sync)获取信号量有公平策略和非公平策略之分。

3. 源码分析

  • 构造函数
// 构造函数,默认采用非公平策略
public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }
  • release函数
public void release() {
        sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
     // 尝试释放资源
if (tryReleaseShared(arg)) {
       // 资源释放成功则调用park方法唤醒aqs队列里面最先挂起的线程 doReleaseShared();
return true; } return false; } protected final boolean tryReleaseShared(int releases) {
       // CAS循环修改state值,直到修改成功
for (;;) {
          // 获取当前的信号量值
int current = getState();
          // 信号量值加releases,即+1
int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded");
          // 使用CAS更新state的值 if (compareAndSetState(current, next)) return true; } } // 释放资源完毕,调用唤醒挂起线程 private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
  • acquire方法
public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
}

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
     // 尝试获取
        if (tryAcquireShared(arg) < 0)
       // 如果获取失败则加入到阻塞队列,然后再次尝试,如果失败则调用park方法挂起当前线程 doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
原文地址:https://www.cnblogs.com/zjting/p/12830629.html