Day 38 Semaphore ,Event ,队列

什么是信号量(multiprocess.Semaphore)

互斥锁同时只允许一个线程更改数据,而信号量semaphore是同时允许一定数量的线程更改数据.

假设商场里有4个迷你唱吧 ,所以通过同时可以进去4个人,如果来了五个人就要在外面等等,等到有人出来才能再进去玩.

实现:

信号量同步基于内部计数器,每次用一次acquire(),计数器减1,每次调用一次release(),计数器加1 ,当计数器为0 时,acquire()调用被阻塞,,信号量和进程池的概念很像,但是也要区分开,信号量设计到加锁的概念.

一套资源 同一时间 只能被n个人访问

from multiprocessing import Process,Semaphore
import time,random
def ktv(i,sem):
sem.acquire() #获取钥匙
print('%s 走进ktv'%i)
time.sleep(random.randint(1,5))
print('%s 走出ktv'%i)
sem.release() #释放钥匙.
if __name__ == '__main__':
sem = Semaphore(2) #ktv 里只有2个位置,限定进程的访问量。
for i in range (6): #六个人来访问
p =Process(target=ktv, args=(i,sem)) #实例化一个子进程.
p.start()#

结果 :同时允许两个人进入。

0 走进ktv
2 走进ktv
0 走出ktv
1 走进ktv
1 走出ktv
4 走进ktv
4 走出ktv
3 走进ktv
2 走出ktv
5 走进ktv
3 走出ktv
5 走出ktv
sem =Semaphore(4)
sem.acquire()
print("第一把钥匙 ")
sem.acquire()
print("第二把钥匙 ")
sem.acquire()
print("第三把钥匙 ")
sem.acquire()
print("第四把钥匙 ")
sem.acquire()
print("第五把钥匙 ")

结果 (只能获取到四把钥匙.)

第一把钥匙 
第二把钥匙 
第三把钥匙 
第四把钥匙 

事件 --multiprocess.Event

Python 线程 的事件用于主线程控制其他线程的执行,事件的主要提供了三个方法set ,wait ,clear。

事件处理的机制,全局定义了一个 ‘Flag’ ,如果flag的值为false,那么程序执行event.wait方法时就会阻塞,如果 flag 值为True,那么

event.wait 方法时不在阻塞

clear: 将flag 设置为False

set :将 flag 设置为True

通过一个信号 来控制多个进程 同时执行或者阻塞 

from multiprocessing import Event 

一个信号可以使所有的进程接触阻塞

一个世界被创建之后,默认是阻塞状态  

from multiprocessing import Event
e =Event() #创建一个世界
print(e.is_set())#查看一个事件的状态,默认是阻塞状态。 #结果为False
e.set() #将这个事件状态改为True
print(e.is_set())#结果为True
e.wait()#是根据e.is_set()的值来决定是否阻塞的.
print(123435) #打印出来 结果为123435
e.clear() #将这个事件的状态改为False
print(e.is_set())#False
e.wait()#等待事件的信号被变成 True
print('*'*10)

set 和clear 

分别用来修改一个事件的状态True或者False

is_set 是用来查看事件的状态 

wait是依据事件的状态来决定自己是否在wait处阻塞

  #False阻塞  , True 不阻塞.

# 红绿灯事件
import time
import random
from multiprocessing import Event,Process
def cars(e,i):
    if not e.is_set():
        print('car%i在等待'%i)
        e.wait()    # 阻塞 直到得到一个 事件状态变成 True 的信号
    print('33[0;32;40mcar%i通过33[0m' % i)

def light(e):
    while True:
        if e.is_set():
            e.clear()
            print('33[31m红灯亮了33[0m')
        else:
            e.set()
            print('33[32m绿灯亮了33[0m')
        time.sleep(2)

if __name__ == '__main__':
    e = Event()# 创建事件对象
    traffic = Process(target=light,args=(e,))  #创建traffice 子进程 
    traffic.start()#启动子进程
    for i in range(20):
        car = Process(target=cars, args=(e,i))#创建一个car的子进程
        car.start()
        time.sleep(random.random())

队列和管道

import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('%s获取到了一个空'%name)
            break
        print('33[31m%s消费了%s33[0m' % (name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3)) 
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__  == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
    while True:
        food = q.get()
        print('33[31m%s消费了%s33[0m' % (name,food))
        time.sleep(random.randint(1,3))
        q.task_done()     # count - 1

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕

if __name__  == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 感知一个进程的结束

#  在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功

# 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
                # 且要等待之前被刻上的记号都被消费完
                # 当数据都被处理完时,join阻塞结束

# consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
原文地址:https://www.cnblogs.com/mengbin0546/p/8657779.html