python学习Day37--生产者消费者模型

一、回顾

1、并发与并行:

  并发:在同一个时间短内多个任务同时进行。

  并行:在用一个时间点上多个任务同时进行。

2、进程的三大基本状态:

  就绪状态:所有进程需要的资源都获取到了,除了CPU。

  执行状态:获取到了所有资源包括CPU,进程处于运行状态。

  阻塞状态:进程停止不再运行,放弃了CPU,进程此时处于内存中。

3、什么叫进程?

  正在运行的程序;由代码段,数据段,PCB(进程控制块)组成;

  进程是资源分配的基本单位;

4、进程之间能不能直接通信?

  正常情况下,多进程之间是无法进行通信的。因为每一个进程都有自己独立的内存空间。

5、锁机制

  为了多进程通信时,保护数据的安全性。

  一把锁醅一把钥匙。

  l = Lock( )

  l.acquire( )

  l.release( )

6、信号量

  一把锁配多把钥匙。

  sem = Semaphore(num)

  num代表的是几把钥匙。

7、事件

  e = Event( )

  e.is_set( )  返回一个bool值。

  e.wait( )     阻塞和非阻塞

  e.set( )       把is_set的bool值变为True

  e.clear( )    把is_set的bool变为False

二、生产者消费者模型

  主要是为了解耦

  借助队列来实现生产者消费者模型。

1、栈与队列介绍

  栈:先进后出(First In Last Out 简称FILO)

  队列:先进先出(First In First Out 简称FIFO),队列是安全的。

import queue # 不能进程多进程之前的数据传输(不用)
from multiprocessing import Queue # 用这个模块

(1)队列的主要内容

q = Queue(num)
# num:队列的最大长度
q.get() # 阻塞等待获取数据,如果有数据直接获取,如果没有数据,阻塞等待
q.put() # 阻塞,如果可以继续往队列中放数据,就直接放,不能放就阻塞等待

q.get_nowait() # 不阻塞,如果有数据直接获取,没有数据就报错
q.put_nowait() # 不阻塞,如果可以继续网队列中放数据,就直接放,不能就报错

(2)初始队列

 1 # *******************练习**********************开始
 2 from multiprocessing import Queue
 3 
 4 q = Queue(3) # 长度为3
 5 
 6 q.put(1)
 7 q.put('abc')
 8 q.put([4,5,6])
 9 print(123)
10 # q.put('娃哈哈') # 阻塞,不报错
11 try:
12     q.put_nowait('娃哈哈') # 阻塞,并报错
13 except:
14     print("队列满了")
15 print(111)
16 
17 print(q.get())
18 print(q.get())
19 print(q.get())
20 # print(q.get()) # 阻塞,等待
21 try:
22     print(q.get_nowait())
23 except:
24     print("队列已经空了")
25 
26 # *******************练习**********************结束
练习

2、队列实现生产者消费模型

 1 # *******************队列实现生产者消费者模型**********************开始
 2 from multiprocessing import Queue,Process
 3 import time
 4 
 5 def consumer(q,name):
 6     while 1:
 7         info = q.get_nowait()
 8         if info:
 9             print('%s拿走了%s' % (name,info))
10         else:
11             print("没有娃娃了,等下一批吧")
12             break
13 
14 
15 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
16 
17 def producer(q,product):
18     for i in range(20):
19         info = product+'娃娃%s号' % str(i)
20         q.put(info)
21     q.put(None)
22 
23 if __name__ == '__main__':
24     q = Queue(10)
25     p_pro = Process(target=producer,args=(q,'mini版本'))
26     p_con = Process(target=consumer,args=(q,'xxx'))
27     p_pro.start()
28     p_con.start()
29 # *******************队列实现生产者消费者模型**********************结束

3、进阶生产者消费者模型

(1)进阶:将生产者生产结束的标识,放到父进程中。

 1 # *******************进阶生产者消费者模型**********************开始
 2 from multiprocessing import Queue,Process
 3 import time
 4 
 5 def consumer(q,name,color):
 6     while 1:
 7         info = q.get()
 8         if info:
 9             print('%s%s拿走了%s33[0m' % (color,name,info))
