lock锁,Semaphore信号量,Event事件,进程队列Queue,生产者消费者模型,JoinableQueue---day31

1.lock锁

# ### 锁 lock
from multiprocessing import Process,Lock
import json,time
# (1) lock的基本语法
"""
上锁和解锁是一对,只上锁不解锁会发生死锁现象(程序发生阻塞,下面的代码不执行了)
互斥锁: 互斥锁是进程之间的互相排斥,谁抢到了资源谁就先使用,后抢到资源的后使用.
"""
"""
# 创建一把锁
lock = Lock()
# 上锁
lock.acquire()
# 执行操作
# 解锁
lock.release()
# lock.acquire() 死锁阻塞.
print("执行程序 .... ")
"""

# (2) 模拟12306 抢票软件

# 读取票数,更新票数
def wr_info(sign,dic=None):
    if sign == "r":
        with open("ticket",mode="r",encoding="utf-8") as fp:
            dic = json.load(fp)
        return dic
    elif sign == "w":
        with open("ticket",mode="w",encoding="utf-8") as fp:
            json.dump(dic,fp)
            
            
# 抢票方法
def get_ticket(person):
    # 获取数据库中实际数据
    dic = wr_info("r")

    # 模拟网络延迟
    time.sleep(0.5)    
    
    if dic["count"] > 0:
        print("%s抢到票了" % (person))
        dic["count"] -= 1
        # 更新数据库
        wr_info("w",dic)
    else:
        print("%s没有抢到这张票" % (person))
    

def run(person,lock):
    # 读取数据库中的实际票数
    dic = wr_info("r")
    print("%s 查询票数 : %s" % (person,dic["count"]))    


    # 上锁
    lock.acquire()
    # 抢票
    get_ticket(person)
    # 解锁
    lock.release()
    
if __name__ == "__main__":
    lock = Lock()
    lst =["刘思敏7","云超1","张恒2","尉翼麟3","王振4","黎建忠5","刘鑫炜6","李天兆8","魏小林9","李博10"]
    for i in lst:
        p = Process(target=run,args=(i,lock))
        p.start()


"""
# 总结: 区分同步和异步;
当创建10个进程的时候是异步的.直到查完票数截止;
当执行get_ticket这个方法时候,各个进程之间是同步的;
"""

2.Semaphore信号量

# ### 信号量 Semaphore 本质上就是锁,可以控制上锁的数量
"""
sem = Semaphore(4)
sem.acquire()
# 执行相应的操作
sem.release()
"""
from multiprocessing import Semaphore,Process
import time,random

def ktv(person,sem):
    sem.acquire()
    # 开始唱歌
    print("%s进入ktv,正在唱歌" % (person) )
    time.sleep(random.randrange(3,7)) # 3 4 5 6 
    print("%s离开ktv,唱完了" % (person) )
    sem.release()
    
    


if __name__ == "__main__":
    sem = Semaphore(4)
    for i in range(10):
        p = Process(target=ktv,args=("person%s" % (i)   ,  sem))
        p.start()

"""
# 总结:
Semaphore 可以设置上锁的数量
同一时间最多允许几个进程上锁;
创建进程的时候是异步的
在执行任务时候是同步的;
"""

3.Event事件

# ### 事件 (Event)
"""
# 阻塞事件 :
    e = Event()生成事件对象e   
    e.wait()动态给程序加阻塞 , 
    程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
    # 如果是True  不加阻塞
    # 如果是False 加阻塞

# 控制这个属性的值
    # set()方法     将这个属性的值改成True
    # clear()方法   将这个属性的值改成False
    # is_set()方法  判断当前的属性是否为True  (默认上来是False)
    
Lock  Semaphore  Event 进程和进程之间的数据彼此隔离,但是可以通过socket互相发消息;
"""
# (1)  基本语法
from multiprocessing import Process,Event

# 1
# e = Event()
# print(e.is_set())
# e.wait()
# print("程序运行中... ")

