并发编程笔记(2)——信号量、事件、队列(进程间的通信)

内容目录

  • 信号量
  • 事件
  • 队列

内容详细

信号量(重点)

  • 可以规定有多少进程使用关键代码,其余进程阻塞,直到有子进程释放

  • 示例:模拟KTV使用,同时只有4个人使用

    import random
    import time
    from multiprocessing import Process
    from multiprocessing import Semaphore  #使用信号量模块
    
    def ktv(i,sem):
        sem.acquire()       #获取钥匙,只有4个进程可以执行,后续阻塞
        print('%s走进KTV'%i)
        time.sleep(random.randint(5,10))    #模拟进程使用的时间(随机秒数)
        print('%s走出KTV'%i)
        sem.release()       #进程结束后还钥匙
    
    if __name__ == '__main__':
        sem = Semaphore(4)      #实例化信号量,规定多少进程可以使用
        for i in range(20):
            p = Process(target=ktv,args=(i,sem))
            p.start()
    

事件(重点)

  • 通过一个信号来控制多个进程同时执行或阻塞

  • 一个事件被创建之后,默认是阻塞状态

    from multiprocessing import Event   #使用事件模块
    
    # 一个信号可以使所有的进程都进入阻塞状态
    # 也可以控制所有的进程解除阻塞
    # 一个事件被创建之后,默认是阻塞状态
    
    e = Event()     #创建一个事件
    print(e.is_set())   #查看一个事件的状态,默认被设置成阻塞
    e.set()         #将这个事件的状态改为True
    print(e.is_set())
    e.wait()        #依据e.is_set()的值来决定是否阻塞的
    print(123456)
    
    e.clear()       #将这个事件的状态改为False
    print(e.is_set())
    e.wait()        #此时程序为阻塞状态,等待信号变为True
    print('*'*10)
    
  • set 和 clear

    • 分别用来修改一个事件的状态,True或者False
  • is_set用来查看一个事件的状态

  • wait 是依据事件的状态来决定自己是否阻塞

    • is_set()状态为False是阻塞,True是不阻塞
  • 事件示例:经典的红绿灯事件

    # 绿灯来了车通过,红灯车为阻塞状态
    import time
    import random
    from multiprocessing import Event,Process
    
    def cars(e,i):
        if not e.is_set():
            print('car%i在等待'%i)
            e.wait()            # 阻塞 直到得到一个事件状态变为True的信号
        print('33[0;32;40mcar%s通过33[0m'%i)
    
    def light(e):
        while True:
            if e.is_set():
                e.clear()
                print('33[31m红灯亮了33[0m')
            else:
                e.set()
                print('33[32m绿灯亮了33[0m')
            time.sleep(2)
    
    if __name__ == '__main__':
        e = Event()
        traffic = Process(target=light,args=(e,))
        traffic.start()
        for i in range(20):
            car = Process(target=cars,args=(e,i))
            car.start()
            time.sleep(random.randint(1,5))
    

