生产者-消费者中的缓冲区:BlockingQueue接口

BlockingQueue接口使用场景
相信大家对生产者-消费者模式不陌生,这个经典的多线程协作模式,最简单的描述就是生产者线程往内存缓冲区中提交任务,消费者线程从内存缓冲区里获取任务执行。在生产者-消费者模式中最重要的就是这个内存缓冲区,可能你会疑问,为什么不让生产者直接把任务提交给消费者来执行,而是要通过一个中间媒介,也就是一个缓冲区来交换任务?

通过缓冲区,可以缓解生产者和消费者之间的速度差。假设生产者的速度大于消费者,生产者不断向缓冲区内提交任务,但是缓冲区大小有限,当内存缓冲区满时,生产者不得不被阻塞,此时消费者仍不断从缓冲区内获取任务执行,直到缓冲区不为空,生产者才能继续执行。
通过缓冲区,生产者不需要知道消费者是谁,生产者只需把任务提交到缓冲区即可;同样消费者也不需要直到生产者是谁,获取任务通过缓冲区。这样做的好处在于,对于代码的维护和升级,如果我们要改动消费者,我们不需要修改生产者和缓冲区。生产者和消费者之间的通信通过缓冲区。
在生产者-消费者模式中,充当这个缓冲区使用的是BlockingQueue接口,BlockingQueue继承自Queue接口,在实例化时,可以使用ArrayBlockingQueue和LinkedBlockingQueue两种队列,前者是基于数组实现的,而后者是基于链表实现,从名字我们就可以看出。看到这两个队列大家应该有点印象,在线程池中也有这么一个参数BlockingQueue:

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
/*
*corePoolSize:线程池中的线程数量.
*maximumPoolSize:线程池中的最大线程数.
*keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的空闲线程的存活时间.
*unit:keepAliveTime的单位.
*workQueue:任务队列,被提交进线程池,但没被执行的任务.
*threadFactory:线程工厂,用于创建线程,可自定义线程.
*handler:拒绝服务,当线程池中没有空闲线程为新任务服务时,且等待队列中也已经满时,执行的策略.
*/
线程池中的workQueue任务等待队列用来保存被提交进线程池,但因为没有空闲线程,所以尚未被执行的任务。使用ArrayBlockingQueue做为有界队列,LinkedBlockingQueue做为无界队列,无界队列因为基于链表实现,所以不会出现任务入队列失败的情况,直到内存耗尽为止。

为什么使用BlockingQueue做为内存缓冲区
用回生产者-消费者模式举例说,在多线程环境下,当生产者线程向内存缓冲区提交了一个任务后,消费者线程怎么知道此时内存缓冲区内有新的任务提交?如果我们让消费者线程不断查询缓冲区内的任务提交情况,是可以,不过这样不是一个效率高的方法。

在线程池中也是,使用BlockingQueue队列,关键是Blocking,假设我们使用的是ArrayBlockingQueue,基于数组实现的有界队列,生产者线程不断向任务队列(也就是缓冲区)内提交任务时,当任务队列已经放满待执行任务后,生产者线程就会被阻塞,直到缓冲区内有空闲位置后,才会唤醒生产者线程。当消费者线程不断从缓冲区内获取任务执行时,假设所有任务都被获取后,消费者线程也会被阻塞,直到缓冲区内有新的任务被提交,消费者线程被重新唤醒。这是怎么做到的?使用BlockingQueue队列的线程是怎么如何在队列满时,让提交任务线程阻塞,而在队列为空,如何让获取任务线程阻塞?来看看BlockingQueue的内部实现。

BlockingQueue内部实现
为了实现上面所说的情况,用生产者-消费者模式为例,即:

当缓冲队列满时,生产者线程被阻塞,无法继续向缓冲区内提交任务;消费者线程正常执行,如果消费者线程被阻塞,则将其唤醒。
当缓冲队列为空时,消费者线程被阻塞,无法继续从缓冲区中获取任务;生产者线程正常执行,如果生产者线程被阻塞,则将其唤醒。
BlockingQueue队列中,维护着两个Condition字段,一个为notEmpty,一个为notFull,和一把重入锁lock:

