(四)多进程之队列与生产者消费者模型

一、队列介绍


进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing 模块支持两种形式:队列和管道,这两种方式都是使用消息传递的。

1,创建队列的类(底层就是以管道和锁定的方式实现):

# Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。
# 以后不会真的使用 Queue 去实现生产者消费者模型,如果真使用这种方式,意味着,你的生产者,消费者,还有 Queue 必须在同一台机器上,这称之为集中式,
# 你程序的所有组件都集中在一台机器上,集中式带来的问题:
# 1)稳定性:如果你机器坏了,那么你程序所有的组件都跟着一起崩掉
# 2)性能问题:因为一台机器的性能总归是有极限的。
# 需要把组件分散出去。

2,参数介绍:

"""
maxsize 是队列中允许最大项数,省略则无大小限制。
但需要明确:
    1、队列内存放的是消息而非大数据
    2、队列占用的是内存空间,因而maxsize即便是无大小限制也受限于内存大小
"""

3,主要方法介绍:

"""
q.put:方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get:方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.

q.get_nowait():同q.get(False)
q.put_nowait():同q.put(False)

q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
"""

4,其他方法(了解)

"""
q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
"""

5,队列的使用

# 队列是用来,多个进程之间通信使用的
from multiprocessing import Queue

# 队列里面不应该放一些大的文件,是一个进程把自己的消息放到队列中,然后另一个进程在这个队列中取这条消息。
q = Queue(3)        # 发的是消息,精简的小数据,所以里面参数不用太大,队列用的内存中的空间。

# 往里放数据
q.put("hello")
q.put({"a":1})
q.put([3,3,3])

# 判断队列是否满了,返回布尔值
print(q.full())

# q.put(4)  # 最多只有三个,再往里放会卡住,锁住,直到有人取走一个,才能再往里放。

# 取数据
print(q.get())      # hello
print(q.get())      # {'a': 1}
print(q.get())      # [3, 3, 3]

# 是否清空,返回布尔值
print(q.empty())


# print(q.get())      # 在取干净的情况下,再取一次,会卡住,自动加锁,得有人往里面放数据之后,才能再取。

二、生产者消费者模型介绍


 1,生产者消费者模型:

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

2,为什么要使用生产者和消费者模型:

生产者指的是生产数据的任务,消费者指的是处理数据的任务,在并发编程中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

3,什么是生产者消费者模型:

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

import time

def producer():
    for i in range(3):
        res = "包子%s" % i
        time.sleep(2)
        print("生产者生产了%s" % res)
        consumer(res)

def consumer(res):
    time.sleep(1)
    print("消费者吃了%s" % res)

producer()

# 消费者必须等待生产者生产完后才能吃,生产者必须等消费者吃完了,才能再生产

"""
生产者生产了包子0
消费者吃了包子0
生产者生产了包子1
消费者吃了包子1
生产者生产了包子2
消费者吃了包子2
"""
强耦合

三、生产者消费者模型实现


 1,基于队列来实现一个生产者消费者模型:

# 队列实现生产者消费者模型
from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(10):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)

        q.put(res)

def consumer(q):
    while True:
        res = q.get()
        time.sleep(1)
        print("消费者吃了%s" % res)

if __name__ == "__main__":
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))

    p1.start()
    c1.start()

    print("")
# 生产者消费者互不影响

"""
主
生产者生产了包子0
生产者生产了包子1
消费者吃了包子0
生产者生产了包子2
生产者生产了包子3
消费者吃了包子1
生产者生产了包子4
生产者生产了包子5
消费者吃了包子2
生产者生产了包子6
生产者生产了包子7
消费者吃了包子3
生产者生产了包子8
生产者生产了包子9
消费者吃了包子4
消费者吃了包子5
消费者吃了包子6
消费者吃了包子7
消费者吃了包子8
消费者吃了包子9
"""

此时的问题是主进程永远不会结束,原因是:生产者 p1 在生产完后就结束了,但是消费者 c1 在取空了 q 之后,就一直处于死循环中且卡在 q.get() 这一步。

解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以 break 出死循环。

# 队列实现生产者消费者模型
from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(10):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)

        q.put(res)

def consumer(q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消费者吃了%s" % res)

if __name__ == "__main__":
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))

    p1.start()
    c1.start()

    p1.join()       # 主进程保证 p1 这个子进程已经运行完了,也就是生产者已经把自己那10个数据全都丢到队列中了。
    q.put(None)     # None 代表结束信号

    print("")

但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很 low 的方式去解决,有几个消费者就需要发送几次结束信号:相当 low,例如:

# 队列实现生产者消费者模型
from multiprocessing import Process,Queue
import time

def producer(q):
    for i in range(10):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)

        q.put(res)

def consumer(q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消费者吃了%s" % res)

if __name__ == "__main__":
    # 容器
    q = Queue()

    # 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))


    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()       # 主进程保证 p1 这个子进程已经运行完了,也就是生产者已经把自己那10个数据全都丢到队列中了。
    p2.join()
    p3.join()

    q.put(None)     # None 代表结束信号,有几个消费者,就需要几个结束信号,比较 low
    q.put(None)

    print("")

其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制。

2,JoinableQueue([maxsize])

# 这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。

3,参数介绍:

# maxsize是队列中允许最大项数,省略则无大小限制。

4,方法介绍:

# JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
# q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
# q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

5,基于JoinableQueue 实现生产着消费者模型:

from multiprocessing import Process,JoinableQueue
import time

def producer(q):
    for i in range(2):
        res = "包子%s" % i
        time.sleep(0.5)
        print("生产者生产了%s" % res)

        q.put(res)
    q.join()    # 等到消费者把自己放入队列中的所有的数据都取走之后,生产者才结束。

def consumer(q):
    while True:
        res = q.get()
        if res is None:break
        time.sleep(1)
        print("消费者吃了%s" % res)
        q.task_done()       # 发送信号给q.join(),说明已经从队列中取走一个数据并处理完毕了。
        # 只是发信号,但是上面还是在做 get,可是此时的队列已经空了,就会卡死。加个守护进程就好了。

if __name__ == "__main__":
    # 容器
    q = JoinableQueue()     # 可以执行 q.join()
    q.join()                # 等队列执行完,就是里面的值取没了就算完

    # 生产者们
    p1 = Process(target=producer,args=(q,))
    p2 = Process(target=producer,args=(q,))
    p3 = Process(target=producer,args=(q,))

    # 消费者们
    c1 = Process(target=consumer,args=(q,))
    c2 = Process(target=consumer,args=(q,))
    c1.daemon = True
    c2.daemon = True


    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()
    # 1、主进程等生产者p1、p2、p3结束
    # 2、而p1、p2、p3是在消费者把所有数据都取干净之后才会结束
    # 3、所以一旦p1、p2、p3结束了,证明消费者也没必要存在了,应该随着主进程一块死掉,因而需要将生产者们设置成守护进程

    print("")
# 主进程没结束肯定还有没运行完的子进程。

四、生产者消费者模型总结


1,程序中有两类角色:

# 1,一类负责生产数据(生产者)
# 2,一类负责处理数据(消费者)

2,引入生产者消费者模型为了解决的问题是:

# 1,平衡生产者与消费者之间的速度差
# 2,程序解耦合

3,如何实现生产者消费者模型:

# 生产者 <----> 队列 <----> 消费者

原文地址:https://www.cnblogs.com/zoling7/p/13381497.html