并发编程之 multiprocessing 和 concurrent.futures(二)

1. multiprocessing

Python 实现多进程的模块最常用的是multiprocessing,此外还有multiprocess、pathos、concurrent.futures、pp、parallel、pprocess等模块。

1.1 multiprocessing.Process

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

参数

  • group: 为预留参数
  • target:子进程要执行的目标函数
  • name:线程名称
  • args、kwargs:参数,args 必须是元组
  • deamonbool 值:表示是否为守护进程

实例

# coding=utf-8
import multiprocessing
import time


def run(a):
    time.sleep(5)
    print(a)
    return a * a


if __name__ == '__main__':
    p = multiprocessing.Process(target=run, args=(123456,))
    p.start()	# 运行进程实例
    p.join()    # 阻塞主进程,当子进程结束后,才会继续执行主进程
    print(123)

1.2 multiprocessing.Pool

创建多个子进程最好是采用进程池 multiprocessing.Pool

multiprocessing.Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None)

参数

  • processes:进程数量,如果 processesNone那么使用 os.cpu_count()返回的数量
  • initializer: 如果 initializer不是 None,那么每一个工作进程在开始的时候会调用initializer(*initargs)
  • maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild 默认是None,意味着只要Pool存在工作进程就会一直存活
  • context: 用在制定工作进程启动时的上下文,一般使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context

创建子进程的几种方式

  • apply():同步阻塞执行,上一个子进程结束后才能进行下一个子进程(不推荐)
  • apply_async():异步非阻塞执行,每个子进程都是异步执行的(并行)(推荐)
  • map():同步阻塞
  • map_async():异步非阻塞
  • imap():内存不够用可以采用此种方式,速度慢于 map()
  • imap_unorderedimap() 的无序版本(不会按照调用顺序返回,而是按照结束顺序返回),返回迭代器实例

1.2.1 apply

同步阻塞执行,上一个子进程结束后才能进行下一个子进程

apply(func, args=(), kwds={}, callback=None, error_callback=None) 

1.2.2 apply_async

异步非阻塞执行,每个子进程都是异步执行的(并行),异步执行指的是一批子进程并行执行,且子进程完成一个,就新开始一个,而不必等待同一批其他进程完成

# callback 回调,error_back 错误回调
apply_async(func, args=(), kwds={}, callback=None, error_callback=None)

示例

# coding=utf-8

import multiprocessing


