java学习之生产者消费者模式

        在并发编程中,生产者消费者模式是一种很常见的一种模式。生产者和消费者模式是有两类线程,即若干个生产者线程和若干个消费者线程,生产者线程负责提交用户请求,消费者线程负责处理消费者提交的任务,在生产者和消费者之间通过共享缓存区进行通信。

        生产者消费者模式的核心组件为共享内存缓存区,它作为生产者和消费者通信的桥梁,避免了生产者和消费者直接通信,从而将生产者和消费者直接解耦。生产者不需要指导消费者的存在,消费者也不需要知道生产者的存在。同时由于内存缓冲区的存在,允许生产者和消费者在执行速度上存在时间差,无论是生产者在某一局部时间内速度高于消费者,还是消费者在局部时间内高于生产者,都可以通过共享缓存区得到缓解,确保系统的正常运行。

该模式的架构为:

    

生产者:

 1 package com.css.demo;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.BlockingQueue;
 5 import java.util.concurrent.TimeUnit;
 6 import java.util.concurrent.atomic.AtomicInteger;
 7 
 8 public class Provider implements Runnable{
 9     
10     //共享缓存区
11     private BlockingQueue<Data> queue;
12     //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
13     private volatile boolean isRunning = true;
14     //id生成器
15     private static AtomicInteger count = new AtomicInteger();
16     //随机对象
17     private static Random r = new Random(); 
18     
19     public Provider(BlockingQueue queue){
20         this.queue = queue;
21     }
22 
23     @Override
24     public void run() {
25         while(isRunning){
26             try {
27                 //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时) 
28                 Thread.sleep(r.nextInt(1000));
29                 //获取的数据进行累计...
30                 int id = count.incrementAndGet();
31                 //比如通过一个getData方法获取了
32                 Data data = new Data(Integer.toString(id), "数据" + id);
33                 System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
34                 if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
35                     System.out.println("提交缓冲区数据失败....");
36                     //do something... 比如重新提交
37                 }
38             } catch (InterruptedException e) {
39                 e.printStackTrace();
40             }
41         }
42     }
43     
44     public void stop(){
45         this.isRunning = false;
46     }
47     
48 }

消费者:

 1 package com.ssc.demo;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.BlockingQueue;
 5 import java.util.concurrent.TimeUnit;
 6 
 7 public class Consumer implements Runnable{
 8 
 9     private BlockingQueue<Data> queue;
10     
11     public Consumer(BlockingQueue queue){
12         this.queue = queue;
13     }
14     
15     //随机对象
16     private static Random r = new Random(); 
17 
18     @Override
19     public void run() {
20         while(true){
21             try {
22                 //获取数据
23                 Data data = this.queue.take();
24                 //进行数据处理。休眠0 - 1000毫秒模拟耗时
25                 Thread.sleep(r.nextInt(1000));
26                 System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
27             } catch (InterruptedException e) {
28                 e.printStackTrace();
29             }
30         }
31     }
32 }

主要类:

 1 package com.ssc,demo;
 2 
 3 import java.util.concurrent.BlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.LinkedBlockingQueue;
 7 
 8 public class Main {
 9 
10     public static void main(String[] args) throws Exception {
11         //内存缓冲区
12         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
13         //生产者
14         Provider p1 = new Provider(queue);
15         
16         Provider p2 = new Provider(queue);
17         Provider p3 = new Provider(queue);
18         //消费者
19         Consumer c1 = new Consumer(queue);
20         Consumer c2 = new Consumer(queue);
21         Consumer c3 = new Consumer(queue);
22         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
23 
24         ExecutorService cachePool = Executors.newCachedThreadPool();
25         cachePool.execute(p1);
26         cachePool.execute(p2);
27         cachePool.execute(p3);
28         cachePool.execute(c1);
29         cachePool.execute(c2);
30         cachePool.execute(c3);
31 
32         try {
33             Thread.sleep(3000);
34         } catch (InterruptedException e) {
35             e.printStackTrace();
36         }
37         p1.stop();
38         p2.stop();
39         p3.stop();
40         try {
41             Thread.sleep(2000);
42         } catch (InterruptedException e) {
43             e.printStackTrace();
44         }        
45 //        cachePool.shutdown(); 
46 //        cachePool.shutdownNow();
47         
48 
49     }
50     
51 }

对象:

 1 package com.ssc.demo;
 2 
 3 public final class Data {
 4 
 5     private String id;
 6     private String name;
 7     
 8     public Data(String id, String name){
 9         this.id = id;
10         this.name = name;
11     }
12     
13     public String getId() {
14         return id;
15     }
16 
17     public void setId(String id) {
18         this.id = id;
19     }
20 
21     public String getName() {
22         return name;
23     }
24 
25     public void setName(String name) {
26         this.name = name;
27     }
28 
29     @Override
30     public String toString(){
31         return "{id: " + id + ", name: " + name + "}";
32     }
33     
34 }

主要参考:《白鹤翔视频》

生于忧患,死于安乐
原文地址:https://www.cnblogs.com/songlove/p/14962408.html