生产者消费者模型

内容:

1.什么是生产者消费者模型

2.python实现生产者消费者模型

3.Java实现生产者消费者模型

参考:https://www.cnblogs.com/Eva-J/articles/8253549.html

1.什么是生产者消费者模型

(1)前言

随着软件业的发展,互联网用户的日渐增多,并发这门艺术的兴起似乎是那么合情合理。每日PV十多亿的淘宝,处理并发的手段可谓是业界一流。用户访问淘宝首页的平均等待时间只有区区几秒,但是服务器所处理的流程十分复杂

首先负责首页的服务器就有好几千台,通过计算把与用户路由最近的服务器处理首页的返回。其次是网页上的资源,就JS和CSS文件就有上百个,还有图片资源等。它能在几秒内加载出来可见阿里几千名顶尖工程师的智慧是如何登峰造极

在大型电商网站中,他们的服务或者应用解耦之后,是通过消息队列在彼此间通信的。消息队列和应用之间的架构关系就是生产者消费者模型

(2)生产者消费者模型

生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图:

              

在日益发展的服务类型中,譬如注册用户这种服务,它可能解耦成好几种独立的服务(账号验证,邮箱验证码,手机短信码等)。它们作为消费者,等待用户输入数据,在前台数据提交之后会经过分解并发送到各个服务所在的url,分发的那个角色就相当于生产者。消费者在获取数据时候有可能一次不能处理完,那么它们各自有一个请求队列,那就是内存缓冲区了。做这项工作的框架叫做消息队列

为什么要使用生产者和消费者模式:

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。

在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。

同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式

(3)生产者消费者模型的简单伪代码

 1 semaphore mutex=1;  // 临界区互斥信号量
 2 semaphore empty=n;  // 空闲缓冲区
 3 semaphore full=0;       // 缓冲区初始化为空
 4 producer ()                 // 生产者进程 
 5 {
 6     while(1)
 7     {
 8         produce an item in nextp;      // 生产数据
 9         P(empty);                             // 获取空缓冲区单元
10         P(mutex);                             // 进入临界区.
11         add nextp to buffer;              // 将数据放入缓冲区
12         V(mutex);                            // 离开临界区,释放互斥信号量
13         V(full);                                 // 满缓冲区数加1
14     }    
15 }
16 
17 consumer ()                // 消费者进程
18 {
19     while(1)
20     {
21         P(full);                                  // 获取满缓冲区单元
22         P(mutex);                             // 进入临界区
23         remove an item from buffer;   // 从缓冲区中取出数据
24         V (mutex);                            // 离开临界区,释放互斥信号量
25         V (empty) ;                           // 空缓冲区数加1
26         consume the item;                 // 消费数据
27     }
28 }

2.python实现生产者消费者模型

(1)用基本的队列实现

 1 import time, random
 2 from multiprocessing import Process, Queue
 3 
 4 def producer(name, food, q):
 5     for i in range(10):
 6         time.sleep(random.randint(1, 3))
 7         f = "%s生产了%s%s" % (name, food, i+1)
 8         print(f)
 9         q.put(f)
10 
11 def consumer(name, q):
12     while True:
13         food = q.get()
14         if food is None:
15             print("%s获取到了一个空" % name)
16             break
17         print("33[31m%s消费: %s33[0m" % (name, food))
18         time.sleep(random.randint(1, 3))
19 
20 if __name__ == '__main__':
21     que = Queue(20)
22     p1 = Process(target=producer, args=('wyb', '包子', que))
23     p1.start()
24     c1 = Process(target=consumer, args=('xxx', que))
25     c1.start()
26     p1.join()
27     que.put(None)

(2)JoinableQueue实现实现生产者消费者模型

JoinableQueue([maxsize]) :创建可连接的共享进程队列。这就像是一个Queue对象,但队列允许项目的使用者通知生产者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的

