队列与生产者消费者模型

一、队列

​ 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

创建队列的类(底层就是以管道和锁定的方式实现)

1 Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 
**参数介绍:**
1 maxsize是队列中允许最大项数,省略则无大小限制。

方法介绍

1 q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
2 q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
3  
4 q.get_nowait():同q.get(False)
5 q.put_nowait():同q.put(False)
6 
7 q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
8 q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
9 q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样

二、消费者生产者模型

什么是生产者消费者模式

​ 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

为什么要使用生产者和消费者模式

​ 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

基于队列(Queue)实现生产者消费者模型###

使用joinablequeue实现队列
(1)消费者不需要判断从队列里拿到None再退出执行消费者函数了
(2)消费者每次从队列里面q.get()一个数据,处理过后就使用队列.task_done()
(3)生产者for循环生产完所有产品,需要q.join()阻塞一下,对这个队列进行阻塞。
(4)启动一个生产者,启动一个消费者,并且这个消费者做成守护进程,然后生产者需要p.join()阻塞一下。
(5)我启动了生产者之后,生产者函数一直在生成数据,直到生产完所有数据将队列q.join()一下,意思是当我生产的数据都被消费者消费完之后 队列的阻塞才结束。
(6)结束过程:消费者这边是每消费完一个数据给队列返回一个q.task_done(),直到所有的数据都被消费完之后,生产者函数这边的队列.阻塞结束了,队列阻塞结束了生产者函数执行结束了。生产者函数结束了,那么p.join()生产者进程对象就结束了。生产者进程对象结束了整个主进程的代码就执行结束了。主进程代码结束了守护进程及消费者进程也结束了
joinablequeue与Queue二者区别
1)Queue有多少消费者,就要put多少个None。要在消费者函数添加if 不是真(非None数据)就退出死循环
2)二者效果一样但是从程序员角度看,joinablequeue更加严谨,更符合编程思维
'''
生产者: 生产数据的任务
消费者: 处理数据的任务

生产者--队列(盆)-->消费者

生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率.
生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率.

# 补充: queue不适合传大文件,通产传一些消息.
'''
from multiprocessing import Process, Queue
#Queue

def f1(q):
    for i in range(10):
        q.put(i)


def f2(q):
    while 1:
        print(q.get())


if __name__ == '__main__':
    q = Queue(10)  # 队列的容量
    a = Process(target=f1, args=(q,))
    b = Process(target=f2, args=(q,))
    a.start()
    b.start()

问题:

​ 上面的是错误的 卡在q.put这里

解决方案:

​ 因为q.put拿不到值的时候会一直等待 导致程序阻塞,所以导入JoinableQueue模块

补充知识点q.task_done()方法

from multiprocessing import Process,Queue,JoinableQueue

q = JoinableQueue()

q.put('zhao') # 放队列里一个任务
q.put('qian')

print(q.get())
q.task_done() # 完成了一次任务
print(q.get())
q.task_done() # 完成了一次任务
q.join() #计数器不为0的时候 阻塞等待计数器为0后通过
# 想象成一个计数器 :put +1   task_done -1

正确方案

from multiprocessing import JoinableQueue, Process

def up(p):
    for i in range(10):
        print(f'生产了{i}')
        p.put(i)
    p.join()  # 等待task_done()返回的信号量和put进去的数量一直才会往下执行
    print('当task_done()返回的信号数量一致时就会执行这里')


def down(p):
    while 1:
        print('消费了', p.get())
        p.task_done()


if __name__ == '__main__':
    p = JoinableQueue()  # 这个是队列的容量
    p1 = Process(target=up, args=(p,))#生产者
    p2 = Process(target=down, args=(p,))
    p1.start()
    p2.daemon = True  # 将p2消费者设置成守护进程 因为p2一直是死循环,设置成守护进程之后当主程序代码运行完毕,p2就会结束,不会成为僵尸进程
    p2.start()
    p1.join()
    print('主进程')
    #q.join() 
    # 这行代码分析
    # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.
    #这是主进程最后一行代码结束,消费者已经取干净了,没有存在的意义了.涉及到守护进程的概念.

生产者消费者模型总结

#程序中有两类角色
    一类负责生产数据(生产者)
    一类负责处理数据(消费者)
    
#引入生产者消费者模型为了解决的问题是:
    平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
    
#如何实现:
    生产者<-->队列<——>消费者
#生产者消费者模型实现类程序的解耦和
原文地址:https://www.cnblogs.com/demiao/p/11528399.html