生产者-消费者

生产者消费者也是一个非常经典的多线程模式,我们在实际开发中应用非常广泛的思想理念。在生产者-消费模式中:通常由两类线程,即若干个生产者的线程和若干个消费者的线程。生产者线程负责提交用户请求,消费者线程则负责具体处理生产者提交的任务,在生产者和消费者之间通过共享内存缓存区进行通信。

MQ:messageQueue消息队列,是一个中间件

代码实现:

Provide:

package com.java.day04_mode_pro_custom;

import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Provider implements Runnable{
    
    //共享缓存区
    private BlockingQueue<Data> queue;
    
    //多线程间启动自动变量,有强制从主内存中刷新的功能,即时返回线程的状态
    private volatile boolean isRunning  = true;
    
    //id生成器
    private static AtomicInteger count = new AtomicInteger();
    
    //随即对象
    private static Random r = new Random();
    
    public Provider(BlockingQueue<Data> queue){
        this.queue=queue;
    }

    @Override
    public void run() {

        while(isRunning){
            System.out.println("*******开始生产数据*********");
            try {
                //随机休眠0-1000毫秒 表示获取数据(产生数据的耗时)
                Thread.sleep(r.nextInt(1000));
                //获取的数据进行累计
                int id = count.incrementAndGet();
                //比如通过一个getData方法获取了
                Data data = new Data(id,"数据"+id);
                System.out.println("当前线程:"+Thread.currentThread().getName()+"获取了数据,id为:"+id+",进行装载到公共缓冲区中。。。。");
                //有可能缓冲区数据满了的情况,此时会提交失败
                if(!this.queue.offer(data,2,TimeUnit.SECONDS)){
                    System.out.println("数据装载到缓冲区失败。。。");
                    //do something 比如重新提交。。。。
                }
                System.out.println("*******一次生产完成*********");
                
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        
    }
    
    
    public void stop(){
        this.isRunning=false;
        
    }
    
    
}

Consumer:

 1 package com.java.day04_mode_pro_custom;
 2 
 3 import java.util.Random;
 4 import java.util.concurrent.BlockingQueue;
 5 
 6 public class Consumer implements Runnable{
 7 
 8     //数据缓冲区
 9     private BlockingQueue <Data>queue;
10     
11     //随机对象
12     private static Random r = new Random();
13     
14     public Consumer(BlockingQueue<Data> queue){
15         this.queue=queue;
16     }
17 
18     @Override
19     public void run() {
20         //线程不停的在处理数据
21         while(true){
22             try {
23                 Data data = this.queue.take();
24                 //随机休眠时间为0-1000  表示正在处理数据
25                 Thread.sleep(r.nextInt(1000));
26             
27                 System.out.println("当前消费线程:"+Thread.currentThread().getName()+"消费成功,消费数据为:"+data.getName());
28                 
29             
30             } catch (InterruptedException e) {
31                 e.printStackTrace();
32             }
33             
34             
35         }
36     }
37     
38     
39     
40     
41 }

Main:

 1 package com.java.day04_mode_pro_custom;
 2 
 3 import java.util.concurrent.BlockingQueue;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.LinkedBlockingQueue;
 7 
 8 public class Main {
 9 
10     
11     public static void main(String[] args) {
12         //内存缓冲区
13         BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(10);
14         
15         //生产者
16         Provider p1 = new Provider(queue);
17         Provider p2 = new Provider(queue);
18         Provider p3 = new Provider(queue);
19         
20         //消费者
21         Consumer c1 = new Consumer(queue);
22 
23         Consumer c2 = new Consumer(queue);
24         Consumer c3 = new Consumer(queue);
25         
26         //创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程,空闲线程的存活时间为60s
27         ExecutorService cachePool = Executors.newCachedThreadPool();
28         //传的对象要实现runnable接口
29         cachePool.execute(p1);
30         cachePool.execute(p2);
31         cachePool.execute(p3);
32         cachePool.execute(c1);
33         cachePool.execute(c2);
34         cachePool.execute(c3);
35         
36         try {
37             Thread.sleep(3000);
38         } catch (InterruptedException e) {
39             e.printStackTrace();
40         }
41 
42         //不再产生数据
43         p1.stop();
44         p2.stop();
45         p3.stop();
46         
47         System.out.println("停止生产数据。。。");
48         try {
49             Thread.sleep(2000);
50         } catch (InterruptedException e) {
51             e.printStackTrace();
52         }
53 
54         
55         
56         
57     }
58     
59     
60     
61     
62     
63     
64     
65     
66 }

Data:

 1 package com.java.day04_mode_pro_custom;
 2 
 3 public class Data {
 4 
 5     private int id;
 6     private String name;
 7     
 8     public Data(int id,String name){
 9         this.id=id;
10         this.name=name;
11     }
12 
13     public int getId() {
14         return id;
15     }
16 
17     public void setId(int id) {
18         this.id = id;
19     }
20 
21     public String getName() {
22         return name;
23     }
24 
25     public void setName(String name) {
26         this.name = name;
27     }
28 
29     @Override
30     public String toString() {
31         return "Data [id=" + id + ", name=" + name + "]";
32     }
33     
34     
35     
36 }

运行结果:

 1 *******开始生产数据*********
 2 *******开始生产数据*********
 3 *******开始生产数据*********
 4 当前线程:pool-1-thread-2获取了数据,id为:1,进行装载到公共缓冲区中。。。。
 5 *******一次生产完成*********
 6 *******开始生产数据*********
 7 当前线程:pool-1-thread-1获取了数据,id为:2,进行装载到公共缓冲区中。。。。
 8 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据1
 9 *******一次生产完成*********
10 *******开始生产数据*********
11 当前线程:pool-1-thread-3获取了数据,id为:3,进行装载到公共缓冲区中。。。。
12 *******一次生产完成*********
13 *******开始生产数据*********
14 当前线程:pool-1-thread-2获取了数据,id为:4,进行装载到公共缓冲区中。。。。
15 *******一次生产完成*********
16 *******开始生产数据*********
17 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据2
18 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据4
19 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据3
20 当前线程:pool-1-thread-3获取了数据,id为:5,进行装载到公共缓冲区中。。。。
21 *******一次生产完成*********
22 *******开始生产数据*********
23 当前线程:pool-1-thread-1获取了数据,id为:6,进行装载到公共缓冲区中。。。。
24 *******一次生产完成*********
25 *******开始生产数据*********
26 当前线程:pool-1-thread-2获取了数据,id为:7,进行装载到公共缓冲区中。。。。
27 *******一次生产完成*********
28 *******开始生产数据*********
29 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据5
30 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据7
31 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据6
32 当前线程:pool-1-thread-2获取了数据,id为:8,进行装载到公共缓冲区中。。。。
33 *******一次生产完成*********
34 *******开始生产数据*********
35 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据8
36 当前线程:pool-1-thread-3获取了数据,id为:9,进行装载到公共缓冲区中。。。。
37 *******一次生产完成*********
38 *******开始生产数据*********
39 当前线程:pool-1-thread-3获取了数据,id为:10,进行装载到公共缓冲区中。。。。
40 *******一次生产完成*********
41 *******开始生产数据*********
42 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据10
43 当前线程:pool-1-thread-3获取了数据,id为:11,进行装载到公共缓冲区中。。。。
44 *******一次生产完成*********
45 *******开始生产数据*********
46 当前线程:pool-1-thread-1获取了数据,id为:12,进行装载到公共缓冲区中。。。。
47 *******一次生产完成*********
48 *******开始生产数据*********
49 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据12
50 当前线程:pool-1-thread-3获取了数据,id为:13,进行装载到公共缓冲区中。。。。
51 *******一次生产完成*********
52 *******开始生产数据*********
53 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据9
54 当前线程:pool-1-thread-1获取了数据,id为:14,进行装载到公共缓冲区中。。。。
55 *******一次生产完成*********
56 *******开始生产数据*********
57 当前线程:pool-1-thread-2获取了数据,id为:15,进行装载到公共缓冲区中。。。。
58 *******一次生产完成*********
59 *******开始生产数据*********
60 当前线程:pool-1-thread-3获取了数据,id为:16,进行装载到公共缓冲区中。。。。
61 *******一次生产完成*********
62 *******开始生产数据*********
63 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据11
64 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据13
65 停止生产数据。。。
66 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据15
67 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据14
68 当前线程:pool-1-thread-3获取了数据,id为:17,进行装载到公共缓冲区中。。。。
69 *******一次生产完成*********
70 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据16
71 当前线程:pool-1-thread-2获取了数据,id为:18,进行装载到公共缓冲区中。。。。
72 *******一次生产完成*********
73 当前线程:pool-1-thread-1获取了数据,id为:19,进行装载到公共缓冲区中。。。。
74 *******一次生产完成*********
75 当前消费线程:pool-1-thread-6消费成功,消费数据为:数据17
76 当前消费线程:pool-1-thread-4消费成功,消费数据为:数据19
77 当前消费线程:pool-1-thread-5消费成功,消费数据为:数据18
原文地址:https://www.cnblogs.com/syousetu/p/6757265.html