python自带库--multiprocessing库

multiprocessing模块介绍

multiprocessing 是一个支持使用与 threading 模块类似的 API 来产生进程的包。 multiprocessing 包同时提供了本地和远程并发操作,通过使用子进程而非线程有效地绕过了 全局解释器锁。 因此,multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。 它在 Unix 和 Windows 上均可运行。

multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

一、Process类的介绍

在 multiprocessing 中,通过创建一个 Process 对象然后调用它的 start() 方法来生成进程

简单实例:

from multiprocessing import Process
def f(name):
    print('hello', name)
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

创建进程的类:

Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,可用来开启一个子进程

强调:

1. 需要使用关键字的方式来指定参数 

2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

参数介绍:

group参数未使用,值始终为None

target表示调用对象,即子进程要执行的任务

args表示调用对象的位置参数元组,args=(1,2,'mike',)

kwargs表示调用对象的字典,kwargs={'name':'mike','age':18}

name为子进程的名称

方法介绍:

p.start() :# 启动进程,并调用该子进程中的p.run() 

p.run() :# 进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法

p.terminate() : # 强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁

p.is_alive() :# 如果p仍然运行,返回True

p.join([timeout]) :# 主进程等待p终止(强调:是主进程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间。

属性介绍

p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置

p.name:进程的名称

p.pid:进程的pid

二、在进程之间交换对象

multiprocessing 支持进程之间的两种通信通道:队列和管道

1、队列(Quene)

class multiprocessing.Queue([maxsize])

返回一个使用一个管道和少量锁和信号量实现的共享队列实例。当一个进程将一个对象放进队列中时,一个写入线程会启动并将对象从缓冲区写入管道中。

一旦超时,将抛出标准库 queue 模块中常见的异常 queue.Empty 和 queue.Full

方法:

qsize()-----返回队列大致长度,返回数字不可靠

empty()-----如果队列是空的,返回 True ,反之返回 False。状态不可靠

full()-----如果队列是满的,返回 True ,反之返回 False 。状态不可靠

put(obj[, block[, timeout]])

将 obj 放入队列。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到有空的缓冲槽。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的缓冲槽时抛出 queue.Full  异常。反之 (block 是 False 时),仅当有可用缓冲槽时才放入对象,否则抛出 queue.Full 异常 (在这种情形下 timeout 参数会被忽略)。

put_nowait(obj)-----相当于 put(obj, False)
get([block[, timeout]])

从队列中取出并返回对象。如果可选参数 block 是 True (默认值) 而且 timeout 是 None (默认值), 将会阻塞当前进程,直到队列中出现可用的对象。如果 timeout 是正数,将会在阻塞了最多 timeout 秒之后还是没有可用的对象时抛出 queue.Empty 异常。反之 (block 是 False 时),仅当有可用对象能够取出时返回,否则抛出 queue.Empty 异常 (在这种情形下 timeout 参数会被忽略)。

在 3.8 版更改: 如果队列已经关闭,会抛出 ValueError 而不是 OSError 。

get_nowait()-----相当于 get(False)
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

    队列是线程和进程安全的。

2、管道(Pipe)

multiprocessing.Pipe([duplex])

返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端。

如果 duplex 被置为 True (默认值),那么该管道是双向的。如果 duplex 被置为 False ,那么该管道是单向的,即 conn1 只能用于接收消息,而 conn2 仅能用于发送消息。

from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
返回的两个连接对象 Pipe() 表示管道的两端。每个连接对象都有 send() 和 recv() 方法(相互之间的)。
请注意,如果两个进程(或线程)同时尝试读取或写入管道
的 同一 端,则管道中的数据可能会损坏。
当然,在不同进程中同时使用管道的不同端的情况下不存在损坏的风险。

三、进程间同步

对于所有在 threading 存在的同步原语,multiprocessing 中都有类似的等价物。例如,可以使用锁来确保一次只有一个进程打印到标准输出:

from multiprocessing import Process, Lock
def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

不使用锁的情况下,来自于多进程的输出很容易产生混淆。

四、服务进程

由 Manager() 返回的管理器对象控制一个服务进程,该进程保存Python对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型: list 、 dict 、 Namespace 、 Lock 、 RLock 、 Semaphore 、 BoundedSemaphore 、 Condition 、 Event 、 Barrier 、 Queue 、 Value 和 Array 。

