生产者消费模型

生产者消费模型

模型就是解决某个问题固定方法和套路

e.g.

生产者和消费者

存在的问题 :效率不同,往往双方之间的处理速度不一样,导致双方需要等待对方

tips:

1将双方解开耦合,让不同进程负责不同的任务

2提供一个共享的容器,来平衡双方的能力,之所以用进程队列是因为可以在进程之间共享

case:

from multiprocessing import Process,Queue
import requests
import re,os,time,random

#生产者的任务
def task(url,q):
    i=0
    for url in urls:
        response =requests.get(url)
        text=response.text
        #将生产完成的数放入队列中
        time.sleep(random.random())
        q.put(text)
        i+=1
        print(os.getpid(),"生产了第%s个数据"%i)
        
        
#消费者的任务
def customer(q):
    i=0
    while True:
        text=q.get()
        time.sleep(random.random())
        res =re.findall('src=//(.*?)width',text)
        
        i+=1
        print('第%s任务获取到%s个img'%(i,len(res))
if __name__=='__main__'
	url=[
        "http://www.baidu.com",
        "http://www.baidu.com",
        "http://www.baidu.com",
        "http://www.baidu.com",
    ]
      #创建一个双方能共享的容器
      q=Queue()
    #生产者进程
    p1=Process(target=product,args=(url,q))
    p1.start()
    
    #消费这进程
    c=Process(target=customer,args=(q,))
    c.start()
    

遇到的问题:

1消费不知道何时结束

加了一个模块joinableQueue -------->和 继承Queue 用法一致

增加了join和taskDone(任务完成)

join这个是阻塞函数,会阻塞直到taskdone的调用次数等于存入的元素个数,可以用于表示队列任务处理完成

完善上述代码案列case

from multiprocessing import Process,joinableQueue
import requests
import re,os,time,random
"""热狗为例子"""
#生产者任务
def product(q,name):
    for i in range(5):
        dog ="%s的热狗%s"%(name,(i+1))
        time.sleep(random.random())
        print('生产了',dog)
        q.put(dog)
        
#吃热狗
def customer(q):
    while True:
        dog=q.get()
        time.sleep(random.random())
        print('消费了%s'%dog)
        q.task_done()    #标记这个任务处理完成
if __name__=="__main__":
    #创建一个双方共享的容器
    q=JoinableQueue()
    
    #生产者的进程
    p1=Process(target=product,args=(q,"上海分店"))
    p2=Process(target=product,args=(q,"北京分店"))
    
    p1.start()
    p2.start()
    
    
    #消费者进程
    c=Process(target=customer,args=(q,))
    
    #c.deamon=True  可以将消费者设置为守护进程,将主进程确认 任务全部完成时 可以随着主进程一起结束
    c.start()
    
    p1.join()
    p2.join()  #代码走到这儿代表生产方已经完成
    
    q.join()   #意味着队列中的任务都处理完成了
    
    
    #结束所有任务
    c.terminate()   #直接终止消费者进程
    
    #解决问题的关键
    1确定生成者的任务完成
    2确定生出来的数据已经全部处理完成
原文地址:https://www.cnblogs.com/zhuyuanying123--/p/11134130.html