37 生产者消费者模型 管道 进程间的数据共享 进程池

主要内容

进程之间能不能直接通信? 

  正常情况下, 多进程之间是无法直接进行通信的. 因为每个进程都有自己独立的内存空间.

1 . 生产者消费者模型  

  a  : 主要作用 : 为了解耦         借助队列来实现生产者消费者模型(是安全的)

    栈 :     先进后出(First In last Out)    简称 : FILO

    队列 : 先进先出(First In First Out)   简称 : FIFO

  b  : 队列的介绍   from multiprocessing import Queue

      q = Queue(num)                   num  : 队列的最大长度

      q.get()  阻塞等待获取数据, 如果有数据直接获取, 如果没有数据, 阻塞等待

      q.put()  阻塞, 如果可以继续往队列中放数据, 就直接放, 不能放就直接阻塞等待.

      q.get_nowait()   不阻塞, 如果有数据直接获取, 没有数据直接报错

      q.put_nowait()   不阻塞, 如果可以往队列中放数据,就放,如果不可以就直接报错.

  c  : 采用队列实现生产者消费者模型

from multiprocessing import Process, Queue
import time
def consumer(q, name):
    while 1:
        info = q.get()
        if info:
            print('%s拿了%s' % (name, info))
        else:
            break
def product(q):
    for i in range(20):
        info = '娃娃%s号' % i
        q.put(info)
    q.put(None)       # 加一个结束标识none, 如果消费者读到该数值时,代表数据已经取完, 可以结束了.
if __name__ == '__main__':
    q = Queue(10)
    p_con = Process(target=consumer, args=(q,'lily'))
    p_con.start()
    p_pro = Process(target=product, args=(q,))
    p_pro.start()

  将生产者的结束标识放在父进程中

from multiprocessing import Process, Queue
import time
def consumer(q, name, color):
    while 1:
        info = q.get()
        if info:
            print('%s %s拿了%s33[0m' % (color, name, info))
        else:
            break
def product(q, version):
    for i in range(20):
        info = version + '的娃娃%s号' % i
        q.put(info)
if __name__ == '__main__':
    q = Queue(10)
    p_con = Process(target=consumer, args=(q,'lily', '33[31m'))
    p_con1 = Process(target=consumer, args=(q,'dior', '33[32m'))
    p_pro = Process(target=product, args=(q, 'cute'))
    p_pro1 = Process(target=product, args=(q, 'cool'))
    p_pro2 = Process(target=product, args=(q, 'fashion'))
    l = [p_con, p_con1, p_pro, p_pro1, p_pro2]
    [p.start() for p in l]   如果有多个进程要开启的时候, 可以使用列表推导式.
    p_pro.join()
    p_pro1.join()
    p_pro2.join()
    q.put(None)             有几个用户加几个none
    q.put(None)

  d  : 可连接队列的介绍 JoinableQueue    from multiprocessing import JoinableQueue   

         1)  JoinableQueue是继承Queue , 所以可以使用Queue中的所有方法 

    2)  JoinableQueue还多了两个方法 

      q.join() : 用于生产者, 等待q.task_done的返回结果, 通过返回结果, 生产者就可以获得消费者当前消费的数据.

      q.task_done () : 用于消费者, 是指每消费队列中的一个数据, 就给join一个标识.

  e  : 用可连接队列实现生产者消费者模型

from multiprocessing import Process, JoinableQueue
import time
# 假设生产者生产了100个数据,join就能记录下100这个数字。每次消费者消费一个数据,就必须要task_done返回一个标识,当生产者(join)
# 接收到100个消费者返回来的标识的时候,生产者就能知道消费者已经把所有数据都消费完了。
def consumer(q, name, color):
    while 1:
        info = q.get()
        print('%s %s拿了%s33[0m' % (color, name, info))
        q.task_done()   #用于消费者, 每执行队列中的数据时,会给jon返回一个标识.
def product(q, version):
    for i in range(20):
        info = version + '的娃娃%s号' % i
        q.put(info)
    q.join()         #记录生产了20个数据在队列中, 此时会阻塞等待消费者消费完队列中的所有数据
if __name__ == '__main__':
    q = JoinableQueue()
    p_con = Process(target=consumer, args=(q, 'lily', '33[31m'))
    p_pro = Process(target=product, args=(q, 'cute'))
    p_con.daemon = True       #将消费者进程设置为守护进程.
    p_con.start()
    p_pro.start()
    p_pro.join()              #主进程等待生产这进程结束而结束
# 程序有3个进程,主进程和生产者进程和消费者进程。  当主进程执行到35行代码时,主进程会等待生产进程结束
# 而生产进程中(第26行)会等待消费者进程把所有数据消费完,生产者进程才结束。
# 现在的状态就是  主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据
# 所以,把消费者设置为守护进程。  当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完
# 此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。    整个程序也就能正常结束了。

