day5.进程-锁信号量Event事件进程队列Queue生产者消费者模型JoinableQueue进程间共享数据Manager

一、进程-锁

from multiprocessing import Process,Lock

1、lock的基本用法

"""
上锁和解锁是一对,只上锁不解锁会发生死锁现象(代码阻塞,不往下执行了)
互斥锁 : 互斥锁是进程之间的互相排斥,谁先抢到这个锁资源就先使用,后抢到后使用
"""
# 创建一把锁
lock = Lock()
# 上锁
lock.acquire()
# 连续上锁不解锁是死锁
# lock.acquire() error

print("厕所中...")

# 解锁
lock.release()
print("执行程序 ... ")

2、模拟12306抢票软件

# 防止某一时刻多人同时抢到票,加锁
import json,time

# 读写数据库中的票数
def wr_info(sign,dic=None):
    if sign == "r":
        with open("ticket",mode="r",encoding="utf-8") as fp:
            dic = json.load(fp)
        return dic
        
    elif sign == "w":
        with open("ticket",mode="w",encoding="utf-8") as fp:
            json.dump(dic,fp)
        
# res = wr_info("r")
# print(res)
# dic = {"count":0}
# wr_info("w",dic)

# 抢票方法
def get_ticket(person):
    # 获取数据库中实际的票数
    dic = wr_info("r")
    print(dic)
    
    # 模拟一下网络延迟
    time.sleep(0.5)
    
    # 判断票数
    if dic["count"] > 0 :    
        print("%s抢到票了" % (person))
        dic["count"] -= 1
        wr_info("w",dic)
    else:
        print("%s没有抢到这张票" % (person))

def run(person,lock):
    
    # 查看剩余票数
    dic = wr_info("r")    
    print("%s 查询票数: %s" % (person , dic["count"]) )    
    
    
    # 上锁
    lock.acquire()
    # 开始抢票
    get_ticket(person)
    lock.release()

if __name__ == "__main__":
    lock = Lock()
    lst = ["aaa","bbb","ccc","ddd","eee"]
    for i in lst:
        p = Process(target=run,args=(i,lock))
        p.start()
抢票

二、信号量-Semaphore

# 信号量 Semaphore 本质上就是锁,只不过可以控制上锁的数量

 1、基本操作

from multiprocessing import Semaphore
sem = Semaphore(4)
sem.acquire()
sem.acquire()
sem.acquire()
sem.acquire()
sem.acquire() # 上第五把锁出现死锁状态
print("执行相应的操作")

2、用信号量模拟ktv唱歌

from multiprocessing import Semaphore, Process
import time,random

def ktv(person,sem):
    sem.acquire()
    print("%s进入了ktv,正在唱歌" %(person))
    # 唱了一段时间
    time.sleep(random.uniform(3,7))
    print("%s离开了ktv,唱完了" %(person))
    sem.release()

if __name__ == '__main__':
    sem = Semaphore(4)
    lst = ["aaa","bbb","ccc","ddd","eee","fff"]
    for i in lst:
        p = Process(target=ktv,args=(i,sem))
        p.start()
"""
# 总结
Semaphore 可以设置上锁的数量
同一时间最多允许几个进程上锁
创建进程的时候,是异步并发
执行任务的时候,遇到锁会变成同步程序
"""

 三、Event事件

"""
# 阻塞事件 :
    e = Event()生成事件对象e   
    e.wait()动态给程序加阻塞 , 
程序当中是否加阻塞完全取决于该对象中的is_set() 
    #[默认返回值是False]
    # 如果是True  不加阻塞
    # 如果是False 加阻塞

# 控制这个属性的值
    # set()方法     将这个属性的值改成True
    # clear()方法   将这个属性的值改成False
    # is_set()方法  判断当前的属性是否为True  (默认上来是False)
"""

1、基本语法

1.1、默认Event()为False情况

from multiprocessing import Process,Event
import time,random
e = Event()
print(e.is_set())
e.wait()
print("程序运行中 ... ")

1.2、默认Event()为True情况

