线程之Semaphore生产消费模型

private final int capacity = 10;
private final Semaphore empty = new Semaphore(capacity); //仓库中空的槽的信号量
private final Semaphore full = new Semaphore(0); //仓库中被占用的槽的信号量
private final Semaphore mutex = new Semaphore(1); //互斥信号量
private int insertIndex = 0; //仓库中当前可以放置物品的位置
private int removeIndex = 0; //仓库中当前可以拿走物品的位置。

信号量是多线程环境下使用的一种方式,可以用来保证两个或多个程序不能同时进入临界区,从而不能同时放一个共享资源,达到线程互斥的作用。
此外,通过使用信号量,我们也可以实现线程同步:在生产者消费者问题中,信号量full和empty用来保证某种事件顺序发生或者不发生,
即对于一个物品,生产的过程始终要在消费的过程之前。
里面有三个信号量full,empty和mutex,其中full和empty用来做进程同步,mutex用来做进程互斥。
每当要生产一个物品时,我们首先检查能否获取信号量empty的一个许可,如果不可以则阻塞线程,如果可以则继续获取items数组的访问权,该访问权由互斥信号量mutex来控制,当放置完物品后,会释放信号量empty的一个许可。
同理,每当要消费一个物品时,我们首先检查能否获取信号量full的一个许可,如果不可以则阻塞线程,如果可以择继续获取items数组的访问权,当拿走物品后,会释放信号量full的一个许可。这样,我们就用信号量实现了生产者消费者问题。

1.Semaphore的使用案例?

需求
要求:使用2个线程,分别代表:生产者、消费者。让他们并发的去生产、消费产品。生产的总数是不能超过N的。

实现思路
这里我们使用的是使用信号量去控制线程的生产消费,通过释放令牌的形式去控制生产者消费者的上限。使用互斥锁保证每次最多只有一个角色去修改共享变量。

来看张图,一图胜千言。

    /**
* 生产者消费者模型
*/
public class ProducerConsumerProblem {
//初始容量
private static final int N = 10;

/***
* full 产品容量
* empty 空余容量
* mutex 读写锁
*/
private static Semaphore full, empty, mutex;
//记录当前的产品数量
private static volatile int count = 0;

static {
/**
* full 初始化0个产品
* empty 初始化有N个空余位置放置产品
* mutex 初始化每次最多只有一个线程可以读写
* */
full = new Semaphore(0);
empty = new Semaphore(N);
mutex = new Semaphore(1);
}

public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}

