进程之生产者消费者模型(队列,管道,数据共享,进程池)

1.生产者和消费者模型

  • 在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题
    • 该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度;
  • 为什么要使用生产者和消费者模式:
    • 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式
  • 什么是生产者消费者模式?
    • 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力;

2.基于队列实现生产者消费者模型

  • 栈:先进后出(First In Last Out  FILO)
  • 队列:先进先出(First In FIrst Out FIFO)

1)from multiprocessing import Queue模块:解决生产者和消费者模型,队列是安全的;

  • 进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式,队列和管道,这两种模式都是使用消息传递的;
  • 模块参数:
    • Queue(maxsize):maxsize队列中允许的最大项数,省略则无大小限制
    • q.get():从队列中取出一个数据,如果为队列为空,会一直等待,
    • q.get_nowait():直接跳过等待,然后取数据,如果队列为空,则报错:q.Empty
    • q.put():往队列放一个数据,如果队列满了,则等待;
    • q.put_nowait():跳过等待,如果队列满了,执行这句话时报错:q.Full

代码展示:get_nowait()和put_nowait()

from multiprocessing import Queue,JoinableQueue,Pipe,Process
import time
def consumer(q):
    pass
    while 1:
        time.sleep(1)
        info = q.get_nowait()
        print(info)
def producter(q):
    for i in range(1,11):
        q.put_nowait(i)
    #如果不写nowait,则会一直停留在此,等待队列中元素被取走
if __name__ == '__main__':
    q = Queue(4)#创建一个队列,可以放置4个元素
    p = Process(target=consumer,args=(q,))
    p1 = Process(target=producter,args=(q,))
    p.start()
    p1.start()
#输出结果:
Process Process-2:
Traceback (most recent call last):
queue.Full
1
2
3
4
Process Process-1:
Traceback (most recent call last):
queue.Empty
1 q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
2 q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
3 q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
其他方法

#正确方法展示:

def consumer(q):
    while 1:
        info = q.get()
        print(info)
        q.task_done()#每消费一个数据,会返回给join一个标识符
def product(q):
    for i in range(10):
        info = str(i) + ''
        q.put(info)
    q.join()#等待task_done()的返回结果,
# 假设生产者生产了100个数据,join就能记录下100这个数字。每次消费者消费一个数据,就必须要task_done返回一个标识,
# 当生产者(join)接收到100个消费者返回来的标识的时候,生产者就能知道消费者已经把所有数据都消费完了。
if __name__ == '__main__':
    q = JoinableQueue(10)
    p = Process(target=consumer,args=(q,))
    p1 = Process(target=product,args=(q,))
    p.daemon = True #把消费者设为守护进程
    p.start()
    p1.start()
    p1.join()#不加join,守护进程结束,product中的q.join()会一直等待标识符

 #新模块:JoinableQueue:继承Queue,跟它的方法一样,get和put都是读取和取出数据;

新方法:

  • 对消费者来说:q.task_done()是指每消费队列中一个数据,就给join返回一个标识,应将它设为守护进程,如果主进程代码段的结束,那么它的while循环也会随着结束
  • 对生产者来说:q.join():会先记录生产者的生产的数据,每次消费者消费一个数据,就会向join发一个标识,最后这个标识累计到100,也就意味着队列的元素

3.基于管道解释生产者和消费者模型

进程间通信(IPC)方式二:管道

  • 创建管道的类:q = Pipe(),进程间创建一条管道,并返回一个元组(coon1,coon2)两个链接对象(只有两个),而且必须在产生Process之前产生管道
  • 参数:dumplex:默认管道是全双工的,如果将duplex设为False,conn1只能接收,2只能发送;
  • conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
  • conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象

代码展示:

def func(conn):
    coon1,coon2 = conn
    print(coon2.recv())
    print(coon1.recv())


if __name__ == '__main__':
    coon1,coon2 = Pipe()#管道只返回两个通信口
    p = Process(target=func,args=((coon1,coon2),))
    p.start()
    coon1.send('conn1'.encode('utf-8'))
    #接口coon1会被子进程coon2接收到,但是容易发生数据错乱
    coon2.send('coon2'.encode('utf-8'))
    #接口coon2会被子进程coon1接收到
    coon1.close()
    coon2.close()
管道加锁就等于队列
from multiprocessing import Process,Pipe

import time,os
def adder(p,name):
    server,client=p
    client.close()
    while True:
        try:
            x,y=server.recv()
        except EOFError:
            server.close()
            break
        res=x+y
        server.send(res)
    print('server done')
if __name__ == '__main__':
    server,client=Pipe()

    c1=Process(target=adder,args=((server,client),'c1'))
    c1.start()

    server.close()

    client.send((10,20))
    print(client.recv())
    client.close()

    c1.join()
    print('主进程')
#注意:send()和recv()方法使用pickle模块对对象进行序列化。
利用通常在客户端/服务器中使用的请求/响应模型或远程过程调用,就可以使用管道编写与进程交互的程序

4.共享数据Manager和Value

  • 进程间数据是独立的,可以借助于队列或管道实现通信,二者都是基于消息传递的
  • 虽然进程间数据独立,但可以通过Manager实现数据共享,事实上Manager的功能远不止于此

代码展示:

def func(num):
    num.setdefault('name','alex')
if __name__ == '__main__':
    m = Manager()   #实例化一个Manager对象
    num = m.dict({'age':12})  #共享数据
    p = Process(target=func,args=(num,))
    p.start()
    p.join()
    print(num)
#输出结果:
{'age': 12, 'name': 'alex'}
def work(d,lock):
    with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
        d['count']-=1

