Semaphore类

1.简述

  Semaphore通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源。

  Semaphore对于信号量的控制是基于AQS(AbstractQueuedSynchronizer)来做的。Semaphore有一个内部类Sync继承了AQS。而且Semaphore中还有两个内部类FairSync和NonfairSync继承Sync,也就是说Semaphore有公平锁和非公平锁之分。

  Semaphore优点

  • 可以控制线程的数量,不会超出线程范围。

  Semaphore缺点

  • 当线程死锁时,永远没法释放,导致一直阻塞。

  Semaphore使用场景

  • 流量控制,特别是公用资源有限的应用场景,比如数据库连接。
  • 限制接口调用大并发量,保护系统可用性。
  • 可以用来实现互斥锁的。

2.Semaphore的常用方法

/**构造方法
 */
//创建一个新的 Semaphore,默认创建一个非公平的锁的同步阻塞队列,把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。
Semaphore(int permits)
//创建一个新的 Semaphore,提供指定是否为公平的,如果为true可以保证FIFO,把初始令牌数量赋值给同步队列的state状态,state的值就代表当前所剩余的令牌数量。
Semaphore(int permits, boolean fair)


/**常用方法
 */
//获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态。
void acquire()  
//获取一个令牌,在获取到令牌、或者被其他线程调用中断、或超时之前线程一直处于阻塞状态。
void acquire(int permits) 
//获取一个令牌,在获取到令牌之前线程一直处于阻塞状态(忽略中断)。
void acquireUninterruptibly()
//尝试获得令牌,返回获取令牌成功或失败,不阻塞线程。
boolean tryAcquire()
//尝试获得令牌,在超时时间内循环尝试获取,直到尝试获取成功或超时返回,不阻塞线程。​
boolean tryAcquire(long timeout, TimeUnit unit)
//释放一个令牌,唤醒一个获取令牌不成功的阻塞线程。
void release()
​//等待队列里是否还存在等待线程。
boolean hasQueuedThreads()
​//获取等待队列里阻塞的线程数。
int getQueueLength()
//清空令牌把可用令牌数置为0,返回清空令牌的数量。
int drainPermits()
//返回可用的令牌数量。
int availablePermits()
View Code

3.Semaphore的源码分析

  Semaphore内部也是基于AQS并发组件来实现的,提供了内部类Sync和FairSync(公平锁)、NofairSync(非公平锁)。

  Semaphore的主要属性

//AQS同步器的实现类
private final Sync sync;

/**内部类,继承自AQS
 */
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    
    //构造函数
    Sync(int permits) {
        //设置状态数
        setState(permits);
    }

    //获取许可
    final int getPermits() {
        return getState();
    }

    //共享模式下非公平策略获取
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            //获取许可数
            int available = getState();
            //剩余的许可
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))//许可小于0或者比较并且设置状态成功
                return remaining;
        }
    }

    //共享模式下尝试释放许可
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            //获取许可
            int current = getState();
            //可用的许可
            int next = current + releases;
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))//比较并进行设置
                return true;
        }
    }

    //根据指定的缩减量减小可用许可的数目
    final void reducePermits(int reductions) {
        for (;;) {
            //获取许可
            int current = getState();
            //可用的许可
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))//比较并进行设置成功
                return;
        }
    }

    //获取并返回立即可用的所有许可
    final int drainPermits() {
        for (;;) {
            //获取许可
            int current = getState();
            if (current == 0 || compareAndSetState(current, 0))//许可为0或者比较并设置
                return current;
        }
    }
}

/**公平锁的方式
 */
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    //共享模式下获取
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

/**公平锁的方式
 */
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            //同步队列中存在其他节点
            if (hasQueuedPredecessors())
                return -1;
            //获取许可
            int available = getState();
            //剩余的许可
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))//剩余的许可小于0或者比较设置
                return remaining;
        }
    }
}
View Code

  Semaphore构造函数

/**该构造函数会创建具有给定的许可数和非公平的公平设置的Semaphore。
 */
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
/**该构造函数会创建具有给定的许可数和给定的公平设置的Semaphore。
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
View Code

  Semaphore的acquire方法

/**获取一个许可
 */
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
/**获取指定数量许可
 */
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}

/**AQS的acquireSharedInterruptibly方法
 */
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    //判断当前线程是否被中断,中断的话抛出异常
    if (Thread.interrupted())
        throw new InterruptedException();
    //判断当前计数器是否为0是的话返回1否则返回-1,该方法分公平模式和非公平模式
    if (tryAcquireShared(arg) < 0)
        //加入到同步阻塞队列等待被唤醒
        doAcquireSharedInterruptibly(arg);
}

/**AQS子类如果要使用共享模式的话,需要实现tryAcquireShared方法
 * 公平与非公平模式的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列。而不像非公平模式一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
 */
protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
/**非公平模式的实现
 */
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        //获取当前许可证
        int available = getState();
        //减许可证
        int remaining = available - acquires;
        //如果小于0直接返回否则CAS替换剩余值
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
/**公平模式的实现
 */
protected int tryAcquireShared(int acquires) {
    for (;;) {
        //如果前面有线程再等待,直接返回-1
        if (hasQueuedPredecessors())
            return -1;
       //后面与非公平一样
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}
View Code

  Semaphore的release方法

/**释放许可
 */
public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}
/**AQS的releaseShared方法
 */
public final boolean releaseShared(int arg) {
    //如果改变许可数量成功
    if (tryReleaseShared(arg)) {
        //释放阻塞的线程
        doReleaseShared();
        return true;
    }
    return false;
}
/**AQS子类的tryReleaseShared方法
 */
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        //获取当前许可数量
        int current = getState();
        //计算回收后的数量
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        //CAS改变许可数量成功,返回true
        if (compareAndSetState(current, next))
            return true;
    }
}
View Code

  Semaphore的reducePermits方法

/**减小许可数量
 */
protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

/**Sync的reducePermits方法
 */
final void reducePermits(int reductions) {
    for (;;) {
        //得到当前剩余许可数量
        int current = getState();
        //得到减完之后的许可数量
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        //如果CAS改变成功
        if (compareAndSetState(current, next))
            return;
    }
}
View Code

  Semaphore的drainPermits方法

/**获取剩余许可数量
 */
public int drainPermits() {
    return sync.drainPermits();
}
/**Sync的drainPermits方法
 */
final int drainPermits() {
    for (;;) {
        //得到当前剩余许可数量
        int current = getState();
        //如果当前剩余许可等于0,则使用CAS改变
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}
View Code

4.Semaphore的使用示例

public class Test {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(3);
        //模拟10辆车进入停车场
        for(int i = 1; i <= 10; i++){
            new Thread(new Runnable() {
                public void run() {
                    try {
                        System.out.println("===="+Thread.currentThread().getName()+"来到停车场");
                        if(semaphore.availablePermits() == 0)
                            System.out.println("车位不足,请耐心等待");
                        
                        semaphore.acquire();//获取令牌尝试进入停车场
                        System.out.println(Thread.currentThread().getName()+"成功进入停车场");
                        Thread.sleep(new Random().nextInt(10000));//模拟车辆在停车场停留的时间
                        System.out.println(Thread.currentThread().getName()+"驶出停车场");
                        semaphore.release();//释放令牌,腾出停车场车位
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, i+"号车").start();
        }
    }
}
View Code

5.总结

  Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起。而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。

原文地址:https://www.cnblogs.com/bl123/p/14182732.html