15.并发编程--多线程设计模式

并发编程--多线程设计模式 - 生产者-消费者模式

1. 生产者-消费者模式

  • 生产者和消费者也是一个非常经典的多线程模式,我们在实际中开发应用非常广泛的思想理念。在生产-消费模式中:通常由两类线程,即若干个生产者和若干个消费者的线程。
  • 生产者负责提交用户数据,消费者负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

示例:下面示例为Future的原理实现;
说明:看类注释即可明了 不明白可以去屎了

  1 //Data.java   * 首先看这个接口Data,数据.
  2   public final class Data {
  3 
  4       private String id;
  5       private String name;
  6 
  7       public Data(String id, String name){
  8           this.id = id;
  9           this.name = name;
 10       }
 11 
 12       public String getId() {
 13           return id;
 14       }
 15 
 16       public void setId(String id) {
 17           this.id = id;
 18       }
 19 
 20       public String getName() {
 21           return name;
 22       }
 23 
 24       public void setName(String name) {
 25           this.name = name;
 26       }
 27 
 28       @Override
 29       public String toString(){
 30           return "{id: " + id + ", name: " + name + "}";
 31       }
 32 
 33   }
 34   //Provider.java   * 然后再看Provider这个类,实现了Runnable接口,生产者真实的业务逻辑.
 35   public class Provider implements Runnable{
 36 
 37       //共享缓存区 理解为消息队列
 38       private BlockingQueue<Data> queue;
 39       //多线程间是否启动变量,有强制从主内存中刷新的功能。即时返回线程的状态
 40       private volatile boolean isRunning = true;
 41       //id生成器
 42       private static AtomicInteger count = new AtomicInteger();
 43       //随机对象
 44       private static Random r = new Random();
 45 
 46       public Provider(BlockingQueue queue){
 47           this.queue = queue;
 48       }
 49 
 50       @Override
 51       public void run() {
 52           while(isRunning){
 53               try {
 54                   //随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
 55                   Thread.sleep(r.nextInt(1000));
 56                   //获取的数据进行累计...
 57                   int id = count.incrementAndGet();
 58                   //比如通过一个getData方法获取了
 59                   Data data = new Data(Integer.toString(id), "数据" + id);
 60                   System.out.println("当前线程:" + Thread.currentThread().getName() + ", 获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
 61                   if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
 62                       System.out.println("提交缓冲区数据失败....");
 63                       //do something... 比如重新提交
 64                   }
 65               } catch (InterruptedException e) {
 66                   e.printStackTrace();
 67               }
 68           }
 69       }
 70 
 71       public void stop(){
 72           this.isRunning = false;
 73       }
 74 
 75   }
 76 
 77   //Consumer.java*
 78   public class Consumer implements Runnable{
 79 
 80       private BlockingQueue<Data> queue;
 81 
 82       public Consumer(BlockingQueue queue){
 83           this.queue = queue;
 84       }
 85 
 86       //随机对象
 87       private static Random r = new Random();
 88 
 89       @Override
 90       public void run() {
 91           while(true){
 92               try {
 93                   //获取数据
 94                   Data data = this.queue.take();
 95                   //进行数据处理。休眠0 - 1000毫秒模拟耗时
 96                   Thread.sleep(r.nextInt(1000));
 97                   System.out.println("当前消费线程:" + Thread.currentThread().getName() + ", 消费成功,消费数据为id: " + data.getId());
 98               } catch (InterruptedException e) {
 99                   e.printStackTrace();
100               }
101           }
102       }
103   }
104   //main方法
105   public class Main {
106 
107       public static void main(String[] args) throws Exception {
108           //内存缓冲区
109           BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
110           //生产者
111           Provider p1 = new Provider(queue);
112           Provider p2 = new Provider(queue);
113           Provider p3 = new Provider(queue);
114           //消费者
115           Consumer c1 = new Consumer(queue);
116           Consumer c2 = new Consumer(queue);
117           Consumer c3 = new Consumer(queue);
118           //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
119 
120           ExecutorService cachePool = Executors.newCachedThreadPool();
121           cachePool.execute(p1);
122           cachePool.execute(p2);
123           cachePool.execute(p3);
124           cachePool.execute(c1);
125           cachePool.execute(c2);
126           cachePool.execute(c3);
127           try {
128               Thread.sleep(3000);
129           } catch (InterruptedException e) {
130               e.printStackTrace();
131           }
132           p1.stop();
133           p2.stop();
134           p3.stop();
135           try {
136               Thread.sleep(2000);
137           } catch (InterruptedException e) {
138               e.printStackTrace();
139           }       
140   //      cachePool.shutdown();
141   //      cachePool.shutdownNow();
142 
143       }
144   }
原文地址:https://www.cnblogs.com/Mao-admin/p/9989164.html