10         else:
11             print("没有娃娃了,等下一批吧")
12             break
13 
14 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
15 
16 def producer(q,product):
17     for i in range(20):
18         info = product+'娃娃%s号' % str(i)
19         q.put(info)
20 
21 if __name__ == '__main__':
22     q = Queue(10)
23     p_pro1 = Process(target=producer,args=(q,'mini版本'))
24     p_pro2 = Process(target=producer,args=(q,'maxi版本'))
25     p_pro3 = Process(target=producer,args=(q,'super版本'))
26     p_con1 = Process(target=consumer,args=(q,'夏明','33[31m'))
27     p_con2 = Process(target=consumer,args=(q,'小华','33[36m'))
28     p_1 = [p_con1,p_con2,p_pro1,p_pro2,p_pro3]
29     [i.start() for i in p_1]
30 
31     # 父进程如何感知生产者子进p_pro1程不在生产数据了?
32     # 通过加入join来解决
33     p_pro1.join()
34     p_pro2.join()
35     p_pro3.join()
36     q.put(None) # 几个消费者就接收几个结束标识
37     q.put(None)
38 # *******************进阶生产者消费者模型**********************结束
进阶版

(2)新模块JoinableQueue(可连接队列)——改进生产者消费者模型

  继承Queue,所有可以使用Queue里面的方法。

 1 # *******************新模块joinableQueue**********************开始
 2 from multiprocessing import Process,JoinableQueue
 3 
 4 # q = JoinableQueue()
 5 
 6 # q.join() # 用于生产者。等待q.task_done返回结果,通过返回结果,生产和就能获得消费者当前消费了多少个数据
 7 # q.task_done() # 用于消费者,是指每消费队列中一个数据,就给join返回一个标识
 8 
 9 from multiprocessing import Queue,Process
10 import time
11 
12 def consumer(q,name,color):
13     while 1:
14         info = q.get()
15         print('%s%s拿走了%s33[0m' % (color,name,info))
16         q.task_done()
17 
18 # 消费者如何判断,生产者是没来得及生产,还是生产者不在生产了?
19 
20 def producer(q,product):
21     for i in range(20):
22         info = product+'娃娃%s号' % str(i)
23         q.put(info)
24     q.join() #记录了生产20个数据在队列中,此时会阻塞等待消费者消费完队列中所有数据
25 
26 if __name__ == '__main__':
27     q = JoinableQueue(10)
28     p_pro1 = Process(target=producer,args=(q,'mini版本'))
29     # p_pro2 = Process(target=producer,args=(q,'maxi版本'))
30     # p_pro3 = Process(target=producer,args=(q,'super版本'))
31     p_con1 = Process(target=consumer,args=(q,'夏明','33[31m'))
32     # p_con2 = Process(target=consumer,args=(q,'小华','33[36m'))
33     p_con1.daemon = True # 把消费者进程设为守护进程(这样做是:最后才能结束消费者进程,主进程结束,守护进程也就结束了)
34     p_pro1.start()
35     p_con1.start()
36     p_pro1.join() # 主进程等待生产者进程结束
37 
38 '''
39     程序有3个进程,主进程和生产者进程和消费者进程。 当主进程执行到p_pro1.join()代码时,主进程会等待生产进程结束
40     而生产进程中会等待消费者进程把所有数据消费完,生产者进程才结束。
41     现在的状态就是  主进程等待生产者进程结束,生产者进程等待消费者消费完所有数据
42     所以,把消费者设置为守护进程。  当主进程执行完,就代表生产进程已经结束,也就代表消费者进程已经把队列中数据消费完
43     此时,主进程一旦结束,守护进程也就是消费者进程也就跟着结束。    整个程序也就能正常结束了。
44 '''
45 # *******************新模块joinableQueue**********************结束

4、管道

  管道是不安全的。

(1)管道介绍、

 1 # *******************管道**********************开始
 2 from multiprocessing import Pipe
 3 
 4 con1,con2 = Pipe()
 5 
 6 con1.send('abc')
 7 print(con2.recv())
 8 con2.send('456')
 9 print(con1.recv())
10 # *******************管道**********************结束

(2)多进程下的管道

 1 # *******************多进程下的管道**********************开始
 2 from multiprocessing import Pipe,Process
 3 
 4 def func(con):
 5     con1,con2 = con
 6     con1.close()
 7     while 1:
 8         try:
 9             print(con2.recv())
10         except EOFError:
11             con2.close()
12             break
13 
14 if __name__ == '__main__':
15     con1,con2 = Pipe()
16     p = Process(target=func,args=((con1,con2),))
17     p.start()
18     con2.close()
19     for i in range(10):
20         con1.send('abc')
21     con1.close()
22 # *******************多进程下的管道**********************结束

【注意】一个小问题:你知道的IPC有哪些?

  管道、队列、(锁,信号量,事件)

5、进程之间的共享内存Manager

 1 # *******************多进程之前的共享内存**********************开始
 2 from multiprocessing import Process,Manager
 3 
 4 def func(num):
 5     num[0] -= 1
 6     print("子进程中的num的值是",num) # [0, 2, 3]
 7 
 8 
 9 if __name__ == '__main__':
