python学习笔记 day37 生产者消费者模型

  1. 生产者消费者模型

生产者消费者速度不匹配时,比如生产的快但是消费的慢,就可以给消费者多开几个进程,但是消费者其实是不知道生产者生产了多少数据,什么时候生产结束,消费者这边其实不太好接收,解决办法就是消费者使用while循环接收生产者发来的消息;生产者给消费者发送一个信号,消费者者接受到就立马跳出循环:

from multiprocessing import Queue
from multiprocessing import Process
import time
import random
def procuder(q,food):
    for i in range(10):
        q.put("%s-%s"%(food,i))
        print(i,"生产了%s"%food)
    q.put(None)   # 当消费者不再生产数据时,往队列中放入元素None,作为一个标志,告诉消费者生产者不再生产数据
    q.put(None)   # 因为后面开了两个进程来执行consumer()所以这里会发送两个信号,因为队列就是数据比较安全,即使开两个进程,也只有一个可以拿到值
def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.randint(1,3))
        if food==None:  # 这里一定要使用food来判断,不能q.get() 因为这样会从队列中取值!!所以刚开始我只能取得队列中一半的元素,另一半都放这判断了,没打印,,,
            break    # 消费者接收到None表明生产者不再继续生产数据,消费者这边也不会再从队列中取值,跳出死循环
        print("%s拿到了%s"%(name,food))

if __name__=="__main__":
    q=Queue()  # 获得一个队列
    p=Process(target=procuder,args=(q,"面包"))
    p.start()
    c1=Process(target=consumer,args=(q,"xuanxuan"))
    c1.start()
    c2=Process(target=consumer,args=(q,"xixi"))
    c2.start()

运行结果:

上述生产者消费者速度不匹配,可以开多个进程来解决这种不平衡,但是消费者不知道生产者发了多少数据,不知道什么时候发送完,即使生产者给消费者发一个信号,告诉消费者收到这个标志表明我不再生产数据了,但是如果消费者开多个进程,那标志就得写很多个,十分不方便;

生产者消费者模型:

1. 消费者要处理多少数据是不确定的;

2. 所以只能使用while 循环来处理数据,但是while循环无法结束;

3. 需要生产者发送信号;

4. 有多少消费者就需要发送多少信号;

5. 但是发送信号的数量要根据生产者和消费者之间的数量进行计算,所以十分不方便;

 2. JoinableQueue-----解决生产者消费者速度不匹配问题

思路:

生产者生产数据,正常往队列中放数据,但是会有一个join()方法,等待消费者那边回信,消费者正常接收消息,使用while循环,但是每接受一个数据,就会发送一个task_done()告诉生产者我这边从队列中取了一个数据,但是直到消费者从队列中取完数据,生产者那边的join()方法就会接收到(可以想象成一个计数器,消费者每次取一个数据,计数器减一,直到减为0 表明消费者从队列中取完数了,生产者就不再等了)然后在主进程中开启一个子进程执行生产者生产数据的操作,然后开启任意多个子进程执行消费者消费数据(假设生产数据很快,但是消费数据很慢),然后把消费者的这些进程设置为主进程的守护进程,在主进程中判断生产者子进程是否结束,如果结束了主进程就结束,主进程的守护进程(消费者进程)也结束:

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue  # 可以在往队列中正常放值(但是最后会有一个join()方法,等待取值那端回信)取值那端正常取值;
                                           # 但是每次从队列中取一个值都会执行task_done()方法,告诉往队列放值的那端,我这边已经取了一个值
                                           # 直到从队列取完值,往队列中放值的那端join()方法就会得到信号,表明取值完毕,放值的进程可以结束
def procuder(q,food):
    for i in range(10):
        q.put("%s-%i"%(food,i))
        print(i,"生产了%s"%food)
    q.join()  # 生产者等待消费者回信,等消费者从队列中取完值,该进程就会结束

def consumer(q,name):
    while True:
        food=q.get()   # 从队列中取值,
        print("%s消费了%s"%(name,food))
        time.sleep(random.randint(1,3))  # 生产者生产速度快,消费者消费速度慢,所以需要开多个子进程来执行消费者消费的过程
        q.task_done()   # 消费者每次从队列中取一个值,就会告诉生产者我已经取了一个值了,当消费者取了队列中的所有值,生者者中的join()就会执行完毕

if __name__=="__main__":
    q=JoinableQueue()  # 创建队列
    p1=Process(target=procuder,args=(q,"酸奶酪"))  # 创建进程,执行生产者生产数据
    p1.start()
    p2=Process(target=procuder,args=(q,"葡萄"))  # 创建进程,执行生产者生产数据
    p2.start()
    c1=Process(target=consumer,args=(q,"xuanxuan"))  # 创建子进程,执行消费者消费数据(消费速度慢sleep()了,所以需要开多个子进程来执行consumer())
    c1.daemon=True   # 把消费者进程设置为守护进程(当生产者子进程执行完毕,(就是消费者从队列中取完值),主进程就结束了,主进程的守护进程--消费者进程也结束(正常情况无法结束,因为是一个死循环))
    c1.start()
    c2=Process(target=consumer,args=(q,"xixi"))
    c2.daemon=True
    c2.start()

    p1.join()
    p2.join()    # 当生产者子进程结束,主进程也结束,主进程的守护进程--消费者进程(死循环)也结束

 运行结果:

 

talk is cheap,show me the code
原文地址:https://www.cnblogs.com/xuanxuanlove/p/9780619.html