进程的锁、队列以及生产消费者模型

回顾

多道技术:

空间复用:多个程序公用一个内存条,彼此隔离,物理级别隔离

时间复用:公用一个cpu

切换的情况:

​ io切,占用时间过长也切

串行:一个任务完完整整的运行结束,再运行下一个任务。

并发:看起来是同时执行多个任务 ---> 单核

并行:真正的左到了同时执行多个任务 ---> 多核

一、进程同步(multiprocess.Lock)

为了解决多个进程使用同一份数据资源的时候,引发数据安全或顺序混乱的问题,我们使用锁来维护执行顺序,将并发变成了串行,牺牲了运行效率,但是避免了竞争。

import time
import random
from multiprocessing import Process,Lock

def work(lock, n):
    lock.acquire()
    print(f'进程{n} is running ')
    time.sleep(random.randint(1,3))
    print(f'进程{n} is done ')
    lock.release()
    
if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p = Process(target=work, args=(lock, i))
        p.start()

上面这种情况虽然使用加锁的形式实现了顺序的执行,但是程序又重新变成串行了,这样确实会浪费了时间,却保证了数据的安全。

二、进程间通信(multiprocess.Queue)

2.1概念介绍

IPC:inter-Process Communication

概念:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue(maxsize)创建共享的进程队列。

参数:maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。

底层队列使用管道和锁定实现。

2.2方法介绍

Queue([maxsize]):创建共享的进程队列。maxsize是队列中允许的最大项数。如果省略此参数,则无大小限制。底层队列使用管道和锁定实现。另外,还需要运行支持线程以便队列中的数据传输到底层管道中。
Queue的实例q具有以下方法:

q.get( [ block [ ,timeout ] ] ):返回q中的一个项目。如果q为空,此方法将阻塞,直到队列中有项目可用为止。block用于控制阻塞行为,默认为True. 如果设置为False,将引发Queue.Empty异常(定义在Queue模块中)。timeout是可选超时时间,用在阻塞模式中。如果在制定的时间间隔内没有项目变为可用,将引发Queue.Empty异常。

q.get_nowait() :同q.get(False)方法。

q.put(item [, block [,timeout ] ] ) :将item放入队列。如果队列已满,此方法将阻塞至有空间可用为止。block控制阻塞行为,默认为True。如果设置为False,将引发Queue.Empty异常(定义在Queue库模块中)。timeout指定在阻塞模式中等待可用空间的时间长短。超时后将引发Queue.Full异常。

q.qsize() :返回队列中目前项目的正确数量。此函数的结果并不可靠,因为在返回结果和在稍后程序中使用结果之间,队列中可能添加或删除了项目。在某些系统上,此方法可能引发NotImplementedError异常。

q.empty() :如果调用此方法时 q为空,返回True。如果其他进程或线程正在往队列中添加项目,结果是不可靠的。也就是说,在返回和使用结果之间,队列中可能已经加入新的项目。

q.full() :如果q已满,返回为True. 由于线程的存在,结果也可能是不可靠的(参考q.empty()方法)。

2.3代码实现

'''
multiprocessing模块支持进程间通信的两种主要形式:管道和队列
都是基于消息传递实现的,但是队列接口
'''

from multiprocessing import Queue
q=Queue(3)

#put ,get ,put_nowait,get_nowait,full,empty
q.put(3)
q.put(3)
q.put(3)
# q.put(3)  # 如果队列已经满了,程序就会停在这里,等待数据被别人取走,再将数据放入队列。
            # 如果队列中的数据一直不被取走,程序就会永远停在这里。
try:
    q.put_nowait(3) # 可以使用put_nowait,如果队列满了不会阻塞,但是会因为队列满了而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去,但是会丢掉这个消息。
    print('队列已经满了')

# 因此,我们再放入数据之前,可以先看一下队列的状态,如果已经满了,就不继续put了。
print(q.full()) #满了

print(q.get())
print(q.get())
print(q.get())
# print(q.get()) # 同put方法一样,如果队列已经空了,那么继续取就会出现阻塞。
try:
    q.get_nowait(3) # 可以使用get_nowait,如果队列满了不会阻塞,但是会因为没取到值而报错。
except: # 因此我们可以用一个try语句来处理这个错误。这样程序不会一直阻塞下去。
    print('队列已经空了')

print(q.empty()) #空了

三、生产消费者模型

生产者 --> 队列(盆) --> 消费者

生产者可以不停的生产,达到了自己最大的生产效率,消费者可以不停的消费,也达到了自己最大的消费效率。

生产者消费者模型大大提高了生产者生产的效率和消费者消费的效率。

注意:queue不适合传大文件,通常传一些消息。

from multiprocessing import Process, Queue, JoinableQueue
import time, random

def producer(q, name, food):
    """生产者"""
    for i in range(3):
        print(f'{name}生产了{food}{i}')
        time.sleep(random.randint(1, 3))
        res = f'{food}{i}'
        q.put(res)
        
def consumer(q, name):
    """消费者"""
    while True:
        res = q.get()
		if res is None:break
        time.sleep(random.randint(1, 3))
        print(f'{name}吃了{res}')
        q.task_done()
        
if __name__ == '__main__':
    q = JoinableQueue()
    p1 = Process(target=producer, args=(q, 'rocky', '包子'))
    p2 = Process(target=producer, args=(q, 'mac', '韭菜'))
    p3 = Process(target=producer, args=(q, 'nick', '蒜泥'))
    c1 = Process(target=consumer, args=(q, '小红'))
    c2 = Process(target=consumer, args=(q, '小蓝'))
    p1.start()
    p2.start()
    p3.start()
    c1.daemon = True  # 开启c1为主进程的守护进程,主进程结束,c1结束
    c2.daemon = True  # 开启c2为主进程的守护进程,主进程结束,c2结束
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    p3.join()  # 等待生产者生产完毕
    # q.put(None)  # 几个消费者put几次
    # q.put(None)
    q.join()  
    # 生产者生产完毕--这是主进程最后一行代码结束--q.join()消费者已经取干净了,没有存在的意义了.

原文地址:https://www.cnblogs.com/17vv/p/11528168.html