2 . 管道 

  a : 多进程下的管道

from multiprocessing import Pipe,Process
def func(con):
    con1,con2 = con
    con1.close()# 子进程使用con2和父进程通信,所以
    while 1:
        try:
            print(con2.recv())#当主进程的con1发数据时,子进程要死循环的去接收。
        except EOFError:# 如果主进程的con1发完数据并关闭con1,子进程的con2继续接收时,就会报错,使用try的方式,获取错误
            con2.close()# 获取到错误,就是指子进程已经把管道中所有数据都接收完了,所以用这种方式去关闭管道
            break
if __name__ == '__main__':
    con1,con2 = Pipe()
    p = Process(target=func,args=((con1,con2),))
    p.start()
    con2.close()# 在父进程中,使用con1去和子进程通信,所以不需要con2,就提前关闭
    for i in range(10):# 生产数据
        con1.send(i)# 给子进程的con2发送数据
    con1.close()# 生产完数据,关闭父进程这一端的管道

  b : 单进程下的管道

from multiprocessing import Pipe
con1,con2 = Pipe()
con1.send('abc')
print(con2.recv())
con2.send(123)
print(con1.recv())

3 . 进程间的数据共享    from multiprocessing import Manager,Value

  m = Manager()

  num = m.dict({键 : 值})

  num = m.list([1,2,3])

  a  : 对列表的操作

from multiprocessing import Process,Manager
def func(num):
    num[0] -= 1
    print('子进程中的num的值是',num)
if __name__ == '__main__':
    m = Manager()
    num = m.list([1,2,3])
    p = Process(target=func,args=(num,))
    p.start()
    p.join()
    print('父进程中的num的值是',num)

  b : 对字典的操作

from multiprocessing import Process,Manager
def func(num):
    num['1'] = 'one1'
    print('子进程中的num的值是',num)
if __name__ == '__main__':
    m = Manager()
    num = m.dict({'1':'one','2':'two'})
    p = Process(target=func,args=(num,))
    p.start()
    p.join()
    print('父进程中的num的值是',num)
# 子进程中的num的值是 {'1': 'one1', '2': 'two'}
# 父进程中的num的值是 {'1': 'one1', '2': 'two'}

  c : 对数字的操作 , 把数字转换成列表, 取其第零项进行操作

    利用manager实现银行存钱取钱的问题

from multiprocessing import Process, Manager, Lock
import time
def get_money(s, l):
    l.acquire()
    for i in range(100):
        s[0] = s[0] + 1
        print(s[0])
        time.sleep(0.01)
    l.release()
def put_money(s, l):
    l.acquire()
    for i in range(100):
        s[0] = s[0] - 1
        print(s[0])
        time.sleep(0.01)
    l.release()
if __name__ == '__main__':
    l = Lock()
    m = Manager()
    num = m.list([100])数字实现共享,采用列表实现,取其第o项进行操作.
    p_get = Process(target=get_money, args=(num, l))
    p_get.start()
    p_put = Process(target=put_money, args=(num, l))
    p_put.start()
    p_put.join()
    p_get.join()
    print('>>>>>>>',num[0])

4 . 进程池

  a  :进程池的定义 :  一个池子,里边有固定数量的进程。这些进程一直处于待命状态,一旦有任务来,马上就有进程去处理。因为在实际业务中,任务量是有多有少的,如果任务量特别的多,不可能要开对应那么多的进程数开启那么多进程首先就需要消耗大量的时间让操作系统来为你管理它。其次还需要消耗大量时间让cpu帮你调度它。进程池还会帮程序员去管理池中的进程。

  b : 进程池的三个方法

    1 ) map (func , iterable)

        func : 进程池中的进程执行的任务函数

        iterable : 可迭代对象, 是把可迭代对象中的每个元素依次传给任务函数当参数

      多进程做任务与进程池做任务的效率对比

import os
from multiprocessing import Pool,Process
import time
def func(num):
    num += 1
    print(num)
if __name__ == '__main__':
    p = Pool(os.cpu_count() + 1)
    start = time.time()
    p.map(func,[i for i in range(100)])
    p.close()# 指不允许在向进程池中添加任务
    p.join()# 等待进程池中所有进程执行完所有任务
    print('进程池做任务的效率:',time.time() - start)
    start = time.time()
    p_l = []
    for i in range(100):
        p1 = Process(target=func,args=(i,))
        p1.start()
        p_l.append(p1)
    [p1.join() for p1 in p_l]
    print('多进程直接做任务的效率',time.time() - start)

      map返回值

from multiprocessing import Pool
def func(num):
    num += 1
    print(num)
    return num
