python 并发编程 多进程 JoinableQueue

JoinableQueue和Queue 使用一样

这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
JoinableQueue([maxsize])

参数介绍

maxsize是队列中允许最大项数,省略则无大小限制

方法介绍

JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
q.task_done():使用者使用此方法发出信号,表示q.get()的返回数据已经被处理 数据全部接好了。
如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常

q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。
直到队列中所有的数据都被取。
阻塞将持续到队列中的每个数据都调用q.task_done()方法为止

 

q.join()  作用是 等队列执行完了   队列数据取完 就执行完了

 

基于JoinableQueue实现生产者消费者模型
守护进程应用

现在是消费者给生产者发送结束信号

在所有生产者生产完以后 加上q.join() 等队列所有数据被取完,就不等了,
消费者把数据取完以后,消费者发送结束信号, 就是 q.task_done()

from multiprocessing import Process
from multiprocessing import JoinableQueue
import time


def producer(q):

    for i in range(1,3):
        res = "包子%s" %i
        time.sleep(0.5)
        print("生产者生产%s" % res)

        q.put(res)

    # 等着生产者从队列里放好所有数据
    q.join()


def consumer(q):
    while True:
        # 接取数据
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消费者消费%s" % res)

        # 发信号由消费者去发送 代表生产者已经把队列数据都取走了
        q.task_done()

if __name__ == "__main__":

    # 容器
    q = JoinableQueue()

    # 生产者们
    # 需要传参数 把生产后数据的往队列里丢
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=producer, args=(q,))
    p3 = Process(target=producer, args=(q,))

    # 消费者们
    c1 = Process(target=consumer, args=(q,))
    c2 = Process(target=consumer, args=(q,))

    # 消费者从队列里取空数据,没有意义存在了
    # 设置守护进程
    c1.daemon = True
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()

    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

    print("")


 

原文地址:https://www.cnblogs.com/mingerlcm/p/9005796.html