进程间通信--队列和管道

1.进程间通信--队列和管道

IPC(Inter-Process Communication)

队列:创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。 

# 队列 先进先出
# IPC
# from multiprocessing import Queue
# q = Queue(5)
# q.put(1)
# q.put(2)
# q.put(3)
# q.put(4)
# q.put(5)
# print(q.full())   # 队列是否满了
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.get())
# print(q.empty())
# while True:
#     try:
#         q.get_nowait()
#     except:
#         print('队列已空')
#         time.sleep(0.5)
# for i in range(6):
#     q.put(i)

from multiprocessing import Queue,Process
def produce(q):
    q.put('hello')

def consume(q):
    print(q.get())

if __name__ == '__main__':
    q = Queue()
    p = Process(target=produce,args=(q,))
    p.start()
    c = Process(target=consume, args=(q,))
    c.start()
队列
# 队列
# 生产者消费者模型

# 生产者 进程
# 消费者 进程
import time
import random
from multiprocessing import Process,Queue
def consumer(q,name):
    while True:
        food = q.get()
        if food is None:
            print('%s获取到了一个空'%name)
            break
        print('33[31m%s消费了%s33[0m' % (name,food))
        time.sleep(random.randint(1,3))

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)

if __name__  == '__main__':
    q = Queue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    p1.join()
    p2.join()
    q.put(None)
    q.put(None)
生产者消费者模型
import time
import random
from multiprocessing import Process,JoinableQueue
def consumer(q,name):
    while True:
        food = q.get()
        print('33[31m%s消费了%s33[0m' % (name,food))
        time.sleep(random.randint(1,3))
        q.task_done()     # count - 1

def producer(name,food,q):
    for i in range(4):
        time.sleep(random.randint(1,3))
        f = '%s生产了%s%s'%(name,food,i)
        print(f)
        q.put(f)
    q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕

if __name__  == '__main__':
    q = JoinableQueue(20)
    p1 = Process(target=producer,args=('Egon','包子',q))
    p2 = Process(target=producer, args=('wusir','泔水', q))
    c1 = Process(target=consumer, args=(q,'alex'))
    c2 = Process(target=consumer, args=(q,'jinboss'))
    p1.start()
    p2.start()
    c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
    c2.daemon = True
    c1.start()
    c2.start()
    p1.join()
    p2.join()      # 感知一个进程的结束

#  在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功

# 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
                # 且要等待之前被刻上的记号都被消费完
                # 当数据都被处理完时,join阻塞结束

# consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
joinableQueue解决生产者消费者问题

管道(了解)

#创建管道的类:
Pipe([duplex]):在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道
#参数介绍:
dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
#主要方法:
    conn1.recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
    conn1.send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
 #其他方法:
conn1.close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
conn1.fileno():返回连接使用的整数文件描述符
conn1.poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
 
conn1.recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
conn.send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收    
 
conn1.recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
View Code
from multiprocessing import Pipe,Process

def func(conn1,conn2):
    conn2.close()
    while True:
        try :
            msg = conn1.recv()
            print(msg)
        except EOFError:
            conn1.close()
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe()
    Process(target=func,args = (conn1,conn2)).start()
    conn1.close()
    for i in range(20):
        conn2.send('吃了么')
    conn2.close()
View Code
原文地址:https://www.cnblogs.com/shangchunhong/p/9277635.html