python 生产者与消费者模型

基于队列实现生产者与消费者模型

import time
import random
from multiprocessing import Process
from multiprocessing import Queue

def consumer(q,name):
    while True:
        f = q.get()
        if f == None: break    # 解决方法之一(比较笨)
        time.sleep(random.randint(1, 3))
        print('33[32m{}吃了{}33[0m'.format(name,f))

def producer(q,name,food):
    for i in range(3):
        time.sleep(random.randint(1,3))
        f = '{}生产了{}{}'.format(name,food,i)   # 数据
        print(f)
        q.put(f)   # 往队列中添加数据
    q.put(None)   # 解决方法之一(比较笨)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=producer,args=(q,'admin','馒头'))
    c = Process(target=consumer,args=(q,'xp'))
    p.start()
    c.start()
# 问题来了,生产者,队列中没有数据后,消费者一直处于阻塞状态,怎么解决?

上述代码中解决办法缺点:如果有N个消费者,是否还要生产者put进去N个None呢?

用JoinableQueue 解决消费者结束问题

import time
import random
from multiprocessing import Process
from multiprocessing import JoinableQueue

def consumer(q,name):
    while True:
        f1 = q.get()
        print('33[32m{}吃了{}33[0m'.format(name,f1))
        time.sleep(random.randint(1, 3))
        q.task_done()
#
#
def producer(q,name,food):
    for i in range(10):
        time.sleep(random.randint(1,3))
        f = '{}生产了{}{}'.format(name,food,i)
        q.put(f)
        print(f)
    q.join()

if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,'admin','面条'))
    c = Process(target=consumer,args=(q,'xp'))
    c.daemon = True
    p1.start()
    c.start()
    p1.join()

是不是晕了,那就对了,代码执行顺序:

1、consumer 中把对列中所有的任务消耗完 q.task_done()

2、producer 端中的 q.join 感知到,停止阻塞

3、所有的producer 进程结束

4、主进程代码执行结束

5、守护进程结束

原文地址:https://www.cnblogs.com/xp0919/p/11563968.html