python学习笔记 day37 管道Pipe (IPC机制----进程之间互相通信)

1. 管道支持双端通信,但是数据不安全,没有加锁

from multiprocessing import Pipe
l,r=Pipe()   # 实例化一个管道
l.send("hello,xuanxuan")  # 管道的一端(l)发送数据
print(r.recv())  # 管道的另一端(r)接收数据
r.send("hello,xixi")  # r端发送数据
print(l.recv()) # l端接收数据
# print(l.recv())  # 由于r端只发送了一次数据,l端再次接收,就会发送阻塞

运行结果:

如果一端发送完数据之后直接把管道这端关闭,另一端不断接收数据就会发生EOFError错误(我们可以捕获这种错误,当管道一端关闭时,另一端捕获到该类型的错误,直接也关闭就ok了)

from multiprocessing import Pipe
l,r=Pipe()   # 实例化一个管道
l.send("hello,xuanxuan")  # 管道的一端(l)发送数据
print(r.recv())  # 管道的另一端(r)接收数据
r.send("hello,xixi")  # r端发送数据
print(l.recv()) # l端接收数据
r.close()  # 管道l端关闭
print(l.recv())  # 由于管道r端关闭,所以l端再进行接收数据时就会发生EOFError错误

运行结果:

 2.使用管道实现主进程和子进程之间的通信

思路: 主进程使用管道的一端发送数据,再开子进程使用管道的另一端接收数据,从而完成主进程和子进程之间的通信:

from multiprocessing import Pipe
from multiprocessing import Process
def func(p):  # 消费者(管道的r端,用来接收数据)
    l,r=p  # 代表管道的左右两端,这里其实是子进程来执行的函数,只用到管道的r端,之所以也写l,是想说明在进程中不用的管道那端需要及时关闭
    l.close()  # 该子进程执行 只用到管道r端用来接收数据,所以管道的另一端l需要及时关闭
    while True:
        try:
            print(r.recv()) # 消费者从管道一端r端取数据
        except EOFError:  # 当管道的另一端l发送端 关闭时,管道的r端接收端就不需要再等着接收消息了
            break

if __name__=="__main__":
    l,r=Pipe()  # 管道的两端,l,r 在主进程中执行生产数据(l端发送数据)
    p=Process(target=func,args=((l,r),))  # 把管道的两端都传到func函数中,由子进程来执行func,但子进程中l端是用不到的,所以在子进程中需要及时关闭
    p.start()
    r.close()  # 由于在主进程中没有用到r端,所以需要及时关闭管道的r端
    for i in range(10):
        l.send("hello,xuanxuan-%s"%i)
    l.close()   # 主进程中执行完毕之后需要及时关闭发送端管道的l端

运行结果:

 应该特别注意,管道端点的正确管理问题,如果是生产者或者消费者中都没有使用管道的某个端点,就应该将其关闭,这也说明为何在生产者(主进程中)关闭了管道的输出端(r端)在消费者中(子进程中)关闭了管道的输入端(l端),如果忘记执行这些步骤,程序有可能在消费者(子进程recv)recv()操作上挂起(阻塞),管道是由操作系统进行引用计数的,必须所有进程中关闭管道后,才能生产EOFError异常,所以在生产者(主进程)中关闭管道不会有任何效果,除非在消费者(子进程中)也关闭了相同的管道端点;

 3. 使用管道实现生产者消费者模型(需要加锁)

由于管道默认没加锁,数据不安全,所以在消费者模型中,需要对子进程加锁,同一时间只有一个子进程执行消费者模型,操作里面的数据:

from multiprocessing import Process
from multiprocessing import Pipe
from multiprocessing import Lock

def func1(p,lock): # 消费者模型---消费数据,管道的r端接收数据(类似于从管道中取值),lock是给子进程加锁,防止多个进程同时操作数据,保证数据安全(管道默认没加锁,数据不安全)
    l,r=p  # 管道的两端
    l.close() # 子进程中用不到管道的l端(发送数据---生产者),所以需要及时关闭
    while True:
        try:
            lock.acquire()  # 子进程进来执行func()函数,需要先拿钥匙,占用,其他子进程就没法同时操作func1里面的数据了
            print(r.recv())  # 管道的r端接收数据
            lock.release()    # 执行完recv()后需要释放钥匙,让其他进程进来操作func1
        except EOFError:
            lock.release()  # 如果管道的l端关闭了,不再发送数据,此时try执行到r.recv()就会抛出异常EOFError,后面的lock.release()并没有被执行,所以这里需要对上述异常情况 锁未释放的情况进行补充
            break
def func2(p):  # 生产者模型--管道的l端发送数据(生产数据---生产者)子进程执行func2时只会用到l端,不断地send,所以需要对没有用到的管道一端r在该子进程中及时关闭
    l,r=p   # 管道的两端
    r.close()  # 该子进程中对用不到的管道一端r进行及时关闭
    for i in range(10):
        l.send("hello,xuanxuan--%s"%i)  # 管道l端发送数据
if __name__=="__main__":
    l,r=Pipe()  # 管道的两端
    lock=Lock()  # 用来给子进程(消费者func1)加锁,防止多个子进程同时操作里面的数据
    p1=Process(target=func1,args=((l,r),lock))  # 开子进程执行func1---消费者,管道r端取数据
    p1.start()
    p2=Process(target=func1,args=((l,r),lock))  # 开子进程执行func1---消费者,管道r端取数据
    p2.start()
    # 注意此时不要先关闭l,r管道,是因为后面开子进程需要用到l或者r,得等到不用的时候才能关闭管道
    p3=Process(target=func2,args=((l,r),))  # 开子进程执行func2---生产者--管道l端发送数据
    p3.start()
    p4=Process(target=func2,args=((l,r),))  # 开子进程执行func2---生产者--管道l端发送数据
    p4.start()
    p5=Process(target=func2,args=((l,r),))  # 开子进程执行func2---生产者--管道l端发送数据
    p5.start()
    l.close()   # 现在两个管道都不再使用,需要关闭
    r.close()   # (三个进程执行生产者,两个进程执行消费者)

 运行结果:

总结:

Pipe----管道,数据不安全--没有加锁;可以实现双端通信;

Queue---队列,基于管道实现,加锁,数据安全,其实也可以双向通信;

JoinableQueue----有put端和get的技术机制,每次get()数据发送task_done()  put端计数-1,直到get()端取完了队列的所有数据, put()端的join()就会接受到信号,直到get()端已经接受完数据了

talk is cheap,show me the code
原文地址:https://www.cnblogs.com/xuanxuanlove/p/9782315.html