生产者消费者模型

生产者消费者模型

模型就是解决某个问题的固定方法或套路

生产者:泛指产生数据的一方

消费者:泛指处理数据的一方

用来解决什么问题

  • 案例
    • 食堂饭店就是生产者
    • 我们就是消费者

假设厨师只有一个盘子,那么在他做菜的时候,我们就需要等,而我们吃饭的时候,他也需要等

导致效率低,双方相互等待,因为双方的效率不一致

具体的解决方案:

  1. 先将双方的耦合解开,让不同的进程去负责不同的任务
  2. 提供一个共享的容器,来平衡双方的能力,双方只要使用这个容器就可以了。推荐使用队列,因为队列可以在进程间共享内存

例如:

from multiprocessing import Process, Queue
import requests, re, time, random, os

def product(urls, q):
    '''生产者,爬取数据'''
    for ind, url in enumerate(urls):
        response = requests.get(url)
        response.encoding = response.apparent_encoding
        q.put(response.text)
        print(f'第{ind+1}个网站,爬取状态为{response.status_code},进程编号{os.getpid()}')


def customer(q):
    '''消费者,处理数据'''
    i = 0
    while True:
        text = q.get()
        time.sleep(random.random())
        res = re.findall('src=//(.*?) width', text)
        i += 1
        print("第%s个任务获取到%s个img%s个编码信息" % (i, len(res), len(text)))


if __name__ == '__main__':
    urls = [
        'http://www.baidu.com',
        'http://www.jd.com',
        'http://www.taobao.com',
    ]
    q = Queue()
    p = Process(target=product, args=(urls, q))
    p.start()

    c = Process(target=customer, args=(q,))
    c.start()

第1个网站,爬取状态为200,进程编号1672
第1个任务获取到1个img2287个编码信息
第2个网站,爬取状态为200,进程编号1672
第2个任务获取到0个img90221个编码信息
第3个网站,爬取状态为200,进程编号1672
第3个任务获取到0个img141513个编码信息
...

但是这样做有一个问题,消费者不知道什么时候结束

如果生产者只有一个,那么可以在生产结束后往q中put一个None,作为生产结束的标识,但是但生产者有多个的时候,这种方法就不可行了

JoinableQueue

  • 继承自Queue,用法一致
  • 新增了join(等待) 和task_done(任务完成)

其中join是一个阻塞函数,会一直阻塞到task_done的调用次数等于存入元素的个数才会释放,用于表示队列任务处理完成

案例:

from multiprocessing import Process, JoinableQueue
import time, random
 '''如何判定今天的热狗真的吃完了
 	1.确定生成者任务完成
 	2.确定生出来的数据已经全部处理完成'''

# 生产热狗
def product(q, name):
    for i in range(3):
        dog = f"{name}的热狗{i + 1}"
        time.sleep(random.random())
        print("生产了", dog)
        q.put(dog)


# 吃热狗
def customer(q):
    while True:
        dog = q.get()
        time.sleep(random.random())
        print("消费了%s" % dog)
        q.task_done()  # 标记这个任务处理完成


if __name__ == '__main__':
    # 创建一个双方能共享的容器
    q = JoinableQueue()

    # 生产者进程
    p1 = Process(target=product, args=(q, "上海分店"))
    p2 = Process(target=product, args=(q, "北京分店"))

    p1.start()
    p2.start()

    # 消费者进程
    c = Process(target=customer, args=(q,))
    c.daemon = True # 将消费者设置为守护进程 当主进程确认 任务全部完成时 可以随着主进程一起结束
    c.start()
	
    p1.join()
    p2.join()  # 代码走到这里意味着生产方完成 

    q.join()  # 意味着队列中的任务都处理完成了

    # c.terminate()  # 也可以直接终止消费者进程
   

各种各样的队列(如redis消息队列,MQ消息队列)都是典型的消费者生产者模型。

主要用于流量的削峰,保证服务器不会因为高并发而崩溃

原文地址:https://www.cnblogs.com/lucky75/p/11134837.html