//生产者类
static class Producer implements Runnable {

@Override
public void run() {
while (true) {
try {
empty.acquire();//等待空位
mutex.acquire();//等待读写锁
count++;
out.println("生产者生产了一个,还剩:" + count);
mutex.release();//释放读写锁
full.release();//放置产品
      //随机休息一段时间,让生产者线程有机会抢占读写锁
Thread.sleep(((int) Math.random()) % 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

//消费者类
static class Consumer implements Runnable {

@Override
public void run() {
while (true) {
try {
full.acquire();//等待产品
mutex.acquire();//等待读写锁
count--;
out.println("消费者消费了一个,还剩:" + count);
mutex.release();//释放读写锁
empty.release();//释放空位
        //随机休息一段时间,让消费者线程有机会抢占读写锁
Thread.sleep(((int) Math.random()) % 10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}

 1.java信号量解决生产消费问题

信号量的基本原理
简单点理解,信号量可看成一个整形量,表示可用资源的个数,使用资源时信号量-1,释放资源时信号量+1,以生产消费为例:
1. 假设此时缓冲区已满,sem=0,表示当前空的缓冲区个数为0
2. 生产者P1生产,sem=-1,陷入阻塞
3. 生产者P2生产,sem=-2,陷入阻塞
4. 消费者C1消费,sem=-1,唤醒P1
5. 消费者C2消费,sem=0,唤醒P2

归纳
信号量的值可以反映有多少线程正在等待资源, 信号量<0表示有线程因无资源而陷入等待,如sem=-2,就表示有2个生产者陷入阻塞
消费时sem++,执行后若sem<=0, 表示之前有线程阻塞,需要唤醒,如上例中唤醒P1,P2
信号量个数与初值的选择:
生产消费问题中,需要判断空和满 2个状态。 首先需要2个信号量, empty表示空缓冲区个数,full表示满缓冲区个数
初值empty=缓冲区大小,   full=0,表示初始状态缓冲区全空,满的缓冲区个数为0。还需要信号量mutex=1保证互斥,初值为1为什么可保持互斥请看P,V操作。
引入P,V操作
P:表示申请资源
V:表示释放资源
P,V操作都是原子性的

P(semaphore s) {
s.value--;
if (s.value < 0)
当前线程阻塞
}

V(semaphore s) {
s.value++;
if (s.value <= 0)
唤醒等待在该资源上的线程
}

P,V解决生产消费

 semaphore full = 0;
semaphore empty = BUFF_SIZE;
semaphore mutex = 1;
Producer() {
P(empty);//要生产,则耗费空资源,empty--
P(mutex);//保证互斥
生产
.....
V(mutex);
V(full);//生产完成则产生1个满资源,full++
}

Consumer() {
P(full);
P(mutex);
消费
...
V(mutex);
V(empty);
}


 
完整的java代码
java.util.concurrent.Semaphore,java中已实现好了Semaphore这个类,可在构造方法中设定初值,require和release方法对应P,V操作

public class SynStack {
private char[] data = new char[6];
int cnt=0;

Semaphore empty = new Semaphore(data.length);//空信号量
Semaphore full = new Semaphore(0);//满信号量
Semaphore mutex = new Semaphore(1);//互斥信号量
    //生产
public void push(char ch){
try {
empty.acquire();
mutex.acquire();
data[cnt]=ch;
cnt++;
System.out.println("生产线程,生产第"+cnt+"个产品,此产品是"+ch);
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
mutex.release();
full.release();
}
}
//消费
public char pop(){
char ch=' ';
try {
full.acquire();
mutex.acquire();
ch=data[cnt-1];
System.out.println("消费线程,消费第"+cnt+"个产品,此产品是"+ch);
--cnt;
} catch (InterruptedException e) {
e.printStackTrace();
}finally{
mutex.release();
empty.release();
}
return ch;
}
}


public class Producer implements Runnable{
private SynStack ss =null;
public Producer(SynStack ss){
this.ss = ss;
}
@Override
public void run() {
char ch;
for(int i=0;i<15;++i){
ch =(char)('a'+i);
ss.push(ch);
/*try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
}
}
}

public class Consumer implements Runnable{
private SynStack ss = null;
public Consumer(SynStack ss){
this.ss = ss;
}
@Override
public void run() {
for(int i=0;i<15;++i){
ss.pop();
}
}
}

public class Test {
public static void main(String[] args) {
SynStack ss = new SynStack();
Producer producer= new Producer(ss);
Consumer consumer =new Consumer(ss);
Thread threadP = new Thread(producer);
Thread threadC = new Thread(consumer);
//Thread threadC2= new Thread(consumer);
threadP.start();
threadC.start();
//threadC2.start();
}
}


 定用一个整数表示信号量,然后有一个增加操作和一个减少操作,于是有如下的

  class Semaphore {
private volatile int permit;
public Semaphore(int permit) {
this.permit = permit;
}

public acquire() {
if (permit <= 0) {
    //等待
}
    //执行permit--操作
}

public release() {
  //执行permit++ 操作
if(permit > 0){
    //唤醒
}
}
}

这里有两点需要说明:
这里先用一个二进制信号量来等效互斥操作;
 由于信号量只能通过0值来进行阻塞和唤醒,所以这里必须使用两个信号量来模拟容器空和容器满两种状态

public class Cache {
private int cacheSize = 0;

public Semaphore mutex;
public Semaphore empty; //保证了容器空的时候(empty的信号量<=0), 消费者等待
public Semaphore full; //保证了容器满的时候(full的信号量 <= 0),生产者等待

public Cache(int size) {
mutex = new Semaphore(1); //二进制信号量,表示互斥锁
empty = new Semaphore(size);
full = new Semaphore(0);
}

public int getCacheSize() throws InterruptedException {
return cacheSize;
}

public void produce() throws InterruptedException {
empty.acquire(); // 消耗一个空位
mutex.acquire();
cacheSize++;
System.out.println("生产了一个产品, 当前产品数为" + cacheSize);
mutex.release();
full.release(); // 增加了一个产品

}

public void consume() throws InterruptedException {
full.acquire(); // 消耗了一个产品
mutex.acquire();
cacheSize--;
System.out.println("消费了一个产品, 当前产品数为" + cacheSize);
mutex.release();
empty.release(); // 增加了一个空位
}
}

public class Consumer implements Runnable {
private Cache cache;

public Consumer(Cache cache) {
this.cache = cache;
}

@Override
public void run() {
while (true) {
try {
cache.consume();
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

}
}

public class Producer implements Runnable {
private Cache cache;
public Producer(Cache cache) {
this.cache = cache;
}

@Override
public void run() {
while (true) {
try {
cache.produce();
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

public class Main {
public static void main(String[] args) {
Cache cache = new Cache(10);

Producer p = new Producer(cache);
Consumer c = new Consumer(cache);
int producerCount = 4, consumerCount = 4;
for (int i = 0; i < producerCount; i++) {
new Thread(p).start();
}
for (int i = 0; i < consumerCount; i++) {
new Thread(c).start();
}
}
}

------

看过了信号量,我们来看看信号。信号(Signal)是一种处理异步事件的通讯方式,用于通知其他进程或者自己本身,来告知将有某种事件发生。在Java中,信号机制通过wait(),notify()和notifyAll()来实现。其中wait()使得当前调用wait()的线程挂起,并释放已经获得的wait()所在代码块的锁;notify()用于随即唤醒一个被wait()挂起的线程进入线程调度队列;notifyAll()用于唤醒所有被wait()挂起的线程进入线程调度队列。
用Java信号实现的生产者和消费问题, 代码如下:

public class TestSignal {
//http://blog.csdn.net/sunset108/article/details/38819529
static Monitor monitor = new Monitor();

//生产者
static class Producer implements Runnable {
static int num = 1;
@Override
public void run() {
while (true) {
try {
monitor.insert(num);
System.out.println("生产物品" + num);
num++;
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}

//消费者
static class Consumer implements Runnable {
@Override
public void run() {
while (true) {
try {
System.out.println("消费物品" + monitor.remove());
Thread.sleep(500);
} catch (InterruptedException e) {
}

}
}

}

//管程,只能有一个线程占用
static class Monitor {
private final int capacity = 10;
private int insertIndex = 0; //仓库中当前可以放置物品的位置
private int removeIndex = 0; //仓库中当前可以拿走物品的位置
private final Object[] items = new Object[capacity]; //仓库中的所有物品
int count = 0; //仓库中的现有物品数

//向仓库中放置物品
public synchronized void insert(Object item) throws InterruptedException {
//当仓库已满时,挂起生产线程
if (count == capacity) {
wait();
}
items[insertIndex++] = item;
if (insertIndex == capacity) {
insertIndex = 0;
}
count++;
//当仓库由空变为不空时,唤起消费线程
if (count == 1) {
notify();
}
}

//从仓库中拿走物品
public synchronized Object remove() throws InterruptedException {
//当仓库没有物品时,挂起消费线程
if (count == 0) {
wait();
}
Object item = items[removeIndex++];
if (removeIndex == capacity) {
removeIndex = 0;
}
count--;
//当仓库由满变为不满时,唤起生产线程
if (count == capacity - 1) {
notify();
}
return item;
}
}

public static void main(String[] args) {
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
}
}

可以看到,该例子使用一个Monitor类来实现仓库中放置和拿走物品的方法,该类相当于一个管程,只能保证同一时刻只能有一个线程使用该类。具体实现是采用静态类和synchronized方法,保证当insert()调用时拥有Monitor类的类锁,使得remove()无法获得Monitor类的类锁,同理,保证当remove()调用时拥有Monitor类的类锁,使得remove()无法被调用。里面实现的巧妙之处在于,当count为capacity时,我们会挂起生产进程,当count从capacity变为capacity - 1时,就会唤醒生产进程加入线程调度队列,同理,当count为0时,我们会挂起消费进程,当count从0变为1时,就会唤醒消费进程加入线程调度队列。

这种方式的执行结果,与信号量的结果类似,不再列出来。

看完了这两个例子,我们对信号量和信号有了一定的了解。但我们依然不清楚的是:它们的区别到底在哪里呢?

如果我们单单使用信号,即使用wait和notify方法,很有可能会错过丢失某些信号通知。比如,如果我们不对count的访问添加限制,当count为0时,调度程序知道仓库里没有物品了,便准备挂起消费者线程并启动生产者线程,如果生产者线程的启动时间比消费者线程的挂起时间快得多,很可能会有这种情况:当生产者线程生产了一个物品,使count由0变为1,此时会向消费者线程发送一个唤醒信号,但此时消费者线程还没有完全挂起,因此它会忽略这个信号,这样一来,生产者线程会一直生产物品,直到count为capacity时挂起,而消费者线程在完全挂起之后不会再收到唤醒信号,因此也会一直挂起,这样整个系统就全部挂起,永远不会执行下去。当然,我们的实现方式是同一时刻只能有一个线程操作count,因此消费者线程的wait方法一定在生产者线程的notify方法之前执行,即当消费者线程完全挂起之后,生产者线程才能启动,于是不会出现错过丢失信号的问题。

而Java中Semaphore的实现也是建立在信号的基础上的,但不同的是它会保存所有的信号,不会丢弃任何信号,这样就很好地避免了前面的问题,这也是信号量和信号的最大区别所在。 



原文地址:https://www.cnblogs.com/awkflf11/p/12637966.html