python-- 多进程队列 Queue、生成者和消费者

多进程队列 Queue

# 栈:先进后出(First In Last Out       简称 FILO)
# 队列: 先进先出(First In First Out   简称 FIFO)
# 
# 
# import queue  不能进行多进程之间的数据传输
# from multiprocessing import Queue   借助Queue解决生产者消费者模型
# 队列是安全的。自带锁
from multiprocessing import Queue

q = Queue(num)  # num 队列的最大长度,为一个数字
q.get()  # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
q.put()  # 阻塞,如果可以直接往队列中放数据,就直接放,如果不能放,就阻塞等待

q.get_nowait()  # 不阻塞,如果有数据直接获取,没有数据就报错

q.put_nowait()  # 不阻塞,如果可以继续往队列中放数据,就直接放,不能放就报错

案例

from multiprocessing import Queue

q = Queue(3)  # 队列的最大长度

q.put(123)
q.put("abc")
q.put([4, 5, 6])
print(q.get())
print(q.get())
print(q.get())

结果:

123
abc
[4, 5, 6]

正常执行,队列的长度为3,添加了三个,获取了三个

from multiprocessing import Queue

q = Queue(3)  # 队列的最大长度

q.put(123)
q.put("abc")
q.put([4, 5, 6])
q.put(999)
print(q.get())
print(q.get())
print(q.get())

执行到q.put(999)阻塞,程序一直在等

from multiprocessing import Queue

q = Queue(3)  # 队列的最大长度

q.put(123)
q.put("abc")
q.put([4, 5, 6])
q.put_nowait(999)
print(q.get())
print(q.get())
print(q.get())

执行到q.put(999)报错

from multiprocessing import Queue

q = Queue(3)  # 队列的最大长度

q.put(123)
q.put("abc")
q.put([4, 5, 6])
print(q.get())
print(q.get())
print(q.get())
print(q.get_nowait())  # queue.Empty

执行到print(q.get_nowait())报错

生产者和消费者

生产者消费者模型主要是为解耦,借助队列来实现生产者消费者模型

from multiprocessing import Queue, Process
from time import sleep


def consumer(q, name):
    while 1:
        info = q.get()
        if info:
            print('%s 拿走了%s' % (name, info))
        else:  # 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识
            break  # 此时消费者结束即可


def producer(q, product):
    for i in range(5):
        info = '生产了' + product + '版的娃娃%s号' % str(i)
        q.put(info)
        print(info)
    q.put(None)  # 让生产者生产完数据后,给消费者一个不再生产数据的标识


if __name__ == '__main__':
    q = Queue(5)
    pro = Process(target=producer, args=(q, '波多小姐'))
    con = Process(target=consumer, args=(q, '苍老师'))
    pro.start()
    con.start()

结果:

生产了波多小姐版的娃娃0号
生产了波多小姐版的娃娃1号
生产了波多小姐版的娃娃2号
生产了波多小姐版的娃娃3号
生产了波多小姐版的娃娃4号
苍老师 拿走了生产了波多小姐版的娃娃0号
苍老师 拿走了生产了波多小姐版的娃娃1号
苍老师 拿走了生产了波多小姐版的娃娃2号
苍老师 拿走了生产了波多小姐版的娃娃3号
苍老师 拿走了生产了波多小姐版的娃娃4号

改版上版的生产者和消费者

from multiprocessing import Queue, Process
import time


def consumer(q, name, color):
    while 1:
        info = q.get()
        if info:
            print('%s %s 拿走了%s 33[0m' % (color, name, info))
        else:  # 当消费者获得队列中数据时,如果获得的是None,就是获得到了生产者不再生产数据的标识
            break  # 此时消费者结束即可


def producer(q, product):
    for i in range(20):
        info = product + '的娃娃%s号' % str(i)
        q.put(info)


if __name__ == '__main__':
    q = Queue(10)
    p_pro1 = Process(target=producer, args=(q, '波多小姐'))
    p_pro2 = Process(target=producer, args=(q, '苍老师'))
    p_pro3 = Process(target=producer, args=(q, '小泽老师'))
    p_con1 = Process(target=consumer, args=(q, '麻老师', '33[31m'))
    p_con2 = Process(target=consumer, args=(q, '王老师', '33[32m'))
    p_l = [p_con1, p_con2, p_pro1, p_pro2, p_pro3]
    [i.start() for i in p_l]
    # 父进程如何感知到生产者子进程不再生产数据了?
    p_pro1.join()
    p_pro2.join()
    p_pro3.join()
    q.put(None)  # 几个消费者就要接受几个结束标识
    q.put(None)