# 2
"""
e = Event()
# 将阻塞事件中的值改成True
e.set()
print(e.is_set())
e.wait()
print("程序运行中1... ")

# 将阻塞事件中的值改成False
e.clear()
e.wait()
print("程序运行中2... ")
"""
# 3
"""
e = Event()
# 参数: 最多等待时间是5秒,过了5秒之后阻塞放行
e.wait()
print("程序运行中3... ")
"""

# (2) 模拟红绿灯效果
import time,random
def traffic_light(e):
    print("红灯亮")
    while True:
        if e.is_set():
            # 绿灯状态,亮1秒钟
            time.sleep(1)
            print("红灯亮")
            # 把True => False
            e.clear()
        else:
            # 红灯状态,亮1秒钟
            time.sleep(1)
            print("绿灯亮")
            # 把False => True
            e.set()

# e = Event()
# traffic_light(e)

def car(e,i):
 
    if not e.is_set():
        # 走到这个分支里面来,一定是红灯状态,车要停
        print("car%s 在等待" % (i))
        # 加阻塞
        e.wait()
    print("car%s 通行了" % (i))

"""
if __name__ == "__main__":
    e = Event()
    p1 = Process(target=traffic_light,args=(e,))
    p1.start()
    
    # 开始创建小车
    for i in range(1,21):
        time.sleep(random.randrange(0,2)) # 0 1
        p2 = Process(target=car,args=(e,i))
        p2.start()
"""
# (3) 改造红绿灯 (在跑完小车之后,把红绿灯给我炸了)
if __name__ == "__main__":
    lst = []
    e = Event()
    p1 = Process(target=traffic_light,args=(e,))
    # 把红绿灯变成守护进程
    p1.daemon = True
    p1.start()

    # 开始创建小车
    for i in range(1,21):
        time.sleep(random.randrange(0,2)) # 0 1
        p2 = Process(target=car,args=(e,i))
        p2.start()
        lst.append(p2)
        
    # 让所有的小车都通过之后,在终止交通灯;
    for i in lst:
        i.join()

    print("程序结束 ... ")

4.进程队列Queue

# ### 进程队列
from multiprocessing import Process,Queue
"""先进先出,后进后出"""
import queue # 线程队列
# (1) 基本语法
"""
q = Queue()
# 1.put 往队列中存值
q.put(111)
q.put(222)
q.put(333)
# 2.get 从队列中取值
res = q.get()
print(res)
res = q.get()
print(res)
res = q.get()
print(res)

# 3.队列里面没数据了,在调用get会发生阻塞
# res = q.get()
# print(res)
"""
# 4.get_nowait 存在兼容性问题(windows好用  linux不好用 不推荐使用)
"""
res = q.get_nowait()
print(res)

# 队列问题
try:
    res = q.get_nowait()
    print(res)
except queue.Empty:
    pass
"""

# (2) 可以限定Queue队列的长度
"""
q1 = Queue(3)
q1.put(1)
q1.put(2)
q1.put(3)
# 超出了队列的长度,会发生阻塞
# q1.put(4)
# 如果列表满了,还往里面添加数据会直接报错.
q1.put_nowait(4)
"""


# (3)进程之间通过队列交换数据
def func(q2):
    # 2.子进程取数据
    res = q2.get()
    print(res)
    # 3.子进程存数据
    q2.put("刘思敏")
    
    
if __name__ == "__main__":
    q2 = Queue()
    p = Process(target=func,args=(q2,))
    p.start()
    
    # 1.主进程添加数据
    q2.put("王振")
    
    # 为了等待子进程把数据塞到队列中,在获取,要加一个join
    p.join()
    
    # 2.主进程获取数据
    res = q2.get()
    print("主程序执行结束:值为{}".format(res))

5.生产者消费者模型