ArrayBlockingQueue内部实现:

final Object[] items;     
private final AtomicInteger count = new AtomicInteger(); //当前队列中元素个数
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull; 

public ArrayBlockingQueue(int capacity, boolean fair) {
if(capacity <= 0) {
throw new IllegalArgumentException();
}
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
在ArrayBlockingQueue队列中,使用数组实现存储,所以实例化一个Object对象数组存放元素,AtomicInteger类型的count变量是使用了无锁CAS操作的线程安全类,用来保存当前队列中的元素个数。
LinkedBlockingQueue内部实现:

private final int capacity; //链表的容量
private final AtomicInteger count = new AtomicInteger(); //当前队列中元素个数
transient Node<E> head; //链表头节点
private transient Node<E> last; //链表尾节点
private final ReentrantLock takeLock = new ReentrantLock(); //出队列锁
private final ReentrantLock putLock = new ReentrantLock(); //入队列锁
private final Condition notEmpty = takeLock.newCondition();
private final Condition notFull = putLock.newCondition();

public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
看到源码,大家应该猜到了,就是使用Condition条件配合重入锁,让线程在某一时刻等待。而且使用加锁操作,表明BlockingQueue队列也是线程安全的。对于队列来说,两个最基本的操作:入队和出队,用ArrayBlockingQueue源码来看,ArrayBlockingQueue有offer()和put()两个方法实现往队列中插入元素,两者不同之处在于,使用offer()方法,如果此时队列中已满,那么offer()方法会插入失败,并立刻返回false;如果使用put()方法,当队列满时,使用put()方法的线程会等待,直到队列中有空闲的位置后,继续执行如对操作,这是如何做到的?来看看put()方法的实现:

public void put(E e) throws InterruptedException {
checkNotNull(e); //检查入队元素是否为空
final ReentrantLock lock = this.lock; //抓住当前BlockingQueue实例的lock重入锁
lock.lockInterruptibly(); //加锁,可以响应中断
try {
while(count == item.length) {
notFull.await(); //在notFull的Condition对象上等待
}
insert(e); //队列入队操作
} finally {
lock.unlock;
}
}
入队操作中,首先获得该队列的锁,然后特殊情况判断,while死循环不断判断,如果count == item.length,也就是当前队列已经满了,那么就让线程在notFull上等待,表示当前队列满,这就做到了,当内存缓冲区满时,生产者线程等待。当队列中有空闲位置了,则跳出跳出while循环,执行insert()插入操作。

private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
在执行插入操作的实现中,会把等待在Condition实例notEmpty的线程唤醒,等于是告诉正在等待的消费者线程,当前有新任务进入缓冲区了。
上面是入队操作,接着看出队,和入队相似,在队列中出队一个元素也有两个方法,poll()和take(),使用poll()方法出队,如果队列为空,则返回null。take()方法则会等待在这个队列上。与put()方法对比,可以直到,当队列为空时,调用take()方法的线程会等待在notEmpty上,实际上就是这样的,来看看take()方法的实现:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while(count == 0) {
notEmpty.await();
}
return extract();
} finally {
lock.unlock();
}
}
假设当前队列为空,也就是count == 0;那么就让当前线程在notEmpty上等待,直到有新的任务提交进队列,就执行入队操作extract()。
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeInddex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal();
return x;
}
同理往队列中入队一个元素后,会让等待在notFull上的线程唤醒,意思是告诉它们,当前队列不空了,你们可以提交新的任务进来了。
来具体看个例子,在生产者-消费者模式中怎么用这个BlockingQueue队列:
package producerconsumer;