from multiprocessing import Process, Manager
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        print(d)
        print(l)

将打印

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

使用服务进程的管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以通过网络由不同计算机上的进程共享。但是,它们比使用共享内存慢

五、使用工作进程

Pool 类表示一个工作进程池。它具有允许以几种不同方式将任务分配到工作进程的方法。

from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
    return x*x
if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:
        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))
        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)
        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"
        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

请注意,进程池的方法只能由创建它的进程使用。

六、连接(Connection)对象

class multiprocessing.connection.Connection

send(obj)-----将一个对象发送到连接的另一端,可以用 recv() 读取。
recv()------返回一个由另一端使用 send() 发送的对象。
fileno()-----返回由连接对象使用的描述符或者句柄。
close()-----关闭连接对象。
poll([timeout])-----返回连接对象中是否有可以读取的数据。
send_bytes(buffer[, offset[, size]])-----从一个 bytes-like object  (字节类对象)对象中取出字节数组并作为一条完整消息发送。
recv_bytes([maxlength])-----以字符串形式返回一条从连接对象另一端发送过来的字节数据。此方法在接收到数据前将一直阻塞。
ecv_bytes_into(buffer[, offset])-----将一条完整的字节数据消息读入 buffer 中并返回消息的字节数。 此方法在接收到数据前将一直阻塞。
七、class multiprocessing.Lock一旦一个进程或者线程拿到了锁,后续的任何其他进程或线程的其他请求都会被阻塞直到锁被释放。
方法:acquire(block=Truetimeout=None)-----获得锁,阻塞或非阻塞的。

如果 block 参数被设为 True ( 默认值 ) , 对该方法的调用在锁处于释放状态之前都会阻塞,然后将锁设置为锁住状态并返回 True 。

如果 block 参数被设置成 False ,方法的调用将不会阻塞。 如果锁当前处于锁住状态,将返回 False ; 否则将锁设置成锁住状态,并返回 True 。

方法:release()---释放锁,可以在任何进程、线程使用,并不限于锁的拥有者。

八、管理器--管理器提供了一种创建共享数据的方法,从而可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理 共享对象 的服务。其他进程可以通过代理访问这些共享对象。

1、multiprocessing.Manager()---返回一个已启动的 SyncManager 管理器对象,这个对象可以用于在不同进程中共享数据。返回的管理器对象对应了一个已经启动的子进程,并且拥有一系列方法可以用于创建共享对象、返回对应的代理。

2、class multiprocessing.managers.BaseManager([address[, authkey]])---创建一个 BaseManager 对象。

一旦创建,应该及时调用 start() 或者 get_server().serve_forever() 以确保管理器对象对应的管理进程已经启动。

address 是管理器服务进程监听的地址。如果 address 是 None ,则允许和任意主机的请求建立连接。

authkey 是认证标识,用于检查连接服务进程的请求合法性。如果 authkey 是 None, 则会使用 current_process().authkey , 否则,就使用 authkey , 需要保证它必须是 byte 类型的字符串。

方法:tart([initializer[, initargs]])-----为管理器开启一个子进程,如果 initializer 不是 None , 子进程在启动时将会调用 initializer(*initargs)

方法:get_server()----返回一个 Server  对象,它是管理器在后台控制的真实的服务。 Server  对象拥有 serve_forever() 方法。

方法:connect()----将本地管理器对象连接到一个远程管理器进程:

方法:shutdown()-----停止管理器的进程。这个方法只能用于已经使用 start() 启动的服务进程。它可以被多次调用。

方法:register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])-----一个 classmethod,可以将一个类型或者可调用对象注册到管理器类。

属性:address----管理器所用的地址。

九、进程池------可以创建一个进程池,它将使用 Pool 类执行提交给它的任务。

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

一个进程池对象,它控制可以提交作业的工作进程池。它支持带有超时和回调的异步结果,以及一个并行的 map 实现。

map(funciterable[, chunksize])-----内置 map() 函数的并行版本 (但它只支持一个 iterable 参数,对于多个可迭代对象请参阅 starmap())。 它会保持阻塞直到获得结果。

参考文档:https://docs.python.org/zh-cn/3/library/multiprocessing.html#multiprocessing.managers.BaseManager.connect

原文地址:https://www.cnblogs.com/eihouwang/p/14197640.html