多进程Queue

进程间通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

Queues

使用方法跟threading里的queue差不多

from multiprocessing import Process, Queue
#将子进程数据传给主进程
def f(yy):#子进程
    yy.put([42, None, 'hello'])

if __name__ == '__main__':#主进程
    q = Queue()#生成进程Queue,线程Queue不行
    p = Process(target=f, args=(q,))#将Queue传给子进程
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
#结果:[42, None, 'hello']

  

另一种方法:

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.send(['I am your song'])
    print(conn.recv(),'收到了!')
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()#生成管道实例:parent_conn, child_conn分别为管道的两头
    p = Process(target=f, args=(child_conn,))#管道一头传数据
    p.start()
    print(parent_conn.recv())  #管道另一头收数据 prints "[42, None, 'hello']"
    print(parent_conn.recv())    #['I am your song']
    # print(parent_conn.recv()) #如果一头只发了两次,另一头接受之后,还想接受一次,会卡主。比如发几次,收几次。
    parent_conn.send('我也给你发条信息,你能收到吗?')
    p.join()

  结果:

[42, None, 'hello']
['I am your song']
我也给你发条信息,你能收到吗? 收到了!
View Code

以上只实现了数据的传递,要实现数据的共享,比如两个进程同时修改一份数据,该怎么办呢?

Managers

A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.

A manager returned by Manager() will support types listdictNamespace(变量)LockRLockSemaphoreBoundedSemaphoreConditionEventBarrierQueueValue and Array(manager支持很多很多格式的共享)

For example:

from multiprocessing import Process, Manager
import os
#多进程实现数据的共享:不需要加锁,
#因为manager已经自动加锁了,不允许多个进程同时修改一份数据,进程数据是独立的,
#比如共享字典数据,实际已经把字典copy了10(进程数量)份了,最终通过序列化和反序列化拟合在一起,所以数据不会乱。
def f(d, l):
    d[1] = '1'#10个进程以同样的方式修改字典,结果里还是一样的值
    d['2'] = 2
    d[0.25] = None
    l.append(os.getpid())#放10个进程里每个进程号
    print('每个进程操作:',l)


if __name__ == '__main__':
    with Manager() as manager:#也可以写成 manager = Manager(),赋一个变量
        d = manager.dict()#生成一个可在多个进程之间传递和共享的字典

        l = manager.list(range(5))#生成一个可在多个进程之间传递和共享的列表
        p_list = []#准备以后存多个进程
        for i in range(10):#生成10个进程
            p = Process(target=f, args=(d, l))#将值传给d,l
            p.start()
            p_list.append(p)
        for res in p_list:#等待结果
            res.join()
        print('最终结果:')
        print(d)
        print(l)

  结果:

各个进程操作: [0, 1, 2, 3, 4, 2108]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476, 1368]
各个进程操作: [0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476, 1368, 4360]
最终结果:
{1: '1', '2': 2, 0.25: None}
[0, 1, 2, 3, 4, 2108, 3048, 4840, 1600, 2156, 2512, 4740, 3476, 1368, 4360]
View Code
原文地址:https://www.cnblogs.com/tianqizhi/p/9438056.html