public final class work {
private final int data;

//构造函数初始化
public work(int data) {
this.data = data;
}

public work(String s) {
this.data = Integer.valueOf(s);
}

public int getData() {
return this.data;
}

@Override
public String toString() {
return "data = "+this.data;
}
}
自定义一个work类,模拟生产者和消费者处理的任务,里面就一个int型的data变量。
package producerconsumer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerDemo implements Runnable {
private BlockingQueue<work> workQueue; //内存缓冲区队列
private static AtomicInteger count = new AtomicInteger(); //队列中的总任务数
private volatile boolean isRunning = true; //标识当前线程的状态

//构造函数初始化
public ProducerDemo(BlockingQueue<work> workQueue) {
this.workQueue = workQueue;
}

public void stopProducer() {
this.isRunning = false;
}

@Override
public void run() {
work newWork = new work(count.incrementAndGet());
Random r = new Random();

System.out.println("生产者线程: "+Thread.currentThread().getId()+"开始执行.");
try {
while(isRunning) {
if(!workQueue.offer(newWork)) {
System.out.println("生产者线程: "+Thread.currentThread().getId()+": 缓冲区满,任务-"+newWork+"放入失败.");
} else {
System.out.println("生产者线程: "+Thread.currentThread().getId()+"将任务-"+newWork+"放入缓冲区.");
}
Thread.sleep(r.nextInt(1000));
}
} catch(InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}

}
简单地模拟,在生产者线程的构造函数中获得与消费者通信的缓冲区,然后往里面添加用随机数标记的任务。
package producerconsumer;

import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class ConsumerDemo implements Runnable {
private BlockingQueue<work> workQueue; //内存缓冲区队列
private volatile boolean isRunning = true; //标识当前线程的状态

//构造函数初始化
public ConsumerDemo(BlockingQueue<work> workQueue) {
this.workQueue = workQueue;
}

public void stopConsumer() {
this.isRunning = false;
}

@Override
public void run() {
work takeWork;
Random r = new Random();

System.out.println("消费者线程: "+Thread.currentThread().getId()+"开始执行.");
while(isRunning) {
try {
takeWork = workQueue.take(); //从缓冲区中获取任务
if(takeWork != null) {
System.out.println("消费者线程: "+Thread.currentThread().getId()+"获取任务:"+takeWork.getData());
} else {
System.out.println("缓冲区空.");
}
Thread.sleep(r.nextInt(1000));
} catch(InterruptedException e) {
e.printStackTrace();
Thread.currentThread().interrupt();
}
}
}

}
消费者线程同样构造函数中获得与生产者通信的队列后,调用take()方法获取缓冲区里面的任务,并把任务id打印出来。
package producerconsumer;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainDemo {

public static void main(String[] args) throws InterruptedException {
//创建缓冲区
BlockingQueue<work> workQueue = new ArrayBlockingQueue<work>(10); //基于数组实现
//创建线程池
ExecutorService pcThreadPool = Executors.newCachedThreadPool(); //根据实际情况调整线程数量的线程池
//创建生产者线程
ProducerDemo producer1 = new ProducerDemo(workQueue);
ProducerDemo producer2 = new ProducerDemo(workQueue);
//创建消费者线程
ConsumerDemo consumer1 = new ConsumerDemo(workQueue);
ConsumerDemo consumer2 = new ConsumerDemo(workQueue);
//将线程提交到线程池
pcThreadPool.execute(producer1);
pcThreadPool.execute(producer2);
pcThreadPool.execute(consumer1);
pcThreadPool.execute(consumer2);

Thread.sleep(3*1000);
producer1.stopProducer();
producer2.stopProducer();
consumer1.stopConsumer();
consumer2.stopConsumer();

Thread.sleep(5*1000);
pcThreadPool.shutdown(); //关闭线程池
}

}
主函数中,创建一个ArrayBlockingQueue队列,也就是基于数组实现的BlockingQueue,然后实例化两个生产者线程和消费者线程,并将它们提交到线程池中执行,线程池使用的是newCachedThreadPool(),是一个可根据实际情况调整线程池内线程数量的线程池。
运行结果:

 
完整实现代码已上传GitHub:
https://github.com/justinzengtm/Java-Multithreading/tree/master/BasicProducerConsumer
https://gitee.com/justinzeng/multithreading/tree/master/BasicProducerConsumer
--------------------- 

原文地址:https://www.cnblogs.com/hyhy904/p/10947581.html