JoinableQueue

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

JoinableQueue([maxsize])

这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

参数介绍

maxsize是队列中允许最大项数,省略则无大小限制。

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

基于JoinableQueue实现生产者消费者模型

from multiprocessing import Process,JoinableQueue
import time,random,os
def consumer(q,name):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[43m%s 吃 %s33[0m' %(name,res))
        q.task_done() #发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        res='%s%s' %(food,i)
        q.put(res)
        print('33[45m%s 生产了 %s33[0m' %(name,res))
    q.join() #等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束

if __name__ == '__main__':
    q=JoinableQueue() #使用JoinableQueue()

    #生产者们:即厨师们
    p1=Process(target=producer,args=(q,'egon1','包子'))
    p2=Process(target=producer,args=(q,'egon2','骨头'))
    p3=Process(target=producer,args=(q,'egon3','泔水'))

    #消费者们:即吃货们
    c1=Process(target=consumer,args=(q,'alex1'))
    c2=Process(target=consumer,args=(q,'alex2'))
    c1.daemon=True
    c2.daemon=True

    #开始
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    #1、主进程等生产者p1、p2、p3结束
    #2、而p1、p2、p3是在消费者把所有数据都取干净之后才会结束
    #3、所以一旦p1、p2、p3结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程
    print('主')






下面是示例:

from multiprocessing import Process, JoinableQueue
import time
def producer(q):
for i in range(10):
res = '包子%s'%i
time.sleep(0.5)
print('生产者生产了%s个'%res)
q.put(res)
q.join()
def consmer(q):
while True:
res = q.get()
if res is None:break
time.sleep(1)
print('消费者吃了%s个'%res)
q.task_done()
if __name__ == '__main__':
q = JoinableQueue()
p1 = Process(target=producer, args=(q, ))
c1 = Process(target=consmer, args=(q, ))
p1.start()
c1.daemon = True
c1.start()
p1.join()
print('主')

 

原文地址:https://www.cnblogs.com/yuexijun/p/11524335.html