多进程操作-进程队列multiprocess.Queue的使用

一、ipc机制 进程通讯

管道:pipe 基于共享的内存空间
队列:pipe+锁 queue

下面拿代码来实现Queue如何使用:

案例一:

from multiprocessing import Queue
q = Queue()     # 实例产生一个q队列
q.put('蔡徐坤')  # 将括号内的数据加入队列中,先进先出
q.put([1,2,3])
q.put(3)
print(q.get())  # 将队列里的数据取出来,先进先出
print(q.get())
print(q.get())
# q.put(5)
print(q.get())  # 如果队列里面没有值,就会一直等待队0列有值。

案例二:

from multiprocessing import Queue
q = Queue(4)  # 4 代表队列最大项数为4,不写则为无限制大小
q.put('蔡徐坤')  # 将括号内的数据加入队列中,先进先出
q.put([1,2,3])
q.put(3)
q.put(3)
q.put(3)    # 队列满了的话,会阻塞,等待q.get()放值后,才能加入队列


案例三:(从这往下都是了解)

from multiprocessing import Queue
q = Queue(3)
q.put('zhao')
q.put('zhao')
q.put('zhao')

q.put('zhao',block=True,timeout=5) # put里的  block=True(默认) 如果满了会等待,timeout最多等待n s,如果ns还是队列还是满的就报错了,如果block=False,队列满了直接报错。

案例四:

from multiprocessing import Queue
q = Queue()
q.put('yyyy')
q.get()
q.get(block=True,timeout=5) # block=True 阻塞等待,timeout最多等5s, 剩下同上

案例五:

from multiprocessing import Queue
q = Queue(3)
q.put('qwe')
q.put('qwe')
q.put('qwe')
q.put('qwe',block=False) # 对于put来说block=False 如果队列满了就直接报错

q = Queue(3)
q.put('qwe')
q.get()
q.get(block=False)  # 对于get来说:block = Flase 拿不到不阻塞,直接报错

案例六:

from multiprocessing import Queue
q = Queue(1)
q.put('123')
q.get()
q.put_nowait('666') # 相当于block = False
q.get_nowait() #     block = False



二、生产者消费者模型:

​ 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度

2.1 为什么要使用生产者和消费者模式?

​ 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

2.2什么是生产者消费者模式?

​ 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

简述:

​ 生产者: 生产数据的任务
​ 消费者: 处理数据的任务

​ 生产者--队列(盆)-->消费者

​ 生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消 费效率.
​ 生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.

​ 补充: queue不适合传大文件,通产传一些消息.

生产者消费者模型一:

from multiprocessing import Process,Queue

def producer(q,name,food):
    '''生产者'''
    for i in range(10):
        print(f'{name}生产了{food}{i}')
        res = f'{food}{i}'
        q.put(res)
    q.put(None)  # 发送结束信号

def consumer(q,name):
    '''消费者'''
    while True:
        res = q.get(timeout=5)
        if res is None:
            break
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q=Queue()
    p1 = Process(target=producer,args=(q,'蔡徐坤','包子'))
    c1 = Process(target=consumer,args=(q,'周琦'))
    p1.start()
    c1.start()
    # p1.join()
    # q.put(None)  # 不一定由生产者发送结束信号,也可以由主进程来发送

多个消费者例子:有几个消费者就需要发送几次结束信号:

from multiprocessing import Process,Queue
import time,random

def producer(q,name,food):
    '''生产者'''
    for i in range(3):
        print(f'{name}生产了{food}{i}')
        time.sleep(random.randint(1,3))
        res = f'{food}{i} '
        q.put(res)

def consumer(q,name):
    '''消费者'''
    while True:
        res = q.get(timeout=5)
        if res is None:
            break
        time.sleep(random.randint(1,3))
        print(f'{name}吃了{res}')

if __name__ == '__main__':
    q =Queue()
    p1 = Process(target=producer,args=(q,'蔡徐坤','包子'))
    p2 = Process(target=producer,args=(q,'周琦','手抓饼'))
    p3 = Process(target=producer,args=(q,'吴亦凡','羊肉串'))
    c1 = Process(target=consumer,args=(q,'叶问'))
    c2 = Process(target=consumer,args=(q,'黄飞鸿'))
    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()  # 生产者生产完毕
    q.put(None) # 几个消费者就put几次None
    q.put(None)

JoinableQueue队列实现消费者生产者模型:

from multiprocessing import Process,JoinableQueue
import time,random

def producer(q,name,food):
    '''生产者'''
    for i in range(3):
        print(f'{name}生产了{food}{i}')
        time.sleep(random.randint(1,3))
        res = f'{food}{i} '
        q.put(res)

def consumer(q,name):
    '''消费者'''
    while True:
        res = q.get(timeout=5)
        time.sleep(random.randint(1,3))
        print(f'{name}吃了{res}')
        q.task_done()   #向q.join()发送一次信号,证明一个数据已经被取走了

if __name__ == '__main__':
    q =JoinableQueue()
    p1 = Process(target=producer,args=(q,'蔡徐坤','包子'))
    p2 = Process(target=producer,args=(q,'周琦','手抓饼'))
    p3 = Process(target=producer,args=(q,'吴亦凡','羊肉串'))
    c1 = Process(target=consumer,args=(q,'叶问'))
    c2 = Process(target=consumer,args=(q,'黄飞鸿'))
    p1.start()
    p2.start()
    p3.start()

    c1.daemon = True  # 定义消费者为守护进程
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()  # 生产者生产完毕
    # q.put(None) # 几个消费者就put几次None
    # q.put(None)
    q.join()  # 生产完毕,使用此方法进行阻塞,直到队列中所有项目均被处理。
    # 分析:
    #  生产者生产完毕---》消费者已经取干净了---》q.join()消费者已经取干净了,没有存在的意义了
    # 这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.守护进程的概念.

    # 分析2:
    # #主进程等--->p1,p2,p3等---->c1,c2
    #     #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    #     #因而c1,c2也没有存在的价值了,不需要继续阻塞在进程中影响主进程了。应该随着主进程的结束而结束,所以设置成守护进程就可以了。
原文地址:https://www.cnblogs.com/guapitomjoy/p/11529990.html