def callback(result):
    """回调函数"""
    with open("result.txt", "a+", encoding="utf-8") as f:
        f.write(str(result) + "
")


def run(num):
    return num * num


if __name__ == '__main__':
    pool = multiprocessing.Pool(6)
    for i in range(1000):
        pool.apply_async(run, args=(i,), callback=callback)
        
        # # 如有多个参数,可传一个 iterable
        # pool.apply_async(run, args=([i, 123, 456]), callback=callback)

    pool.close()
    pool.join()

1.2.3 map

若子进程有返回值,且需集中处理,建议采用此种方式(但是它是同步阻塞的):

# iterable 可迭代类型,将 iterable 中每个元素作为参数应用到 func 函数中,返回 list
map(func, iterable, chunksize=None)

1.2.4 map_async

map 的异步非阻塞版本,返回 MapResult 实例,使用 get() 方法,获取结果(list 方法):

map_async(func, iterable, chunksize=None, callback=None, error_callback=None)

apply_async 与 map_async 对比

# coding=utf-8

import multiprocessing
import time


def run(a):
    return a * a


data = []


def my_callback(result):
    data.append(result)


if __name__ == '__main__':
    st = time.time()
    pool = multiprocessing.Pool(6)

    # 总耗时:0.4497215747833252
    future = pool.map_async(run, range(20000))
    print(future.get())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    # # 总耗时:3.019148111343384
    # for i in range(20000):
    #     pool.apply_async(run, args=(i,), callback=my_callback)
    # 
    # print(data)

    pool.close()
    pool.join()
    print(f"总耗时:{time.time() - st}")

结论

  • map_asyncapply_async 速度快
  • 若想统一处理结果,map_asyncapply_async 更方便

1.2.5 imap 和 imap_unordered

内存不够可以采用 imap 方式,map 的迭代器版本,返回迭代器实例,速度远慢于 map,但是堆内存需求小。

imap_unorderedimap 的无序版本

imap(func, iterable, chunksize=1)
imap_unordered(func, iterable, chunksize=1)

实例:

# coding=utf-8

import multiprocessing
import time


def run(a):
    return a * a


data = []


def my_callback(result):
    data.append(result)


if __name__ == '__main__':
    st = time.time()
    pool = multiprocessing.Pool(6)

    # # 总耗时:0.4497215747833252
    # future = pool.map_async(run, range(20000))
    # print(future.get())  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

    # # 总耗时:3.019148111343384
    # for i in range(20000):
    #     pool.apply_async(run, args=(i,), callback=my_callback)
    #
    # print(data)

    future = pool.imap(run, range(20000))   # 总耗时:4.171960115432739
    print(future)
    for i in future:
        print(i)

    pool.close()
    pool.join()
    print(f"总耗时:{time.time() - st}")  # 总耗时:0.4497215747833252

1.2.6 starmap 和 starmap_async

starmap 可以使子进程活动接收多个参数,而 map 只能接收一个参数:

# 子进程活动 func允许包含多个参数,也即iterable的每个元素也是iterable(其每个元素作为func的参数),返回结果组成的 list
starmap(func, iterable, chunksize=None)

# 异步并行版本,返回 MapResult 实例,get() 方法可以获取结果组成的 list
starmap_async(func, iterable, chunksize=None, callback=None, error_callback=None)

# 使用方式
pool.starmap_async(f, ((a0, b0), (a1, b1), ...)).get()

1.3 进程间通信(数据共享)

每个进程是相互独立的,内存无法共享,实现进程间数据共享的方式有:

  • multiprocessing.Value(typecode_or_type, *args, lock=True):共享单个数据,共享内存
  • multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):共享数组,共享内存
  • multiprocessing.Manager() :共享进程,支持多种数据结构的数据共享

Manager 支持的类型有:list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,ValueArray不仅可以在本地进程间共享,甚至可以在多客户端实现网络共享,不过 Manager占用资源较大。

1、共享 dict

# coding=utf-8

# 多个进程将数据添加到字典 dd 中

import multiprocessing


def run(d, k, v):
    d[k] = v


if __name__ == '__main__':
    pool = multiprocessing.Pool(6)
    manager = multiprocessing.Manager()

    dd = manager.dict()

    for i in range(20):
        future = pool.apply_async(run, args=(dd, i, i * i))

    pool.close()
    pool.join()

    print(dict(dd))
    
# 运行结果
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25, 6: 36, 8: 64, 7: 49, 9: 81, 10: 100, 11: 121, 12: 144, 13: 169, 14: 196, 15: 225, 16: 256, 17: 289, 18: 324, 19: 361}

2、管理队列,并让不同的进程可以访问它:

import multiprocessing


def worker(name, que):
    que.put("%d is done" % name)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()

    for i in range(20):
        pool.apply_async(worker, (i, q))

    pool.close()
    pool.join()
# coding=utf-8

import multiprocessing


def write(name, que):
    que.put("%d is done" % name)
    print(f'{name} write done!')


def read(que):
    while not que.empty():
        val = que.get(True)
        print('read===>: ', val)

        # while True:
        #     if not que.empty():
        #         val = que.get(True)
        #         print('read===>: ', val)


if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()

    for i in range(20):
        pool.apply_async(write, (i, q))

    p1 = multiprocessing.Process(target=read, args=(q,))
    p1.start()
    p1.join()

    pool.close()
    pool.join()