队列 --- 进程间的通信

  • 进程间通信-IPC(使用Queue模块)

    import time
    from multiprocessing import Queue   #使用队列模块
    q = Queue(5)        #设置队列中只能有5个进程或数据
    q.put(1)        #往队列中放入1
    q.put(2)
    q.put(3)
    q.put(4)
    q.put(5)        #此时队列已经满了,如果再往里放则为阻塞状态
    print(q.full())     # 查询队列是否满了(True为满了,False为不满)
    
    print(q.get())  #从队列中取出
    print(q.get())
    print(q.get())
    print(q.get())
    print(q.get())  #此时队列已经空了,如果继续取出,则为阻塞状态
    print(q.empty())    # 查询队列是否为空(True为空,False为未空)
    
    q.get_nowait()  #强制取值,此时会报queue.Empty错误,表示队列已经空
    
    # while True:
    #     try:
    #         q.get_nowait()
    #     except:
    #         print('队列已空')
    #         time.sleep(0.5)
    #         while循环1秒会循环上千次,损耗内存,此时加上睡眠时间以避免内存过度损耗
    
  • 注意:

    • q.empty()为检查队列是否为空,有不可靠因素。此方法是实时检测队列是否为空,如果此时生产者有往队列中正在添加的进程时,队列此时为空
  • 简单的多进程队列模型

    from multiprocessing import Queue,Process
    def produce(q):          #生产数据
        q.put('hello')
    
    def consume(q):          #取出数据
        print(q.get())
    
    if __name__ == '__main__':
        q = Queue()     #此队列表示没有限制
        p = Process(target=produce,args=(q,))
        p.start()
        c = Process(target=consume,args=(q,))
        c.start()
    
  • 经典的生产者消费者模型

    • 为解决供需不平衡的问题
    #plan 1:不完整,存在BUG
    import time
    import random
    from multiprocessing import Queue,Process
    
    def consumer(q,name):
        while True:
            food = q.get()
            if food is None:
                #如果获取到None则终止循环,问题bug:此时如果多个消费者取队列中的None,
                #只能是第一个进程能取到,剩余进程为阻塞状态。需要往队列中添加与消费者数量相匹配的None。
                print('%s获取到一个空'%name)
                break
            print( '33[31m%s消费了%s33[0m'%(name,food))
            time.sleep(random.randint(1,3))
    
    def producer(name,food,q):      # 创建生产者
        for i in range(4):
            time.sleep(random.randint(1,3))
            foods = '%s生产了%s个%s'%(name,i+1,food)
            print(foods)
            q.put(foods)
    
    if __name__ == '__main__':
        q = Queue(20)               # 创建队列,所有进程里最大限制20
        p1 = Process(target=producer,args=('alec','包子',q))
        p1.start()
        p2 = Process(target=producer,args=('yazhou','玉米',q))
        p2.start()
        c1 = Process(target=consumer,args=(q,'喳喳辉'))
        c1.start()
        c2 = Process(target=consumer,args=(q,'古天乐'))
        c2.start()
        p1.join()           #让生产者进程回归到主程序进程
        p2.join()
        q.put(None)         #队列中放入两个None,让消费者取到后结束阻塞状态
        q.put(None)
    
  • 使用Joinablequeue模块

#plan 2:
import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    while True:
        food = q.get()
        print( '33[31m%s消费了%s33[0m'%(name,food))
        time.sleep(random.randint(1,3))
        q.task_done()   # 每执行一次该命令都会被记录下来,直到队列中的所有数据都执行完此命令
def producer(name,food,q):   # 创建生产者
    for i in range(4):
        time.sleep(random.randint(1,3))
        foods = '%s生产了%s个%s'%(name,i+1,food)
        print(foods)
        q.put(foods)
    q.join()        # 进程延迟了,进入阻塞状态,直到一个队列中的所有数据全部被处理完毕

if __name__ == '__main__':
    q = JoinableQueue(20)               # 创建队列,所有进程里最大限制20
    p1 = Process(target=producer,args=('alec','包子',q))
    p2 = Process(target=producer,args=('yazhou','玉米',q))
    p1.start()
    p2.start()

    c1 = Process(target=consumer,args=(q,'喳喳辉'))
    c2 = Process(target=consumer,args=(q,'古天乐'))
    c1.daemon = True        #设置为守护进程,主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()

    p1.join()           #让生产者进程回归到主程序进程
    p2.join()

此模块运行流程:

从消费者端看:

  • 每次获取一个数据
  • 处理一个数据
  • 发送一个记号:标志一个数据被处理成功

从生产者端看:

  • 每次生产一个数据,并放入队列中

  • 对队列中每一个数据刻上记号

  • 当生产者全部生产完毕后,发送join信号,进程为阻塞状态:

    • 此时生产者已经停止生产数据了
    • 等待之前被刻上记号的数据都被消费完
    • 当数据都被处理完时,join阻塞结束
  • 1.consumer中把所有的任务都消耗完

  • 2.producer端的join感知到,停止阻塞

  • 3.所有的producer进程结束# 主进程中的p.join结束

  • 4.主进程中代码结束# 守护进程(消费者的进程)结束

总结:

  • 消费者模型
原文地址:https://www.cnblogs.com/lynlearnde/p/13471789.html