生产者-消费者模式

  生产者-消费者模式是一个经典的多线程设计模式,它为多线程间的协作提供了良好的解决方案。在生产者-消费者模式中,通常有两类线程,即若干个生产者线程和若干个消费者线程。生产者线程负责提交用户请求,消费者线程负责具体处理生产者提交的任务。生产者和消费者之间通过共享内存缓冲区进行通信。

  生产者-消费者模式的核心组件是共享内存缓冲区,它作为生产者和消费者时间通信的桥梁,避免了生产者和消费者之间直接通信,从而将生产者和消费者时间进行解耦。生产者不需要知道消费者的存在,消费者也不需要知道生产者的存在。同时由于缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在某一局部时间内速度高于生产者,都可以通过共享内存缓冲区得到缓解,确保系统正常运行。

  生产者-消费者模式的主要角色输入下表所示:

生产者-消费者模式主要角色
生产者 用于提交用户请求,提取用户任务,并装入内存缓冲区
消费者 在内存缓冲区中提取并处理任务
内存缓冲区 缓存生产者提交的任务或者数据,供消费者使用
任务 生产者向内存缓冲区提交的数据结构
Main 使用生产者和消费者的客户端

下图显示了生产者-消费者模式一种实现的具体结构:

 

  其中,BlockingQueue充当了共享内存缓冲区,用于维护任务或数据队列(PCData对象)。PCData对象表示一个生产任务或者相关任务的数据。生产者对象和消费者对象均引用同一个BlockingQueue对象。生产者负责创建PCData,并将它加入到BlockingQueue对象中,消费者则从同一个BlockingQueue中获取PCData,并执行完该任务。
下面代码实现了基于生产者-消费者模式的求整数平方的并行程序。

 1 public class PCData {
 2     private final int intData;//数据
 3 
 4     public PCData(int intData){
 5         this.intData = intData;
 6     }
 7 
 8     public PCData(String d){
 9         intData = Integer.valueOf(d);
10     }
11 
12     public int getIntData(){
13         return intData;
14     }
15 
16     @Override
17     public String toString(){
18         return "data:" + intData;
19     }
20 }
 1 public class Producer implements Runnable {
 2 
 3     private volatile boolean isRunning = true;
 4     //内存缓冲区
 5     private BlockingQueue<PCData> queue;
 6     //总数,原子操作
 7     private static AtomicInteger count = new AtomicInteger();
 8     private static final int SLEEP_TIME = 1000;
 9 
10     public Producer(BlockingQueue<PCData> queue){
11         this.queue = queue;
12     }
13 
14     @Override
15     public void run() {
16         PCData data = null;
17         Random random = new Random();
18 
19         System.out.println("start producer id = " + Thread.currentThread().getId());
20         while (isRunning){
21             try {
22                 Thread.sleep(random.nextInt(SLEEP_TIME));
23                 data = new PCData(count.incrementAndGet());//构造任务数据
24                 System.out.println(data + " is put into queue!");
25                 if (!queue.offer(data,2, TimeUnit.SECONDS)){//提交数据到缓冲区,offer(),当队列满时,直接返回false。
26                     System.out.println("failed to put data:" + data);
27                 }
28             } catch (InterruptedException e) {
29                 e.printStackTrace();
30                 Thread.currentThread().interrupt();
31             }
32         }
33     }
34 
35     public void stop(){
36         isRunning = false;
37     }
38 }
public class Customer implements Runnable {

    private BlockingQueue<PCData> queue;//缓冲区
    private static final int SLEEP_TIME = 1000;

    public Customer(BlockingQueue<PCData> queue){
        this.queue = queue;
    }

    @Override
    public void run() {
        System.out.println("start customer id = " + Thread.currentThread().getId());
        Random random = new Random();

        while (true){
            try {
                PCData data = queue.poll();//提取数据
                if (null != data){
                    int re = data.getIntData() * data.getIntData();//计算平方
                    System.out.println(MessageFormat.format("{0}*{1}={2}",data.getIntData(),data.getIntData(),re));
                    Thread.sleep(random.nextInt(SLEEP_TIME));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
                Thread.currentThread().interrupt();
            }
        }
    }
}
public class Client{
    //测试
    public static void main(String[] args) throws InterruptedException {
        //建立缓冲区
        BlockingQueue<PCData> queue = new LinkedBlockingQueue<PCData>(10);
        Producer producer1 = new Producer(queue);//生产者
        Producer producer2 = new Producer(queue);
        Producer producer3 = new Producer(queue);
        Customer customer1 = new Customer(queue);//消费者
        Customer customer2 = new Customer(queue);
        Customer customer3 = new Customer(queue);

        ExecutorService es = Executors.newCachedThreadPool();
        es.execute(producer1);//运行生产者
        es.execute(producer2);
        es.execute(producer3);
        es.execute(customer1);//运行消费者
        es.execute(customer2);
        es.execute(customer3);

        Thread.sleep(1000);
        producer1.stop();//停止生产者
        producer2.stop();
        producer3.stop();
        Thread.sleep(3000);
        es.shutdown();
    }
}

输出结果:

start producer id = 11
start customer id = 14
start customer id = 15
start producer id = 12
start customer id = 16
start producer id = 13
data:1 is put into queue!
1*1=1
data:2 is put into queue!
2*2=4
data:3 is put into queue!
3*3=9
data:4 is put into queue!
4*4=16
data:5 is put into queue!
5*5=25
data:6 is put into queue!
6*6=36
data:7 is put into queue!
7*7=49
data:8 is put into queue!
8*8=64
data:9 is put into queue!
9*9=81

上述代码很简单,看过文章数据共享通道:BlockingQueue后,注释也比较详细,就不再赘述代码的意思了。

总结:
  生产者-消费者模式很好的对生产者和消费者进行解耦,优化了系统整体结构。同时,由于缓冲区的作用,允许生产者线程和消费者线程存在执行上的性能差异,从一定的程度上缓解了性能瓶颈对系统性能的影响。

作者:Joe
努力了的才叫梦想,不努力的就是空想,努力并且坚持下去,毕竟这是我相信的力量
原文地址:https://www.cnblogs.com/Joe-Go/p/9805079.html