# ### 生产者和消费者模型
"""
# 爬虫例子:
1号进程负责抓取页面中的内容放到队列里
2号进程负责把内容取出来,配合正则表达式,扣取关键字

1号进程可以理解成生产者
2号进程可以理解成消费者

相对理想的生产者和消费者模型:
    追求彼此的速度相对均匀
    
从程序上来说:
    生产者负责存储数据(put)
    消费者负责获取数据(get)
"""

# (1)基本模型
from multiprocessing import Process,Queue
import random,time

# 消费者模型
def consumer(q,name):
    while True:
        food = q.get()
        time.sleep(random.uniform(0.1,1))
        print("%s 吃了一个%s" % (name,food))
    
    
# 生产者模型
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.1,1))
        print("%s 生产了 %s%s" % (name,food,i))
        q.put(food+str(i))
    
    
if __name__ == "__main__":
    q = Queue()
    # 消费者1
    p1 = Process(target=consumer,args=(q,"张恒"))
    p1.start()

    # 生产者1
    p2 = Process(target=producer,args=(q,"尉翼麟","黄金"))
    p2.start()    

    
# (2)优化模型

# 消费者模型
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            break
        time.sleep(random.uniform(0.1,1))
        print("%s 吃了一个%s" % (name,food))
    
    
# 生产者模型
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.1,1))
        print("%s 生产了 %s%s" % (name,food,i))
        q.put(food+str(i))
    
    
if __name__ == "__main__":
    q = Queue()
    # 消费者1
    p1 = Process(target=consumer,args=(q,"张恒"))
    p1.start()
    # 消费者2
    a2 = Process(target=consumer,args=(q,"云超"))
    a2.start()
    
    # 生产者1
    p2 = Process(target=producer,args=(q,"尉翼麟","黄金"))
    p2.start()
    
    # 生产者2
    b2 = Process(target=producer,args=(q,"刘思敏","钻石"))
    b2.start()
    
    # 在生产完所有的数据之后,在队列的末尾塞入一个None
    p2.join()
    b2.join()
    # 消费者模型如果获取的是None,代表停止消费
    q.put(None)
    q.put(None)

6.JoinableQueue

# ### JoinableQueue
"""
put 存储
get 获取
task_done 队列计数减1
join 阻塞

task_done 配合 join 一起使用
[1,2,3,4,5]
队列计数5 
put 一次 每存放一个值,队列计数器加1
get 一次 通过task_done让队列计数器减1
join 函数,会根据队列中的计数器来判定是阻塞还是放行
如果计数器变量是0,意味着放行,其他情况阻塞;
"""


from multiprocessing import Process,JoinableQueue
# (1) 基本使用
"""
jq = JoinableQueue()
# put 会让队列计数器+1
jq.put("a")
print(jq.get())
# 通过task_done,让队列计数器-1
jq.task_done()
# 只有队列计数器是0的时,才会放行
jq.join() # 队列.join
print("finish")
"""


# (2) 改造生产者消费者模型
import random,time

# 消费者模型
def consumer(q,name):
    while True:
        food = q.get()        
        time.sleep(random.uniform(0.1,1))
        print("%s 吃了一个%s" % (name,food))
        q.task_done()
    
    
# 生产者模型
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.1,1))
        print("%s 生产了 %s%s" % (name,food,i))
        q.put(food+str(i))
    
    
if __name__ == "__main__":
    q = JoinableQueue()
    # 消费者1
    p1 = Process(target=consumer,args=(q,"张恒"))
    p1.daemon = True
    p1.start()

    # 生产者1
    p2 = Process(target=producer,args=(q,"尉翼麟","黄金"))
    p2.start()    
    
    # 把生产者所有的数据都装载到队列中
    p2.join()
    
    # 当队列计数器减到0的时候,会立刻放行
    # 必须等待消费者模型中所有的数据都task_done之后,变成0了就代表消费结束.
    q.join()
    
    print("程序结束....")
原文地址:https://www.cnblogs.com/weiweivip666/p/13090730.html