e = Event()
e.set() # 将内部成员属性值由False -> True
print(e.is_set())
e.wait()
print("程序运行中 ... ")

e.clear() # 将内部成员属性值由True => False 
e.wait()
print("程序运行中2 ... ")

1.3、给wait()设置时间

# def wait(self, timeout=None):
e = Event()
# wait参数 可以写时间 wait(3) 代表最多等待3秒钟
e.wait(3) 
print("程序运行中3 ... ")

2、模拟红绿灯效果

def traffic_light(e):
    print("红灯亮")
    while True:
        if e.is_set():
            # 绿灯状态,亮1秒钟
            time.sleep(1)
            print("红灯亮")
            e.clear()
        else:
            # 红灯状态,亮1秒钟
            time.sleep(1)
            print("绿灯亮")
            e.set()

# e = Event()
# traffic_light(e)


def car(e,i):
    # not False => True => 目前是红灯,小车在等待
    if not e.is_set():
        print("car%s 在等待" % (i))
        # 加阻塞
        e.wait()
    print("car%s 通行了" % (i))

# 不关红绿灯,一直跑
"""
if __name__ == "__main__":
    e = Event()
    # 创建交通灯对象
    p1 = Process(target=traffic_light,args=(e,))
    p1.start()
    
    # 创建车对象
    for i in range(1,21):
        time.sleep(random.randrange(0,2)) # 0 1
        p2 = Process(target=car,args=(e,i))
        p2.start()
"""

# 当所有小车都跑完之后,把红绿灯收拾起来,省电
if __name__ == "__main__":
    lst = []
    e = Event()
    # 创建交通灯对象
    p1 = Process(target=traffic_light,args=(e,))
    
    # 设置红绿灯为守护进程
    p1.daemon = True
    p1.start()
    
    # 创建车对象
    for i in range(1,21):
        time.sleep(random.randrange(0,2)) # 0 1
        p2 = Process(target=car,args=(e,i))
        p2.start()
        lst.append(p2)
        
    # 让所有的小车都通行之后,在结束交通灯
    for i in lst:
        i.join()

    print("程序结束 ... ")
红绿灯

四、进程队列Queue

"""队列特点: 先进先出,后进后出"""

1、进程队列基础

from multiprocessing import Process,Queue
import queue

q = Queue()
# 1.put 往队列中放值
q.put(100)
q.put(101)
q.put(102)

# 2.get 从队列中取值
res = q.get()
print(res)
res = q.get()
print(res)
res = q.get()
print(res)

2、队列中如果已经没有数据了,在调用get会发生阻塞.

res = q.get()
print(res)

3、get_nowait()

# 用法
# 队列里有值就取,没值就抛出异常raise Empty  _queue.Empty
# 存在系统兼容性问题[windows]好用 [linux]不好用 不推荐

4、设置队列的长度 Queue(4)

# 设置队列长度最多存放4个元素
print("<======>")
q2 = Queue(4)
q2.put(200)
q2.put(201)
q2.put(202)
# q2.put(203)
# 如果超过了队列的指定长度,在继续存值会出现阻塞现象
# q2.put(204)
put_nowait() 非阻塞版本的put,超出长度后,直接报错
q2.put_nowait(204)
try:
    q2.put_nowait(205)
except queue.Full:
    pass

五、进程间通信

#主进程和子进程都能获取到队列Queue()中的数据
def func(q3):
    # 2.子进程获取数据
    res = q3.get()
    print(res)
    
    # 3.子进程存数据
    q3.put("马生平")

if __name__ == "__main__":
    q3 = Queue()
    p = Process(target=func,args=(q3,))
    p.start()
    
    # 1.主进程添加数据
    q3.put("王凡")
    
    # 为了等待子进程把数据放到队列中,需要加join
    p.join()
    
    # 4.主进程获取数据
    res = q3.get()
    print(res)
    
    print("主程序结束 ... ")

六、生产者消费者模型