10     m = Manager()
11     num = m.list([1,2,3])
12     p = Process(target=func,args=(num,))
13     p.start()
14     p.join()
15     print("父进程中的num的值是",num) # [0, 2, 3]
16 # *******************多进程之前的共享内存**********************结束
Manager

6、进程池

     进程池:一个池子,里边有固定数量的进程。这些进程抑制处于待命状态,一旦有任务来,马上就有进程去处理。

  因为在实际业务中,任务量是有多又有少的,如果任务量特别多,不可能要开对应那么多的进程数;

  开启那么多进程首先就需要消耗大量的时间让操作系统来管理它,其次还需要消耗大量时间让CPU帮你调度它。

  进程池还会帮程序员去管理池中的进程。

进程池有三个方法

(1)map(func, iterable)

  func:进程池中的进程执行的任务函数

  iterable:可迭代对象,是把可迭代对象中的每个元素依次传给任务函数的参数

(2)apply(func, args=( )) ——同步的效率,也就是说池中的进程一个一个的去执行任务

  func:进程池中的进程执行的任务函数

  args:可迭代对象型的参数,是传给任务函数的参数

  同步处理任务时,不需要close和join

(3)apply_async(func, args=( ),callback=None) ——异步的效率,也就是说池中的进程一次性都去执行任务

  func:进程池中的进程执行的任务函数

  args:可迭代对象型的参数,是传给任务函数的参数

  callback:回调函数,就是说每当进程池中有进程处理完任务了,返回的结果可以交给回调函数,由回调函数进行进一步的处理,回调函数只有异步才有,同步没有的。

  异步处理任务时,必须要加上close和join

 1 # *******************进程池**********************开始
 2 from multiprocessing import Pool,Process
 3 import os
 4 import time
 5 
 6 def func(num):
 7     num += 1
 8     print(num)
 9 
10 if __name__ == '__main__':
11     # p = Pool(os.cpu_count())
12     # start = time.time()
13     # p.map(func,[i for i in range(100)])
14     # p.close() # 指不允许再向进程池中添加任务
15     # p.join() # 等待进程池中所有进程执行完所有任务
16     # print("进程池做任务的效率", time.time() - start)
17 
18     # p = Pool(os.cpu_count())
19     # # start = time.time()
20     # for i in range(100):
21     #     res = p.apply(func,args=(i,)) # (进程池的同步处理)是指让进程池中的进程,同步的帮你做任务
22     # print("进程池做任务的效率", time.time() - start)
23 
24     p = Pool(os.cpu_count())
25     start = time.time()
26     for i in range(100):
27         p.apply_async(func, args=(i,)) # (进程池的异步处理)是指让进程池中的进程,异步的帮你做任务
28     print("进程池做任务的效率", time.time() - start)
29 
30     start = time.time()
31     p_1 = []
32     for i in range(100):
33         p1 = Process(target=func,args=(i,)) # (多线程的处理)
34         p1.start()
35         p_1.append(p1)
36     [p1.join() for p1 in p_1]
37     print("多进程做任务的效率",time.time()-start)
38 # *******************进程池**********************结束
进程池的操作

回调函数的使用:

  进程的任务函数的返回值,被当成回调函数的形参接收到,以此进行进一步的操作。

  回调函数是由主进程调用的,而不是子进程,子进程只负责把结果传递给回调函数。

 1 from multiprocessing import Pool
 2 import requests
 3 import time,os
 4 
 5 def func(url):
 6     res = requests.get(url)
 7     print('子进程的pid:%s,父进程的pid:%s'%(os.getpid(),os.getppid()))
 8     # print(res.text)
 9     if res.status_code == 200:
10         return url,res.text
11 
12 def cal_back(sta):
13     url,text = sta
14     print('回调函数的pid', os.getpid())
15     with open('a.txt','a',encoding='utf-8') as f:
16         f.write(url + text)
17     print('回调函数中!',url)
18 
19 if __name__ == '__main__':
20     p = Pool(5)
21     l = ['https://www.baidu.com',
22          'http://www.jd.com',
23          'http://www.taobao.com',
24          'http://www.mi.com',
25          'http://www.cnblogs.com',
26          'https://www.bilibili.com',
27          ]
28     print('主进程的pid',os.getpid())
29     for i in l:
30         p.apply_async(func, args=(i,),callback=cal_back) # 回调函数是由主进程调用的
31         # 异步执行任务func,每有一个进程执行完任务后,在func中return一个结果,结果会自动的被callback指定的函数,当成形式参数来接收到
32     p.close()
33     p.join()
回调函数的使用
原文地址:https://www.cnblogs.com/fengxb1213/p/12747450.html