25 Apr 18 守护进程 互斥锁 抢票系统 IPC通信机制 生产者消费者模型

25 Apr 18
一、上节课复习
if __name__ == '__main__': 放在最后面
obj.join(1)  #只等1秒
 
二、守护进程
from multiprocessing import Process
import time
 
def task(name):
    print('%s is running' % name)
    time.sleep(3)
 
if __name__ == '__main__':
    obj = Process(target=task, args=('egon',))
    obj.daemon=True    #将obj变成守护进程,主进程执行完毕后子进程跟着结束
    obj.start()  # 发送信号给操作系统
print('主')
 
三、互斥锁
强调:必须是lock.acquire()一次,然后 lock.release()释放一次,才能继续lock.acquire(),不能连续的lock.acquire()。否者程序停在原地。
 
互斥锁vs join: 
大前提:二者的原理都是一样,都是将并发变成串行,从而保证有序(在多个程序共享一个资源时,为保证有序不乱,需将并发变成串行)
区别一:join是按照人为指定的顺序执行,而互斥锁是所以进程平等地竞争,谁先抢到谁执行
区别二:互斥锁可以让一部分代码(修改共享数据的代码)串行,而join只能将代码整体串行(详见抢票系统)
 
from multiprocessing import Process,Lock
import time,random
 
mutex=Lock()
 
def task1(lock):
    lock.acquire() 
    print('task1:名字是egon')
    time.sleep(random.randint(1,3))
    print('task1:性别是male')
    time.sleep(random.randint(1,3))
    print('task1:年龄是18')
    lock.release()
 
def task2(lock):
    lock.acquire()
    print('task2:名字是alex')
    time.sleep(random.randint(1,3))
    print('task2:性别是male')
    time.sleep(random.randint(1,3))
    print('task2:年龄是78')
    lock.release()
 
def task3(lock):
    lock.acquire()
    print('task3:名字是lxx')
    time.sleep(random.randint(1,3))
    print('task3:性别是female')
    time.sleep(random.randint(1,3))
    print('task3:年龄是30')
    lock.release()
 
if __name__ == '__main__':
    p1=Process(target=task1,args=(mutex,))
    p2=Process(target=task2,args=(mutex,))
    p3=Process(target=task3,args=(mutex,))
 
    p1.start()
    p2.start()
    p3.start()
 
四、抢票系统
import json
import time
import random
import os
from multiprocessing import Process,Lock
 
mutex=Lock()
 
def search():
    time.sleep(random.randint(1,3))
    with open('db.json','r',encoding='utf-8') as f:
        dic=json.load(f)
        print('%s 剩余票数:%s' %(os.getpid(),dic['count']))
 
def get():
    with open('db.json','r',encoding='utf-8') as f:
        dic=json.load(f)
    if dic['count'] > 0:
        dic['count']-=1
        time.sleep(random.randint(1,3))
        with open('db.json','w',encoding='utf-8') as f:
            json.dump(dic,f)
        print('%s 购票成功' %os.getpid())
 
def task(lock):
    search()
    lock.acquire()
    get()
    lock.release()
 
if __name__ == '__main__':
    for i in range(10):
        p=Process(target=task,args=(mutex,))
        p.start()
 
五、IPC通信机制
进程之间通信必须找到一种介质,该介质必须满足
1、是所有进程共享的
2、必须是内存空间
附加:帮我们自动处理好锁的问题
 
a、 from multiprocessing import Manager(共享内存,但要自己解决锁的问题)
b、 IPC中的队列(Queue) 共享,内存,自动处理锁的问题(最常用)
c、 IPC中的管道(Pipe),共享,内存,需自己解决锁的问题
 
a、 用Manager
from multiprocessing import Process,Manager,Lock
import time
 
mutex=Lock()
 
def task(dic,lock):
    lock.acquire()
    temp=dic['num']
    time.sleep(0.1)
    dic['num']=temp-1
    lock.release()
 
if __name__ == '__main__':
    m=Manager()
    dic=m.dict({'num':10})
 
    l=[]
    for i in range(10):
        p=Process(target=task,args=(dic,mutex))
        l.append(p)
        p.start()
 
    for p in l:
        p.join()
print(dic)
 
b、 用队列Queue
1)共享的空间
2)是内存空间
3)自动帮我们处理好锁定问题
 
from multiprocessing import Queue
q=Queue(3)  #设置队列中maxsize个数为三
q.put('first')
q.put({'second':None})
q.put('三')
# q.put(4)   #阻塞。不报错,程序卡在原地等待队列中清出一个值。默认blok=True
print(q.get())
print(q.get())
print(q.get())
 
强调:
1、队列用来存成进程之间沟通的消息,数据量不应该过大
2、maxsize的值超过的内存限制就变得毫无意义
                                                              
                                                                                                                                                                                                      
了解:
q=Queue(3)
q.put('first',block=False)
q.put('second',block=False)
q.put('third',block=False)
q.put('fourth',block=False)  #报错 queue.Full
 
q.put('first',block=True)
q.put('second',block=True)
q.put('third',block=True)
q.put('fourth',block=True,timeout=3)  #等待3秒后若还进不去报错。注意timeout不能和block=False连用
 
q.get(block=False)
q.get(block=False)
q.get(block=False)
q.get(block=False)           #报错 queue.Empty
 
q.get(block=True)
q.get(block=True)
q.get(block=True)
q.get(block=True,timeout=2)    #等待2秒后还取不出东西则报错。注意timeout不能和block=False连用
 
六、生产者消费者模型
该模型中包含两类重要的角色:
1、生产者:将负责造数据的任务比喻为生产者
2、消费者:接收生产者造出的数据来做进一步的处理,该类人物被比喻成消费者
 
实现生产者消费者模型三要素
1、生产者
2、消费者
3、队列
 
什么时候用该模型:
程序中出现明显的两类任何,一类任务是负责生产,另外一类任务是负责处理生产的数据的
 
该模型的好处:
1、实现了生产者与消费者解耦和
2、平衡了生产者的生产力与消费者处理数据的能力
 
import time
import random
from multiprocessing import Process,Queue
 
def consumer(name,q):
    while True:
        res=q.get()
        time.sleep(random.randint(1,3))
        print('33[46m消费者===》%s 吃了 %s33[0m' %(name,res))
 
def producer(name,q,food):
    for i in range(5):
        time.sleep(random.randint(1,2))
        res='%s%s' %(food,i)
        q.put(res)
        print('33[45m生产者者===》%s 生产了 %s33[0m' %(name,res))
 
if __name__ == '__main__':
    #1、共享的盆
    q=Queue()
 
    #2、生产者们
    p1=Process(target=producer,args=('egon',q,'包子'))
    p2=Process(target=producer,args=('刘清政',q,'泔水'))
    p3=Process(target=producer,args=('杨军',q,'米饭'))
 
    #3、消费者们
    c1=Process(target=consumer,args=('alex',q))
    c2=Process(target=consumer,args=('梁书东',q))
 
    p1.start()
    p2.start()
    p3.start()
    c1.start()
c2.start()
 
生产者消费者模型是解决问题的思路不是技术。可以用进程和队列来实现,也可以用其他的来实现。
原文地址:https://www.cnblogs.com/zhangyaqian/p/py20180425.html