生产者消费者模型

生产者消费者模型

模型就是解决某个问题的固定方法或套路

1.1某种问题

生产者:泛指产生数据的一方

消费者:泛指处理数据的一方

由于生产者和消费者的对处理速度不一样,数据传输过程耦合度过高,造成CPU工作效率低的情况

1.2解决思路

  1. 先将双方解开耦合,让不同进程负责不同的任务
  2. 提供一个共享的容器,来平衡双方的能力,引入进程间通信的队列模型(Queue)
#模拟爬虫案例

from multiprocessing import Process,Queue
import requests
import re,os,time,random

#生产者任务
def product(urls,q):
    i = 0
    for url in urls:
        response = requests.get(url)
        data = response.text
        time.sleep(random.random())	#模拟网路延时
        q.put(data)
        i += 1

#消费者任务
def customer(q):
    while True:
        data = q.get()
        time.sleep(random.random())
        res = re.findall('src=//(.*?) width',data)

if __name__ == '__main__':
    urls_list = ["http://www.baidu.com"]
	
    #创建队列进程
    q = Queue()

    p = Process(target=product,args=(urls_list,q))
    p.start()

    c = Process(target=customer,args=(q,))
    c.start()

#虽然实现了并发的目的,由于进程间数据无法交互,所以无法控制进程的关闭

引入JoinableQueue继承了Queue的方法,增加了join和task_done这个组合

#如何关闭进程?
#1.确定生产者任务完成
#确定消费者任务完成

#情况一:生产者1与消费者1

from multiprocessing import Process,JoinableQueue

#生产者任务
def product(q):
    for i in range(5):
        q.put(i)

#消费者任务
def customer(q):
    while True:
        i = q.get()
        print(i)
        
if __name__ == '__main__':
    
    #创建双方共享的容器
    q = JoinableQueue()
    
    #生产者进程
    p = Process(target=product,args=(q,))
    p.start()
    
    #消费者进程
    c = Process(target=customer,args=(q,))
    c.start()
    c.daemon = True	#守护生产进程,随生产进程的结束而结束
    
    p.join()	#生产任务结束
    

#情况二:多个生产

from multiprocessing import Process,JoinableQueue

#生产者任务
def product(q):
    for i in range(5):
        q.put(i)

#消费者任务
def customer(q):
    while True:
        i = q.get()
        print(i)
        q.task_done()	#标记消费任务完成

if __name__ == '__main__':

    #创建双方共享的容器
    q = JoinableQueue()

    #生产者进程
    p1 = Process(target=product,args=(q,))
    p2 = Process(target=product,args=(q,))
    p1.start()
    p2.start()

    #消费者进程
    c = Process(target=customer,args=(q,))
    c.start()


    p1.join()
    p2.join()	#生产任务结束

    q.join()	#队列任务完成

    #结束所有任务
    c.terminate()	#直接终止消费进程

原文地址:https://www.cnblogs.com/bruce123/p/11184425.html