结果

 麻老师 拿走了波多小姐的娃娃0号 
 麻老师 拿走了波多小姐的娃娃2号 
 麻老师 拿走了波多小姐的娃娃3号 
 麻老师 拿走了波多小姐的娃娃4号 
 王老师 拿走了波多小姐的娃娃1号  麻老师 拿走了波多小姐的娃娃5号 

 麻老师 拿走了波多小姐的娃娃6号 
 王老师 拿走了波多小姐的娃娃7号 
 麻老师 拿走了波多小姐的娃娃8号 
 王老师 拿走了波多小姐的娃娃9号 
 麻老师 拿走了波多小姐的娃娃10号 
 王老师 拿走了波多小姐的娃娃11号 
 麻老师 拿走了波多小姐的娃娃12号 
 王老师 拿走了波多小姐的娃娃13号 
 麻老师 拿走了波多小姐的娃娃14号 
 王老师 拿走了波多小姐的娃娃15号 
 麻老师 拿走了波多小姐的娃娃16号 
 王老师 拿走了波多小姐的娃娃17号 
 麻老师 拿走了波多小姐的娃娃18号 
 王老师 拿走了波多小姐的娃娃19号 
 麻老师 拿走了苍老师的娃娃0号 
 王老师 拿走了苍老师的娃娃1号 
 麻老师 拿走了苍老师的娃娃2号 
 王老师 拿走了苍老师的娃娃3号 
 麻老师 拿走了苍老师的娃娃4号 
 王老师 拿走了苍老师的娃娃5号 
 麻老师 拿走了苍老师的娃娃6号 
 王老师 拿走了苍老师的娃娃7号 
 麻老师 拿走了苍老师的娃娃8号 
 王老师 拿走了苍老师的娃娃9号 
 麻老师 拿走了苍老师的娃娃10号 
 王老师 拿走了苍老师的娃娃11号 
 麻老师 拿走了苍老师的娃娃12号 
 王老师 拿走了苍老师的娃娃13号 
 麻老师 拿走了小泽老师的娃娃0号 
 王老师 拿走了苍老师的娃娃14号 
 麻老师 拿走了小泽老师的娃娃1号 
 王老师 拿走了苍老师的娃娃15号 
 麻老师 拿走了苍老师的娃娃16号 
 王老师 拿走了苍老师的娃娃17号 
 麻老师 拿走了苍老师的娃娃18号 
 王老师 拿走了小泽老师的娃娃2号 
 麻老师 拿走了苍老师的娃娃19号 
 王老师 拿走了小泽老师的娃娃3号 
 麻老师 拿走了小泽老师的娃娃4号 
 王老师 拿走了小泽老师的娃娃5号 
 麻老师 拿走了小泽老师的娃娃6号 
 王老师 拿走了小泽老师的娃娃7号 
 麻老师 拿走了小泽老师的娃娃8号 
 王老师 拿走了小泽老师的娃娃9号 
 麻老师 拿走了小泽老师的娃娃10号 
 王老师 拿走了小泽老师的娃娃11号 
 麻老师 拿走了小泽老师的娃娃12号 
 王老师 拿走了小泽老师的娃娃13号 
 麻老师 拿走了小泽老师的娃娃14号 
 王老师 拿走了小泽老师的娃娃15号 
 王老师 拿走了小泽老师的娃娃16号 
 麻老师 拿走了小泽老师的娃娃17号 
 王老师 拿走了小泽老师的娃娃18号 
 麻老师 拿走了小泽老师的娃娃19号 

在改版

from multiprocessing import Process, JoinableQueue

q = JoinableQueue()


def consumer(q, name, color):
    while 1:
        info = q.get()
        print('%s %s 拿走了%s 33[0m' % (color, name, info))
        q.task_done()


def producer(q, product):
    for i in range(20):
        info = product + '的娃娃%s号' % str(i)
        q.put(info)
    q.join()  # 记录了生产了20个数据在队列中,此时会阻塞等待消费者消费完队列中所有数据


if __name__ == '__main__':
    q = JoinableQueue(10)
    p_pro1 = Process(target=producer, args=(q, '波多小姐'))
    p_con1 = Process(target=consumer, args=(q, '苍老师', '33[31m'))
    p_con1.daemon = True  # 把消费者进程设为守护进程
    p_con1.start()
    p_pro1.start()
    p_pro1.join()  # 主进程等待生产者进程结束

结果:

苍老师 拿走了波多小姐的娃娃0号 
 苍老师 拿走了波多小姐的娃娃1号 
 苍老师 拿走了波多小姐的娃娃2号 
 苍老师 拿走了波多小姐的娃娃3号 
 苍老师 拿走了波多小姐的娃娃4号 
 苍老师 拿走了波多小姐的娃娃5号 
 苍老师 拿走了波多小姐的娃娃6号 
 苍老师 拿走了波多小姐的娃娃7号 
 苍老师 拿走了波多小姐的娃娃8号 
 苍老师 拿走了波多小姐的娃娃9号 
 苍老师 拿走了波多小姐的娃娃10号 
 苍老师 拿走了波多小姐的娃娃11号 
 苍老师 拿走了波多小姐的娃娃12号 
 苍老师 拿走了波多小姐的娃娃13号 
 苍老师 拿走了波多小姐的娃娃14号 
 苍老师 拿走了波多小姐的娃娃15号 
 苍老师 拿走了波多小姐的娃娃16号 
 苍老师 拿走了波多小姐的娃娃17号 
 苍老师 拿走了波多小姐的娃娃18号 
 苍老师 拿走了波多小姐的娃娃19号 

程序有3个进程,主进程和生产者进程和消费者进程。当主进程执行到p_pro1.join()时,主进程会等待生产进程结束

而生产进程中(q.join())会等待消费者进程把所有数据消费完,生产者进程才结束。

现在的状态就是主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据

所以,把消费者设置为守护进程,当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完
此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。整个程序也就能正常结束了。

原文地址:https://www.cnblogs.com/zouzou-busy/p/13775362.html