注意:在操作共享对象元素时,除了赋值操作,其他的方法都作用在共享对象的拷贝上,并不会对共享对象生效。比如:dic['k'] = []; dic['k'].append(x),将不会修改 dic 的内容

1.4 进程间通信(数据传递)

  • 队列
    • multiprocessing.Queue(maxsize=0) :建立共享的队列实例
    • multiprocessing.JoinableQueue(maxsize=0):建立可阻塞的队列实例
  • 管道
    • multiprocessing.Pipe(duplex=True):建立一对管道对象,用于在两个进程之间传递数据

参考文章:python并行计算(上):multiprocessing、multiprocess模块

2. concurrent.futures 模块

concurrent.futures3.2 中引入的新模块,它为异步执行可调用对象提供了高层接口,分为两类:

  • ThreadPoolExecutor:多线程编程
  • ProcessPoolExecutor:多进程编程

两者实现了同样的接口,这些接口由抽象类 Executor 定义;这个模块提供了两大类型:

  • Executor:执行器,用于管理工作池
  • Future:管理工作计算出的结果

2.1 concurrent.futures.Executor 类

提供了一系列方法,可以用于异步执行调用,定义的方法有:

# 调用对象执行,fn(*args, **kwargs),返回 Future 对象,可用 future.result() 获取执行结果
submit(fn, *args, **kwargs)

# 异步执行 func,并支持多次并发调用,返回一个迭代器
# timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间
# ProcessPoolExecutor 这个方法将 iterables 划分为多块,作为独立的任务提交到进程池(不是 1)可显著提升性能,ThreadPoolExecutor,chunksize 不起作用
map(func, *iterables, timeout=None, chunksize=1)

# 告诉当执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源
# True 会等待所有 future 执行完毕,且 executor 的资源都释放完会才会返回,False 会立即返回,executor 的资源会在 future 执行完后释放
shutdown(wait=True)

2.2 ThreadPoolExecutor

ThreadPoolExecutor + requests 并发执行

# coding=utf-8

import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def fetch(req_url):
    r = requests.get(req_url)
    return r.json()['args']['a']


if __name__ == '__main__':
    start = time.time()
    numbers = range(12)
    url = 'http://httpbin.org/get?a={}'

    # submit() 方式
    with ThreadPoolExecutor(max_workers=3) as executor:
        # task_list = [executor.submit(fetch(url.format(n))) for n in range(12)]
        task_list = [executor.submit(fetch, url.format(n)) for n in range(12)]

        for future in as_completed(task_list):
            print(future.result())
            # data = future.result()        # 总耗时:2.903249740600586
            # print(data)

    ## map() 方式
    # with ThreadPoolExecutor(max_workers=3) as executor:
    #     future = executor.map(fetch, (url.format(n) for n in range(12)))
    #
    # for result in future:
    #     print(result)

    print(f'总耗时:{time.time() - start}')  # 总耗时:2.630300760269165

实测 submit 未按顺序返回结果

2.3 ProcessPoolExecutor

ProcessPoolExecutor 使用进程池来异步执行调用,适合计算密集型任务,方法参数:

concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

示例:

# coding=utf-8


from concurrent.futures import ProcessPoolExecutor, as_completed


def fib(n):
    if n <= 2:
        return 1

    return fib(n - 1) + fib(n - 2)


if __name__ == '__main__':
    numbers = range(20)
    with ProcessPoolExecutor(max_workers=3) as executor:
        # # map 方式
        # for num, result in zip(numbers, executor.map(fib, numbers)):
        #     print(f"{num}====>{result}")

        # submit 方式
        work_dict = {executor.submit(fib, i): i for i in numbers}
        for future in as_completed(work_dict):
            num = work_dict[future]
            try:
                data = future.result()
            except Exception as e:
                print(e)
            else:
                print(f"fib({num} = {data})")

2.4 Future 类