if __name__ == '__main__':
    p = Pool(5)
    res = p.map(func, [i for i in range(10)])
    p.close()
    p.join()
    print('主进程中map的返回值',res)     #主进程中map的返回值 [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

    2 ) apply (func , args = ()) : 池中的进程一个一个的去执行任务

        func:进程池中的进程执行的任务函数

        args: 可迭代对象型的参数,是传给任务函数的参数

        注意 : 同步处理任务时,不需要close和join同步处理任务时,进程池中的所有进程是普通进程(主进程需要等待其执行结束)

from multiprocessing import Pool
import time
def func(num):
    num += 1
    return num
if __name__ == '__main__':
    p = Pool(5)
    start = time.time()
    for i in range(10000):
        res = p.apply(func,args=(i,))# 同步处理这100个任务,同步是指,哪怕我进程中有5个进程,也依旧是1个进程1个进程的去执行任务
        print(res)
    print(time.time() - start)

    3 ) apply_async(func , args = ()) : 池中的进程一次性的去执行任务

        func:进程池中的进程执行的任务函数

        args: 可迭代对象型的参数,是传给任务函数的参数

        callback: 回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步是没有的

        注意 : 异步处理任务时,进程池中的进程都是守护进程.(主进程代码执行完毕守护进程就结束), 异步处理任务时, 必须要加上close和join.

from multiprocessing import Pool
import time
def func(num):
    num += 1
    print(num)
if __name__ == '__main__':
    p = Pool(5)
    start = time.time()
    l = []
    for i in range(1000):
        p.apply_async(func,args=(i,))# 异步处理这100个任务,异步是指,进程中有5个进程,一下就处理5个任务,接下来哪个进程处理完任务了,就马上去接收下一个任务
    p.close()         # 指不允许向进程池中添加任务
    p.join()          # 等待进程池中的所有进程执行完所有任务
    print(time.time() - start)

                     回调函数的使用:

                     进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的处理操作

             回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数

from multiprocessing import Pool
import requests
import time,os
def func(url):
    res = requests.get(url)
    print('子进程的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))
    print(res)               #<Response [200]>
    # print(res.text)        #内容
    if res.status_code == 200:
        return url,res.text            #返回的结果交给回调函数,由回调函数进行进一步的处理.
def cal_back(sta):
    url,text = sta
    print('回调函数的pid', os.getpid())
    with open('a.txt','a',encoding='utf-8') as f:
        f.write(url + text + '
')
    # print('回调函数中!',url)
if __name__ == '__main__':
    p = Pool(5)
    l = ['https://www.baidu.com',
         'http://www.jd.com',
         'http://www.taobao.com',
         'http://www.mi.com',
         'http://www.cnblogs.com',
         ]
    print('主进程的pid',os.getpid())
    for i in l:
        p.apply_async(func, args=(i,),callback=cal_back)#
        # 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
    p.close()
    p.join()

  4 ) 同步与异步做任务的效率对比

from multiprocessing import Pool
import requests
import time
def func(url):
    res = requests.get(url)
    # print(res.text)
    if res.status_code == 200:
        return 'ok'
if __name__ == '__main__':
    p = Pool(5)
    l = ['https://www.baidu.com',
         'http://www.jd.com',
         'http://www.taobao.com',
         'http://www.mi.com',
         'http://www.cnblogs.com',
         'https://www.bilibili.com',
         ]
    start = time.time()
    for i in l:
        p.apply(func,args=(i,))
    apply_= time.time() - start
    start = time.time()
    for i in l:
        p.apply_async(func, args=(i,))
    p.close()
    p.join()
    print('同步的时间是%s,异步的时间是%s'%(apply_, time.time() - start))
#同步的时间是5.702281713485718,异步的时间是0.5272564888000488

5 . 用进程池实现socket聊天

  服务器端代码: 

from multiprocessing import Pool
import socket
sk = socket.socket()
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR, 1)
sk.bind(('127.0.0.1', 7867))
sk.listen()
def func(conn):
    while 1:
        try:
            msg = conn.recv(1024).decode('utf-8')
            if not msg: break
            conn.send(msg.upper().encode('utf-8'))
        except Exception:
            break
if __name__ == '__main__':
    p = Pool(4)
    while 1:
        conn, addr = sk.accept()
        p.apply_async(func, args=(conn, ))

客户端代码:

import socket
sk = socket.socket()
sk.connect(('127.0.0.1', 7867))
while 1:
    content = input('请输入内容>>>:')
    if not content: continue
    sk.send(content.encode('utf-8'))
    msg = sk.recv(1024)
    print(msg.decode('utf-8'))

  注意 : 并发开启多个客户端, 服务器端同一时间只有4个不同的pid, 只能结束一个客户端, 另一个客户端才能进来.

原文地址:https://www.cnblogs.com/gyh412724/p/9519431.html