Python中的生产者消费者模型

---恢复内容开始---

了解知识点:

1、守护进程:

·什么是守护进程:

守护进程其实就是一个‘子进程’,守护即伴随,守护进程会伴随主进程的代码运行完毕后而死掉

·为何用守护进程:

当该子进程内的代码在父进程代码运行完毕后就没有存在的意义了,就应该将进程设置为守护进程,会在父进程代码结束后死掉

 1 from multiprocessing import Process
 2 
 3 import time,os
 4 
 5 def task(name):
 6     print('%s is running'%name)
 7     time.sleep(3)
 8 
 9 if __name__ == '__main__':
10     p1=Process(target=task,args=('守护进程',))
11     p2=Process(target=task,args=('正常的子进程',))
12     p1.daemon=True  # 一定要放到p.start()之前
13     p1.start()
14     p2.start()
15     print('')
16     
守护进程举例

以下是守护进程会迷惑人的范例:

 1 #主进程代码运行完毕,守护进程就会结束
 2 from multiprocessing import Process
 3 import time
 4 def foo():
 5     print(123)
 6     time.sleep(1)
 7     print("end123")
 8 
 9 def bar():
10     print(456)
11     time.sleep(3)
12     print("end456")
13 
14 if __name__ == '__main__':
15     p1=Process(target=foo)
16     p2=Process(target=bar)
17 
18     p1.daemon=True
19     p1.start()
20     p2.start()
21     print("main-------")
22 
23     '''
24     main-------
25     456
26     enn456
27     '''
28 
29 
30     '''
31     main-------
32     123
33     456
34     enn456
35     '''
36 
37     '''
38     123
39     main-------
40     456
41     end456
42     '''
View Code

2、互斥锁:

互斥锁:可以将要执行任务的部分代码(只涉及到修改共享数据的代码)变成串行

join:是要执行任务的所有代码整体串行

强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。
互斥锁vs join: 
大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)

互斥锁
 1 from multiprocessing import Process,Lock
 2 import json
 3 import os
 4 import time
 5 import random
 6 
 7 def check():
 8     time.sleep(1) # 模拟网路延迟
 9     with open('db.txt','rt',encoding='utf-8') as f:
10         dic=json.load(f)
11     print('%s 查看到剩余票数 [%s]' %(os.getpid(),dic['count']))
12 
13 def get():
14     with open('db.txt','rt',encoding='utf-8') as f:
15         dic=json.load(f)
16     time.sleep(2)
17     if dic['count'] > 0:
18         # 有票
19         dic['count']-=1
20         time.sleep(random.randint(1,3))
21         with open('db.txt','wt',encoding='utf-8') as f:
22             json.dump(dic,f)
23         print('%s 购票成功' %os.getpid())
24     else:
25         print('%s 没有余票' %os.getpid())
26 
27 
28 def task(mutex):
29     # 查票
30     check()
31 
32     #购票
33     mutex.acquire() # 互斥锁不能连续的acquire,必须是release以后才能重新acquire
34     get()
35     mutex.release()
36 
37 
38 
39     # with mutex:
40     #     get()
41 
42 if __name__ == '__main__':
43     mutex=Lock()
44     for i in  range(10):
45         p=Process(target=task,args=(mutex,))
46         p.start()
47         # p.join()
模拟抢票

3、IPC通信机制

进程之间通信必须找到一种介质,该介质必须满足
1、是所有进程共享的
2、必须是内存空间
附加:帮我们自动处理好锁的问题
 
a、   from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
b、   IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
c、   IPC中的管道(Pipe),共享,内存,需自己解决锁的问题

a、用Manager(了解知识点)

 1 from multiprocessing import Process,Manager,Lock
 2 import time
 3 
 4 mutex=Lock()
 5 
 6 def task(dic,lock):
 7     lock.acquire()
 8     temp=dic['num']
 9     time.sleep(0.1)
10     dic['num']=temp-1
11     lock.release()
12 
13 if __name__ == '__main__':
14     m=Manager()
15     dic=m.dict({'num':10})
16 
17     l=[]
18     for i in range(10):
19         p=Process(target=task,args=(dic,mutex))
20         l.append(p)
21         p.start()
22     for p in l:
23         p.join()
24     print(dic)
View Code

b、用队列Queue

1)共享的空间

2)是内存空间

3)自动帮我们处理好锁定问题

 1 from multiprocessing import Queue
 2 q=Queue(3)  #设置队列中maxsize个数为三
 3 q.put('first')
 4 q.put({'second':None})
 5 q.put('')
 6 # q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
 7 print(q.get())
 8 print(q.get())
 9 print(q.get())
10 
11 强调:
12 1、队列用来存成进程之间沟通的消息,数据量不应该过大
13 2、maxsize的值超过的内存限制就变得毫无意义
14                                                               
 1 了解:
 2 q=Queue(3)
 3 q.put('first',block=False)
 4 q.put('second',block=False)
 5 q.put('third',block=False)
 6 q.put('fourth',block=False)  #报错 queue.Full
 7 
 8 q.put('first',block=True)
 9 q.put('second',block=True)
10 q.put('third',block=True)
11 q.put('fourth',block=True,timeout=3)  #等待3秒后若还进不去报错。注意timeout不能和block=False连用
12 
13 q.get(block=False)
14 q.get(block=False)
15 q.get(block=False)
16 q.get(block=False)           #报错 queue.Empty
17 
18 q.get(block=True)
19 q.get(block=True)
20 q.get(block=True)
21 q.get(block=True,timeout=2)    #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
了解

4、生产者与消费者模型

该模型中包含两类重要的角色:
1、生产者:将负责造数据的任务比喻为生产者
2、消费者:接收生产者造出的数据来做进一步的处理,该类人物被比喻成消费者
 
实现生产者消费者模型三要素
1、生产者
2、消费者
3、队列
什么时候用该模型:
程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
 
该模型的好处:
1、实现了生产者与消费者解耦和
2、平衡了生产者的生产力与消费者的处理数据的能力

注意:生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。

 1 from multiprocessing import JoinableQueue,Process
 2 import time
 3 import os
 4 import random
 5 
 6 def producer(name,food,q):
 7     for i in range(3):
 8         res='%s%s' %(food,i)
 9         time.sleep(random.randint(1,3))
10         # 往队列里丢
11         q.put(res)
12         print('33[45m%s 生产了 %s33[0m' %(name,res))
13     # q.put(None)
14 
15 def consumer(name,q):
16     while True:
17         #从队列里取走
18         res=q.get()
19         if res is None:break
20         time.sleep(random.randint(1,3))
21         print('33[46m%s 吃了 %s33[0m' %(name,res))
22         q.task_done()
23 
24 if __name__ == '__main__':
25     q=JoinableQueue()
26     # 生产者们
27     p1=Process(target=producer,args=('egon','包子',q,))
28     p2=Process(target=producer,args=('杨军','泔水',q,))
29     p3=Process(target=producer,args=('猴老师','',q,))
30     # 消费者们
31     c1=Process(target=consumer,args=('Alex',q,))
32     c2=Process(target=consumer,args=('wupeiqidsb',q,))
33     c1.daemon=True
34     c2.daemon=True
35 
36     p1.start()
37     p2.start()
38     p3.start()
39     c1.start()
40     c2.start()
41 
42     p1.join()
43     p2.join()
44     p3.join()
45     q.join() #等待队列被取干净
46     # q.join() 结束意味着
47     # 主进程的代码运行完毕--->(生产者运行完毕)+队列中的数据也被取干净了->消费者没有存在的意义
48 
49     # print('主')
原文地址:https://www.cnblogs.com/huyingsakai/p/9300835.html