Python多线程并发时通过线程池限流

Python支持多线程,但是由于GIL的限制并不能无限制的开启子线程。

通过semaphore我们可以控制子线程对于共享资源的访问,即可以阻塞一些子线程直到有空余的semaphore资源,但是并不能实际限制子线程数。

当我们需要开启成千上万个子线程时,很多时候并不希望这些子线程同时执行(可能受限于系统资源or后端数据库),而是更希望一次性执行一批子线程,然后有空余资源时补充一批继续执行。

在Python2中,一种变通的方法是自己设置一个简易的线程池,如下所示:

if __name__ == '__main__':
    max_workers = 50
    all_resouces = get_all_resouces()
    thread_pool = {}
    i = 0
    while i < len(all_resouces):
        if len(thread_pool) < max_workers:
            thd = Thread(target=handle_resource, args=(all_resouces[i],), name=all_resouces[i])
            thd.start()
            thread_pool[i] = thd
            i += 1
        else:
            sleep(3)
            for thd_index,thd in thread_pool.items():
                if not thd.is_alive():
                    thread_pool.pop(thd_index)
    for thd in thread_pool.values():
        thd.join()
# 这里把线程池定义为dict而非list是因为遍历list时对list本身做remove会导致线程池满后清除死线程的效率降低一半。
# 这个BUG是因为for遍历list其实是根据下标索引来遍历的,每当删除一个元素就会导致后边的下标整体-1,这会导致下一次遍历时跳过被删除位置的元素
# 这个BUG有多种处理方式,例如list copy,queue等等,在stackoverflow上也有诸多讨论,这里不过多描述。

上述Demo中,我们将线程池大小限制为50。

当线程池未满,直接创建新的子线程并启动然后加入thread_pool。

当线程池已满,便等待数秒(也可以不等待),之后检查线程池看看能不能空出坑位,进入下一次循环,直到所有子线程创建完毕。

最后join(),等待所有子线程执行完毕后结束主进程。

在Python3中直接执行上述代码会遇到:

RuntimeError: dictionary changed size during iteration

这个错误比较熟悉,但并不会影响程序实际执行。说他熟悉其实是因为这个报错换个单词就可以描述上边代码中为什么使用了dict而不是list:

RuntimeError: list changed size during iteration
问题:是否有现成的标准库或第三方库实现上述功能?
使用python3的标准库concurrent.futures会很好: concurrent.futures — Launching parallel tasks — Python 3.9.6 documentation
其实这个库就是实现了上述限流的目的,其底层依然是Threading和multiprocessing模块,一个future对象其实就是一个子线程,通过其线程池功能,我们可以像使用threading模块那样使用concurrent.futures,只是方法名和使用方式有些许差异。并且threading模块里的Lock和信号量等同步原语也可以直接在concurrent.futures的代码中照常使用,这些同步原语是与资源处理函数绑定的与并发库倒是无关。
另外python2中好像也有threadpool这个第三方库,但是现在已经很少用了,官网已经404了没找到什么有效信息。
那么使用concurrent.futures实现上述代码一样的功能就可以写为:
from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    with ThreadPoolExecutor(max_workers=50) as pool:
		for r in all_resouces:
            pool.submit(handle_resource, *args)
concurrent.futures通过封装Threading与multiprocessing模块实现了线程池限流的功能,并且写法更加简洁。
此模块中的wait和as_completed两个module function返回的future对象功能很强大,可以通过result存储子线程的执行结果。
相比于Threading模块,concurrent.futures启动的子线程会默认阻塞主进程(直到所有子线程执行完毕),这应该不能算作缺点,总而言之,使用concurrent.futures启动多线程是推荐的做法。

补充:

按上述方式开启大量子线程产生大量futures对象后,即便一些子任务已经完成也不会立即释放future对象(及其占用的内存),因此相比于python2的写法其存在内存耗尽的隐患,而Python3使用dict作为线程池又会报错,所以python3中不适合使用上述python2中的写法做线程池。

那么python3中只能忍受concurrent.futures的这个问题了吗?不,参考上述python2的写法,我们或许可以通过遍历线程池来逐一释放future对象,但这要求我们对任务再做一次拆分,即:假如我们需要开启1万个子线程,那么我们可以将其拆分为10个批次,每批次1000个线程,每个批次submit的同时将其生成的futures对象放入一个sequence中,然后针对这个future的seq遍历其futures.as_completed(seq)结果,由于其返回时总是等待所有线程执行完毕后,因此我们可以在之后执行del seq[index]来逐一删除这些已完成的futures对象。

幸运的是上述这种简易的资源释放思想,concurrent.futures中提供啦。只要使用ThreadPoolExecutor的shutdown(wait=True)方法即可,甚至如果你使用了with语句构造ThreadPoolExecutor,那么with语句会自动帮你调用shutdown(wait=True)。

示例如下:

from concurrent.futures import ThreadPoolExecutor
......
if __name__ == '__main__':
    all_resouces = get_all_resouces()
    i = 0
    while i < len(all_resouces):
        with ThreadPoolExecutor(max_workers=50) as pool:
            for r in all_resouces[i:i+1000]:
                pool.submit(handle_resource, *args)
            i += 1000

本质上还是去寻找一个合适的契机把完成的线程给释放掉(之前一次性开启全部线程时会等待所有线程执行完毕才释放资源),对比上述python2的即时释放策略CPU消耗有明显下降,但并无实质区别。

想建一个数据库技术和编程技术的交流群,用于磨炼提升技术能力,目前主要专注于Golang和Python以及TiDB,MySQL数据库,群号:231338927,建群日期:2019.04.26,截止2021.02.01人数:300人 ... 如发现博客错误,可直接留言指正,感谢。
原文地址:https://www.cnblogs.com/leohahah/p/15015562.html