1 JoinableQueue的实例除了与Queue对象相同的方法之外,还具有以下方法:
2 
3 q.task_done(): 
4 使用者使用此方法发出信号,表示q.get()返回的项目已经被处理。如果调用此方法的次数大于从队列中删除的项目数量,将引发ValueError异常
5 
6 q.join():
7 生产者将使用此方法进行阻塞,直到队列中所有项目均被处理。阻塞将持续到为队列中的每个项目均调用q.task_done()方法为止
 1 # JoinableQueue实现生产者消费者模型
 2 import time
 3 import random
 4 from multiprocessing import Process, JoinableQueue
 5 
 6 def producer(name, food, q):
 7     for i in range(10):
 8         time.sleep(random.randint(1, 3))
 9         f = "%s%s produced by %s" % (food, i+1, name)
10         print(("%s生产: " % name) + f)
11         q.put(f)
12     q.join()        # 阻塞 直到一个队列中的数据 全部被处理完毕
13 
14 def consumer(name, q):
15     while True:
16         food = q.get()
17         print("33[31m%s消费: %s33[0m" % (name, food))
18         time.sleep(random.randint(1, 3))
19         q.task_done()       # 向q.join()发送一次信号,证明一个数据已经被取走了
20 
21 
22 if __name__ == '__main__':
23     que = JoinableQueue(20)
24     p1 = Process(target=producer, args=('wyb', '包子', que))
25 
26     c1 = Process(target=consumer, args=('xxx', que))
27     c1.daemon = True        # 设置消费者为守护进程 当主进程的代码执行完毕之后 子进程自动结束
28 
29     p1.start()
30     c1.start()
31     p1.join()
 1 # 上面代码的过程:
 2 #  在消费者这一端:
 3     # 每次获取一个数据
 4     # 处理一个数据
 5     # 发送一个记号 : 标志一个数据被处理成功
 6 
 7 # 在生产者这一端:
 8     # 每一次生产一个数据,
 9     # 且每一次生产的数据都放在队列中
10     # 在队列中刻上一个记号
11     # 当生产者全部生产完毕之后,
12     # join信号 : 已经停止生产数据了
13                 # 且要等待之前被刻上的记号都被消费完
14                 # 当数据都被处理完时,join阻塞结束
15 
16 # consumer 中把所有的任务消耗完
17 # producer 端 的 join感知到,停止阻塞
18 # 所有的producer进程结束
19 # 主进程中的p.join结束
20 # 主进程中代码结束
21 # 守护进程(消费者的进程)结束

(3)用管道实现生产者消费者模型

 1 from multiprocessing import Process, Pipe, Lock
 2 
 3 def consumer(con, pro, name, lk):
 4     pro.close()
 5     while True:
 6         lk.acquire()
 7         food = con.recv()
 8         lk.release()
 9         if food is not None:
10             print("%s消费: %s" % (name, food))
11         else:
12             con.close()
13             break
14 
15 def producer(con, pro, name, food, lk):
16     con.close()
17     for i in range(20):
18         f = "%s生产: %s%s" % (name, food, i+1)
19         print(f)
20         pro.send(f)
21     pro.send(None)
22     pro.send(None)
23     pro.close()
24 
25 if __name__ == '__main__':
26     lock = Lock()
27     con_pipe, pro_pipe = Pipe()
28     p = Process(target=producer, args=(con_pipe, pro_pipe, 'wyb', '包子', lock))
29     c1 = Process(target=consumer, args=(con_pipe, pro_pipe, 'xxx', lock))
30     c2 = Process(target=consumer, args=(con_pipe, pro_pipe, 'abc', lock))
31     p.start()
32     c1.start()
33     c2.start()
34     con_pipe.close()
35     pro_pipe.close()

注意:

使用管道会带来数据不安全性的问题,可以加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象

但是队列不会带来数据不安全性的问题,这是因为队列是基于管道和锁实现的

3.Java实现生产者消费者模型

原文地址:https://www.cnblogs.com/wyb666/p/9739608.html