if __name__ == '__main__':
    lock=Lock()
    with Manager() as m:
        dic=m.dict({'count':10})
        p_l=[]
        for i in range(10):
            p=Process(target=work,args=(dic,lock))
            p_l.append(p)
            p.start()
        [p.join() for p in p_l]
        print(dic)
        #{'count': 1}

5.进程池

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:

  1. 很明显需要并发执行的任务通常要远大于核数
  2. 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
  3. 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)

例如当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个。。。手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

我们就可以通过维护一个进程池来控制进程数目,比如httpd的进程模式,规定最小进程数和最大进程数... 
ps:对于远程过程调用的高级应用程序而言,应该使用进程池,Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。

主要方法:

1 p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),
然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。
如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
2 p.apply_async(func [, args [, kwargs]]):

在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,
callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,
否则将接收其他异步操作中的结果。
4 p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成 5 P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用
from multiprocessing import Pool
import os,time
def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        res=p.apply(work,args=(i,)) #同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限
        res_l.append(res)
    print(res_l)

同步调用apply
同步调用apply
from multiprocessing import Pool
import os,time
def work(n):
    print('%s run' %os.getpid())
    time.sleep(3)
    return n**2

if __name__ == '__main__':
    p=Pool(3) #进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,)) #同步运行,阻塞、直到本次任务执行完毕拿到res
        res_l.append(res)

    #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close()
    p.join()
    for res in res_l:
        print(res.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
异步调用apply_async
#一:使用进程池(异步调用,apply_async)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res)
    print("==============================>") #没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了

    pool.close() #关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l) #看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
    for i in res_l:
        print(i.get()) #使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

#二:使用进程池(同步调用,apply)
#coding: utf-8
from multiprocessing import Process,Pool
import time

def func(msg):
    print( "msg:", msg)
    time.sleep(0.1)
    return msg

if __name__ == "__main__":
    pool = Pool(processes = 3)
    res_l=[]
    for i in range(10):
        msg = "hello %d" %(i)
        res=pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
        res_l.append(res) #同步执行,即执行完一个拿到结果,再去执行另外一个
    print("==============================>")
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束

    print(res_l) #看到的就是最终的结果组成的列表
    for i in res_l: #apply是同步的,所以直接得到结果,没有get()方法
        print(i)

详解:apply_async与apply
详解:apply和apply_async

回调函数:

需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。

from multiprocessing import Pool
import requests
import json
import os

def get_page(url):
    print('<进程%s> get %s' %(os.getpid(),url))
    respone=requests.get(url)
    if respone.status_code == 200:
        return {'url':url,'text':respone.text}

def pasrse_page(res):
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://help.github.com/',
        'http://www.sina.com.cn/'
    ]

    p=Pool(3)
    res_l=[]
    for url in urls:
        res=p.apply_async(get_page,args=(url,),callback=pasrse_page)
        res_l.append(res)

    p.close()
    p.join()
    print([res.get() for res in res_l]) #拿到的是get_page的结果,其实完全没必要拿该结果,该结果已经传给回调函数处理了

'''
打印结果:
<进程3388> get https://www.baidu.com
<进程3389> get https://www.python.org
<进程3390> get https://www.openstack.org
<进程3388> get https://help.github.com/
<进程3387> parse https://www.baidu.com
<进程3389> get http://www.sina.com.cn/
<进程3387> parse https://www.python.org
<进程3387> parse https://help.github.com/
<进程3387> parse http://www.sina.com.cn/
<进程3387> parse https://www.openstack.org
[{'url': 'https://www.baidu.com', 'text': '<!DOCTYPE html>
...',...}]
'''
View Code
from multiprocessing import Pool
import time,random
import requests
import re

def get_page(url,pattern):
    response=requests.get(url)
    if response.status_code == 200:
        return (response.text,pattern)

def parse_page(info):
    page_content,pattern=info
    res=re.findall(pattern,page_content)
    for item in res:
        dic={
            'index':item[0],
            'title':item[1],
            'actor':item[2].strip()[3:],
            'time':item[3][5:],
            'score':item[4]+item[5]

        }
        print(dic)
if __name__ == '__main__':
    pattern1=re.compile(r'<dd>.*?board-index.*?>(d+)<.*?title="(.*?)".*?star.*?>(.*?)<.*?releasetime.*?>(.*?)<.*?integer.*?>(.*?)<.*?fraction.*?>(.*?)<',re.S)

    url_dic={
        'http://maoyan.com/board/7':pattern1,
    }

    p=Pool()
    res_l=[]
    for url,pattern in url_dic.items():
        res=p.apply_async(get_page,args=(url,pattern),callback=parse_page)
        res_l.append(res)

    for i in res_l:
        i.get()

    # res=requests.get('http://maoyan.com/board/7')
    # print(re.findall(pattern,res.text))
爬虫案例

如果在主进程中等待进程池中所有任务都执行完毕后,再统一处理结果,则无需回调函数

from multiprocessing import Pool
import time,random,os

def work(n):
    time.sleep(1)
    return n**2
if __name__ == '__main__':
    p=Pool()

    res_l=[]
    for i in range(10):
        res=p.apply_async(work,args=(i,))
        res_l.append(res)

    p.close()
    p.join() #等待进程池中所有进程执行完毕

    nums=[]
    for res in res_l:
        nums.append(res.get()) #拿到所有结果
    print(nums) #主进程拿到所有的处理结果,可以在主进程中进行统一进行处理
View Code
原文地址:https://www.cnblogs.com/0627zhou/p/9518055.html