Future 类封装了可调用对象的异步执行,由 Executor.submit() 产生,有如下方法:

  • cancel() :尝试取消调用,如果该调用正在执行中,无法取消,本方法返回 False,其他情况下调用会被取消,并返回 True;只有当任务提交了还没执行才可以通过这种方式取消
  • cancelled(): 如果调用已经被成功取消,返回 True
  • running() :如果调用正在执行,无法被取消,则返回 True
  • done() :如果调用成功被取消或者已经执行完毕,返回 True
  • result(timeout=None): 返回调用的返回值。如果调用还没有完成,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutError``。timeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。如果 future 在完成之前被取消了,会抛出 CancelledError 异常,如果调用抛出异常,这个方法会抛出同样的异常。同时它也会阻塞直到任务完成,获取被取消
  • exception(timeout=None) :返回被调用抛出的异常,如果调用还没有执行完毕,则最多等待 timeout 秒。如果 timeout 秒之后还没有完成,抛出 concurrent.futures.TimeoutErrortimeout 可以为整数或者浮点数。如果不指定或者为 None,则不限制等待时间。
    如果 future 在完成之前被取消了,会抛出 CancelledError 异常,如果调用完成并且没有抛出异常,返回 None
  • add_done_callback(fn):为 future 附加可调用对象 fn,当 future 运行完毕或者被取消时,它会被用作 fn 的唯一参数,并调用 fn。可调用对象按照添加顺序依次调用,并且总是在添加时所处进程的一个线程内调用它。如果该可调用对象抛出了属于 Exception 子类的异常,它会被记录并忽略。如果它抛出了属于 BaseException 子类的异常,该行为未定义。
    如果 future 已经完成或者已经取消,fn 会被立即调用

通过 add_done_callack() 获取返回值和捕获异常

concurrent.futuresthread.ProcessPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用 concurrent.futures.Future.exception(timeout=None) 方法主动获取 worker 的异常:

# coding=utf-8


from concurrent.futures import ProcessPoolExecutor, as_completed


def fib(n):
    if n <= 2:
        return 1

    return fib(n - 1) + fib(n - 2)


def call_back(future):
    """
    回调(可获取多进程返回值、错误)
    :param future: future 对象
    :return:
    """
    # 获取错误信息
    worker_exception = future.exception()
    if worker_exception:
        print(worker_exception)
    
    # 获取返回值
    print(future.result())


def test(n):
    if n % 2 == 0:
        n / 0	# 发生异常

    return n * 2


if __name__ == '__main__':
    numbers = range(20)
    with ProcessPoolExecutor(max_workers=3) as executor:
        # # map 方式
        # for num, result in zip(numbers, executor.map(fib, numbers)):
        #     print(f"{num}====>{result}")

        # submit 方式
        # work_dict = {executor.submit(fib, i): i for i in numbers}
        # for future in as_completed(work_dict):
        #     num = work_dict[future]
        #     try:
        #         data = future.result()
        #     except Exception as e:
        #         print(e)
        #     else:
        #         print(f"fib({num} = {data})")

        # 其他方法
        for i in numbers:
            executor.submit(test, i).add_done_callback(call_back)

实测 map() 方式提交的会触发异常,submit() 方式需要通过 add_done_callback() 主动捕获异常!

参考文章

  • https://blog.csdn.net/jpch89/article/details/87643972
  • https://blog.csdn.net/makingLJ/article/details/98084973

3. 实例

3.1 多进程(池)向同一文件写入数据

# coding=utf-8

"""
Function:回调函数解决多进程向同一文件写入数据
"""
import multiprocessing


def callback(result):
    """回调函数"""
    with open("result.txt", "a+", encoding="utf-8") as f:
        f.write(str(result) + "
")


def run(num):
    return num * num


if __name__ == '__main__':
    pool = multiprocessing.Pool(6)
    for i in range(1000):
        pool.apply_async(run, args=(i,), callback=callback)

    pool.close()
    pool.join()
原文地址:https://www.cnblogs.com/midworld/p/14614634.html