"""
# 爬虫例子:
1号进程负责爬取页面中所有想要的数据
2号进程负责把内容取出来,按照规则进行匹配,抽取关键字

1号进程可以理解成生产者
2号进程可以理解成消费者

理想的生产者和消费者模型中,彼此的速度相对均匀

从程序上来讲:
    生产者负责存储数据 (put)
    消费者负责获取数据 (get)
"""

1、基本模型

"""
# 爬虫例子:
1号进程负责爬取页面中所有想要的数据
2号进程负责把内容取出来,按照规则进行匹配,抽取关键字

1号进程可以理解成生产者
2号进程可以理解成消费者

理想的生产者和消费者模型中,彼此的速度相对均匀

从程序上来讲:
    生产者负责存储数据 (put)
    消费者负责获取数据 (get)
"""
from multiprocessing import Process,Queue
import time,random
# 消费者模型
def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.uniform(0.1,1))
        print("%s 吃了一个%s" % (name,food))
        
    
# 生产者模型
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.1,1))
        # 打印生产的数据
        print("%s 生产了 %s%s" % (name,food,i))
        # 存储生产的数据
        q.put(food + str(i))
    

if __name__ == "__main__":
    q = Queue()
    # 消费者
    p1 = Process(target=consumer,args=(q,"宋云杰"))
    # 生产者
    p2 = Process(target=producer,args=(q,"马生平","黄瓜"))

    p1.start()
    p2.start()
# 队列里没有数据了,消费者还在一直get() 所以就一直在阻塞

2、优化版

# 消费者模型
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            break
        time.sleep(random.uniform(0.1,1))
        print("%s 吃了一个%s" % (name,food))
        
    
# 生产者模型
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.1,1))
        # 打印生产的数据
        print("%s 生产了 %s%s" % (name,food,i))
        # 存储生产的数据
        q.put(food + str(i))
    

if __name__ == "__main__":
    q = Queue()
    # 消费者
    p1 = Process(target=consumer,args=(q,"宋云杰"))
    # 生产者
    p2 = Process(target=producer,args=(q,"马生平","黄瓜"))
    

    p1.start()
    p2.start()
    
    # 在生产者生产完所有数据之后,在队列的末尾添加一个None
    p2.join()
    # 添加None
    q.put(None)

 7、JoinableQueue

from multiprocessing import Process, JoinableQueue
import time,random
"""
put 存储
get 获取
task_done 
join

task_done 和 join 配合使用的
队列中 1 2 3 4 5
put 一次 内部的队列计数器加1
get 一次 通过task_done让队列计数器减1
join函数,会根据队列计数器来判断是阻塞还是放行
    队列计数器  = 0 , 意味着放行
    队列计数器 != 0 , 意味着阻塞
"""

1、基本语法

"""
jq =JoinableQueue()
jq.put("a")
print(jq.get())
# 通过task_done让队列计数器减1
jq.task_done()
jq.join()
print("finish")
"""

2、改造生产者消费者模型

def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.uniform(0.1,1))
        print("%s 吃了一个%s" % (name,food))
        # 当队列计数器减到0的时,意味着进程队列中的数据消费完毕
        q.task_done()
    
# 生产者模型
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.1,1))
        # 打印生产的数据
        print("%s 生产了 %s%s" % (name,food,i))
        # 存储生产的数据
        q.put(food + str(i))
    

if __name__ == "__main__":
    q =JoinableQueue()
    # 消费者
    p1 = Process(target=consumer,args=(q,"宋云杰"))
    # 生产者
    p2 = Process(target=producer,args=(q,"马生平","黄瓜"))
    
    # 设置p1消费者为守护进程
    p1.daemon = True
    p1.start()
    p2.start()
    
    # 把所有生产者生产的数据存放到进程队列中
    p2.join()
    # 为了保证消费者能够消费完所有数据,加上队列.join
    # 当队列计数器减到0的时,放行,不在阻塞,程序彻底结束.
    q.join()
    print("程序结束 ... ")
改造生产者消费者模型
原文地址:https://www.cnblogs.com/kongxiangqun/p/13524171.html