pyton进程池和线程池 --- concurent

concurrent.futures模块

由于频繁的创建线程或者进程是一件较为繁琐的操作,且浪费时间和资源。于是Python提供了线程池和进程池的模块。我们只需要将要执行的任务提交给池对象,池会自动创建线程或是进程对象异步执行这些任务,最后拿到返回结果。这些池中的进程和线程创建后不会立即自动销毁,而是等待下一次任务的提交,方便我们重复使用。在该模块下存在 ThreadPoolExecutor线程池对象和ProcessPoolExecutor进程池对象,python将他们的调用接口做了相同的封装,使用时调用方式基本相同。

ThreadPoolExecutor对象

只需要定义一个池执行器对象,指定该池对象的最大任务数。第一次向池中提交任务时,池会对应创建与任务数对应的线程数,但不超过提供的最大任务数,超过最大任务数的任务会排队等待前一个任务执行完成。

每次提交一个任务可以的到这个任务的future对象,即submit()方法的返回值,该对象主要用于获取线程的执行结果或者报错信息,通过对应的属性获取即可。该对象包括以下的方法

future对象方法 含义
done() 该future对象对应的任务完成返回True,否则返回False
cancelled() 任务是否成功被取消,返回bool,且任务取消会标记对应future对象状态为CANCELED
running() 是否正在运行
cancel() 尝试取消该任务,已经执行无法取消,返回false,否则返回True
result(timeout=None) 取任务执行结果,默认一直等待,或者设置timeout等待时间,超时抛出异常
exception(timeout=None) 取任务执行时的错误信息,timeout等待时间,超时抛出异常
f.add_done_callback(fn) 设置任务结束时的回调函数,该函数使用该future对象作为参数

简单使用线程池

from concurrent import futures
import threading


def func1():
    logging.info("this is func")
    time.sleep(3)
    return 1233

def main():
    executer = futures.ThreadPoolExecutor(3)
    fs = []
for i in range(10):
        f = executer.submit(func1)        # 提交任务,得到futrue对象
        fs.append(f)

while True:
        for f in fs:          # 遍历列表判断是否全部完成
            if not f.done():  # 只要有一个任务没有完成,结束for循环,再一次从头判断
                time.sleep(1)
                break

        else:  # for 正常退出,全部执行完成
            ret = []
            for f in fs:
                ret.append((f.result()))  # 将结果保存
            executer.shutdown()  # 程序结束前清空执行器
            break

    print("计算结果:", ret)     # 打印结果,即任务函数的返回值
if __name__ == "__main__":
    main()

在未提交任务时,线程池中并没有新的线程被创建,每一个线程在有任务执行时才会被创建,线程总数不大于指定的最大值,任务执行完成后,执行器池中线程不会结束,而是等待下一次任务提交。只有调用执行池的shutdown方法才会清空该线程池。

执行器对象支持上下文管理,所以可以使用with语句

with futures.ThreadPoolExecutor(3) as executor:
    fs = []
    for i in range(10):
        f = executer.submit(func1)
        fs.append(f)
    # ....后代码相同,最后不需要执行executor.shutdown(),上下文自动执行

错误处理

执行池在执行提交的任务时候,如果该任务发生错误,将会停止该任务的执行,设置该任务的future对象状态为FINISHED,并将future对象_exception属性设置为True。在我们获取future对象的结果前,需要判断这个任务是否是由于错误而终止。如果发生过异常,那么该异常在future._exception属性上, 但是不宜直接获取,推荐使用future.exception()接口获得这个错误。上面的代码部分修改后

while True:
    for f in fs:          
        if not f.done():
            time.sleep(1)
            break
    else:  
        ret = []
        for f in fs:
            if f._exception:   # 该future对象的发生了异常
                print(f.exception())  # 这里选择打印信息即可,使用excption()方法可以获取这个异常信息
                continue
            ret.append((f.result()))  # 所有正确的结果才能进入该列表
        print("任务执行完成后,shutdown前:", threading.active_count())
        executer.shutdown()  
        break

相关源码如下

# 执行任务函数,出错设置future的异常
try:
    result = self.fn(*self.args, **self.kwargs)    # self.fn是我们提供的任务函数,在此处执行
except BaseException as exc:
    self.future.set_exception(exc)

# 设置异常, 该self 为 future
def set_exception(self, exception):
    with self._condition:
        self._exception = exception  # 异常被赋值给_exception属性
        self._state = FINISHED       # 标记任务完成
        for waiter in self._waiters:
            waiter.add_exception(self)
        self._condition.notify_all()
    self._invoke_callbacks()   # 异常同样会调用回调函数,将自己(future对象)作为参数

设置回调函数

任务执行完成时可以自动进行函数的回调,在提交任务时没有设置提交这个回调函数的接口,只能使用提交任务后获得future对象动态增加。future.add_done_callback(fn),fn函数必须为一参函数且未来的实参为该future对象

def callbackfunc(future):
    global count
    with lock:
        count += 1
    if future._exception:
        print(future.exception())
    else:
        ret.append(future.result())

def funcx(i):
    if i > 9:
        raise IndexError("i > 9 error")
    return 1233


if __name__ == "__main__":
    lock = threading.Lock()
    with futures.ThreadPoolExecutor(3) as executor:
        ret = []
        count = 0
        for i in range(10):
            f = executor.submit(funcx, i+1)
            f.add_done_callback(callbackfunc)  # 添加回调函数
    while count < 10:
        time.sleep(0.2)
    print(ret)

这里尝试使用回调函数将任务计算的结果添加到ret类表中,各个线程分别添加。但事实并非完全如此,在添加回调函数时f.add_done_callback(callbackfunc),如果该任务已经被完成了,这个回调函数将会交被主线程调用,也就是说,如果该任务执行时间比添加任务的时间还短,那么该回调函数实际是被主线程执行,for循环中实际为一种串行执行。所以在提交任务之后,应该在尽量短的时间内给该任务的future对象提交callback。

源码:

def add_done_callback(self, fn):
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)   # 添加,将来任务执行完成后自动调用
            return
    fn(self)  # 否则,任务已完成,线程自己调用

ProcessPoolExecutor

定义一个进程池去完成任务,使用方式和线程池调用方式基本相同

from concurrent import futures
def func():
    sum = 0
    for i in range(100000):
        sum += 1

# 同样支持上下文管理,结束时自动执行shutdown()
with futures.ProcessPoolExecutor(3) as executor:
    future = exector.submit(func)

    while not future.done():
        pass
    if future._exception:
        print(future.exceptiion())
    else:
        print(future.result())

只提交了一个任务给进程池执行,将只会创建一个进程执行任务。一般为计算密集型的任务才使用进程池执行,IO密集型优先使用多线程。

原文地址:https://www.cnblogs.com/k5210202/p/13070741.html