concurrent.futures

前言

源码:
	Lib/concurrent/futures/thread.py
	Lib/concurrent/futures/process.py
concurrent.futures:
	异步执行可以由 ThreadPoolExecutor 或 ProcessPoolExecutor 来实现
	两者都是抽象类 Executor 的子类,实现了 Executor 定义的接口

Future 方法对象说明

对象名 说明
cancel() 尝试取消调用。如果调用正在执行或已经执行完毕,则无法取消,方法返回 False;否则调用会被取消,方法返回 True
cancelled() 如果调用被成功取消返回 True
running() 如果调用当前正在执行(因此无法被取消)返回 True
done() 如果调用已被成功取消或已执行完毕返回 True
result(timeout=None) 返回调用的结果。如果调用尚未完成就会等待,最多等待 timeout 秒,超时抛出 TimeoutError;如果调用本身抛出了异常,这里会重新抛出该异常
exception(timeout=None) 返回调用抛出的异常;如果调用正常完成则返回 None。尚未完成时的等待规则与 result() 相同
add_done_callback(fn) 将 fn 绑定到 future 对象上。当 future 被取消或运行完成时,fn 会被调用,并以该 future 作为唯一参数

进程池、线程池使用案例

  • 进程池与线程池调用方式、传参差不多,只是调用模块不同......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import os
import time
from concurrent.futures import ProcessPoolExecutor  # 进程池模块
from concurrent.futures import ThreadPoolExecutor  # 线程池模块

#  下面是以进程池为例, 线程池只是模块改一下即可
def task(name):
    """Print the task label and the id of the process running it, then sleep 1 s."""
    print("name: %s  pid:%s  run" % (name, os.getpid()))
    time.sleep(1)

if __name__ == "__main__":
    # thread = ThreadPoolExecutor(2)  # 设置进程池大小,默认等于cpu核数
    pool = ProcessPoolExecutor(2)  # 设置进程池大小,默认等于cpu核数
    for i in range(5):
        pool.submit(task, "进程%s" % i)  # 异步提交(只是提交需要运行的线程不等待)

    # 作用1:关闭进程池入口不能再提交了   作用2:相当于jion 等待进程池全部运行完毕
    pool.shutdown(wait=True)  
    print("主进程")

异步调用与同步调用

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

同步调用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import os, time, random
from concurrent.futures import ProcessPoolExecutor  # 进程池模块

# 1、同步调用: 提交完任务后、就原地等待任务执行完毕,拿到结果,再执行下一行代码(导致程序串行执行)
def task(name):
    """Print the task label and the id of the process running it, then sleep 1-3 s."""
    print("name: %s  pid:%s  run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))

if __name__ == "__main__":
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        pool.submit(task, "进程%s" % i).result() # 同步迪奥用,result(),相当于join 串行

    pool.shutdown(wait=True)
    print("主进程")

异步调用

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

from concurrent.futures import ProcessPoolExecutor  # 进程池模块
import os, time, random

def talk(name):
    """Print the task label and the id of the process running it, then sleep 1-3 s."""
    print("name: %s  pid:%s  run" % (name, os.getpid()))
    time.sleep(random.randint(1, 3))

if __name__ == "__main__":
    pool = ProcessPoolExecutor(4)
    for i in range(10):
        pool.submit(talk, "进程%s" % i)  # 异步调用,不需要等待

    pool.shutdown(wait=True)
    print("主进程")

回调机制

可以为进程池或线程池中提交的每个任务绑定一个函数,该函数在任务执行完毕后自动触发,并接收该任务对应的 future 对象作为参数(在回调中调用 future.result() 才能拿到任务的返回值),该函数称为回调函数

# 回调函数(如下例中的 parse)拿到的是一个 future 对象 obj,需要用 obj.result() 拿到结果
用法:
	p.submit(这里异步调用).add_done_callback(方法)

回调例子

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import os
import time
import requests
from concurrent.futures import ThreadPoolExecutor  # 线程池模块

def get(url):
    """Download *url* and return a dict holding the URL and the page text.

    The 3-second sleep after the download simulates extra network latency.
    """
    print(f"GET {url}")
    page = requests.get(url)
    time.sleep(3)
    return dict(url=url, content=page.text)

def parse(res):
    """Done-callback for the futures produced by submitting ``get``.

    Args:
        res: the completed Future. Callbacks receive the Future itself, not
            the task's value; ``res.result()`` returns the dict built by
            ``get`` (and re-raises if ``get`` raised).

    Prints the URL and the length of the page text. ``len()`` of a ``str``
    counts characters, so the figure is labelled "chars" rather than "Kb".
    """
    page = res.result()
    print(f"{page['url']} res is {len(page['content'])} chars")

if __name__ == "__main__":
    urls = {
        "http://www.jd.com",
        "http://www.baidu.com",
        "http://www.cnblogs.com"
    }
    thread = ThreadPoolExecutor(2)
    for i in urls:
        # 【回调函数】执行完线程后,跟一个函数
        thread.submit(get, i).add_done_callback(parse)

废弃代码 - 折叠

线程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import requests
from concurrent.futures import ThreadPoolExecutor

lis = [
    "https://www.baidu.com",
    "https://www.jd.com",
    "https://www.cnblogs.com",
]

def response(url):
    """GET *url* and print the ``url`` attribute of the response; returns None."""
    print(requests.get(url).url)

print("--- 主线程 ---")
executor = ThreadPoolExecutor(max_workers=3)
for url in lis:
    future = executor.submit(response, url)
    restful = future.running()
    print(restful)
print("--- 主线程 | 结束 ---")

"""
打印:
	--- 主线程 ---
		True
		True
		True
	--- 主线程 | 结束 ---
		https://www.jd.com/
		https://www.cnblogs.com/
		https://www.baidu.com/
	[Finished in 1.9s]
"""

进程池

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import time
from concurrent.futures import ProcessPoolExecutor

Maxlis = [
    30000000,
    31000000,
    29000000
]

def count(Maxnum):
    """Busy-loop counting from 0 up to *Maxnum* (a CPU-bound workload).

    Prints the final count and the elapsed seconds, and returns the count.
    The loop condition ``a < Maxnum`` guarantees termination: for
    ``Maxnum <= 0`` the loop body never runs and 0 is returned, whereas
    checking ``a == Maxnum`` after incrementing would never match and
    would spin forever.
    """
    a = 0
    start = time.time()
    while a < Maxnum:
        a += 1
    # %.3f already rounds to 3 decimals; no separate round() needed.
    print(" -> 结果: %s | 耗时 - %.3f" % (a, time.time() - start))
    return a

if __name__ == "__main__":
    # The guard is required for ProcessPoolExecutor: under the "spawn" start
    # method (always on Windows, default on macOS) every worker re-imports
    # this module, and unguarded top-level code would try to build a new
    # pool inside each child process.
    start = time.time()
    print("--- 主线程 ---")
    executor = ProcessPoolExecutor(max_workers=3)
    for num in Maxlis:
        future = executor.submit(count, num)
        # Snapshot taken right after submit(); a task still waiting in the
        # call queue reports False.
        restful = future.running()
        print(restful)
    # Measured before the workers finish, because submit() does not wait.
    print(f"--- 主线程 | - 耗时 {round(time.time() - start, 2)} 结束 ---")

"""
打印:
	--- 主线程 ---
		True
		False
		False
	--- 主线程 | - 耗时 1.92 结束 ---
		 -> 结果: 31000000 | 耗时 - 2.201
		 -> 结果: 30000000 | 耗时 - 2.221
		 -> 结果: 29000000 | 耗时 - 2.012
	[Finished in 5.1s]
"""

submit 实现

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import time
from threading import Lock
from concurrent.futures import ThreadPoolExecutor, as_completed

count = 0
maxsize = 10000000
lock = Lock()

def run():
    """Add ``maxsize`` to the shared global ``count`` and print the new total.

    The whole increment loop runs while holding ``lock``, so concurrent
    callers add their share one after another. The ``with`` statement
    releases the lock even if the loop raises. The print happens while the
    lock is still held, so each caller reports the total it produced rather
    than a value another thread is part-way through updating.
    """
    global count
    with lock:
        for _ in range(maxsize):
            count += 1
        print(count)

def result():
    """Run ``run`` on three pool threads, wait for all of them, print the elapsed time."""
    t0 = time.time()
    with ThreadPoolExecutor(max_workers=3) as executor:
        pending = [executor.submit(run) for _ in range(3)]
        # as_completed() yields futures as they finish; result() re-raises
        # anything run() raised.
        for done in as_completed(pending):
            done.result()
    print(f"总耗时 - {round(time.time() - t0, 2)} 秒")
result()

"""
打印:
    10000000
    20000000
    30000000
    总耗时 - 2.22 秒
    [Done] exited with code=0 in 2.016 seconds
"""

submit 实现-2

# run() 方法跟上边的一样
# submit 调用方法不一样

def test_submit():
    """Submit ``run`` three times, block on shutdown(), print the elapsed time."""
    began = time.time()
    pool = ThreadPoolExecutor(max_workers=3)
    for _ in range(3):
        pool.submit(run)
    # shutdown(wait=True) blocks until all three submissions have finished.
    pool.shutdown(wait=True)
    elapsed = round(time.time() - began, 2)
    print(f"总耗时 - {elapsed} 秒")

test_submit()

"""
打印:
    10000000
    20000000
    30000000
    总耗时 - 2.2 秒
    [Done] exited with code=0 in 2.016 seconds
"""

回调函数

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @File    : $file_name.py
# @Author  : BenLam
# @Link    : https://www.cnblogs.com/BenLam/

import os
import time
import requests
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

class Demo_1(object):
    """Thread-pool demo: request several sites concurrently, with a done-callback.

    ``response`` sleeps 10 s before each request to make the concurrency
    visible. Run one after another, the three requests would take 30 s or
    more; with three worker threads the run takes roughly 10 s plus
    network time.
    """
    def __init__(self):
        # Sites requested by main(), one task per URL.
        self.lis = [
            "https://www.baidu.com",
            "https://www.jd.com",
            "https://www.cnblogs.com",
        ]

    def response(self, url):
        """Sleep 10 s, then GET *url* and return the ``requests`` response."""
        time.sleep(10)
        _res = requests.get(url)
        return _res

    def _restful(self, future, *args, **kwargs):
        """Done-callback: print the current thread's name and the HTTP status code.

        Uses ``threading.current_thread()``; the camelCase alias
        ``currentThread()`` is deprecated since Python 3.10.
        """
        _response = future.result()
        print(f"当前线程名: - {threading.current_thread().name}|状态码: {_response.status_code}")

    def main(self):
        """Submit one ``response`` task per URL with ``_restful`` attached, wait, print total time."""
        print("--- 主线程 ---")
        start_time = time.time()
        executor = ThreadPoolExecutor(max_workers=3)
        for _ in self.lis:
            # The callback fires once the request task finishes.
            executor.submit(self.response, _).add_done_callback(self._restful)
        executor.shutdown(wait=True)
        print(f"--- 结束 | 总耗时: {round(time.time() - start_time, 3)} ---")


class Demo_2(object):
    """Process-pool demo: CPU-bound countdowns with a done-callback.

    ``desc`` is submitted as a bound method, so the instance is pickled
    and sent to the worker processes together with the argument.
    """
    def __init__(self):
        # Starting values for the countdowns, one task per value.
        self.lis = [10000, 20000, 30000]

    def desc(self, num):
        """Sleep 10 s, then count *num* down to 0 and return the final value.

        Returns 0 for any positive *num*; a non-positive *num* is returned
        unchanged.
        """
        time.sleep(10)
        while num > 0:
            num -= 1
        return num

    def _restful(self, future, *args, **kwargs):
        """Done-callback: print the current pid and the task's computed result.

        Future callbacks run in the process that added them (here the
        parent), not in the worker, so ``os.getpid()`` is the parent's pid
        for every task. The value printed is ``desc``'s return value, hence
        the label "计算结果" (computed result) rather than a status code.
        """
        _result = future.result()
        print(f"当前进程: - {os.getpid()}|计算结果: {_result}")

    def main(self):
        """Submit one ``desc`` task per value with ``_restful`` attached, wait, print total time."""
        print("--- 主进程 ---")
        start_time = time.time()
        executor = ProcessPoolExecutor(max_workers=3)
        for _ in self.lis:
            executor.submit(self.desc, _).add_done_callback(self._restful)
        executor.shutdown(wait=True)
        print(f"--- 结束 | 总耗时: {round(time.time() - start_time, 3)} ---")

if __name__ == '__main__':
    # Thread-pool demo. Sample output:
    #     --- 主线程 ---
    #         当前线程名: - ThreadPoolExecutor-0_1|状态码: 200
    #         当前线程名: - ThreadPoolExecutor-0_2|状态码: 200
    #         当前线程名: - ThreadPoolExecutor-0_0|状态码: 200
    #     --- 结束 | 总耗时: 11.962 ---
    #     [Finished in 12.7s]
    #
    # Process-pool demo. Sample output:
    #     --- 主进程 ---
    #     当前进程: - 9792|计算结果: 0
    #     当前进程: - 9792|计算结果: 0
    #     当前进程: - 9792|计算结果: 0
    #     --- 结束 | 总耗时: 12.441 ---
    #     [Finished in 13.2s]
    #
    # Both demos together: [Finished in 24.1s]
    for demo_cls in (Demo_1, Demo_2):
        demo_cls().main()

Executor 死锁

  • ThreadPoolExecutor 是 Executor 的子类,使用线程池来异步执行调用,
  • 如果使用不正确可能会造成死锁,
  • 提交的任务中尽量不要再等待同一个 executor 中其他 future 的结果(例如调用 future.result()),否则很容易出现死锁

两个方法相互等待结束的死锁

from concurrent.futures import ThreadPoolExecutor
def wait_A():
    """Deadlock demo, task A: after 8 s, block on the result of future ``b``.

    ``b`` is the module-level future for ``wait_B``, which in turn blocks
    on ``a`` (this task's future), so neither call ever returns.
    NOTE(review): ``time`` is not imported in this snippet; it is assumed
    to come from an earlier import.
    """
    time.sleep(8)
    print(b.result())  # waits on wait_B's future, which never completes
    return "bbbbbbbbbb"

def wait_B():
    """Deadlock demo, task B: after 9 s, block on the result of future ``a``.

    ``a`` is the module-level future for ``wait_A``, which is itself
    blocked on ``b`` (this task's future), so neither call ever returns.
    NOTE(review): ``time`` is not imported in this snippet; it is assumed
    to come from an earlier import.
    """
    time.sleep(9)
    print(a.result())  # waits on wait_A's future, which never completes
    return "aaaaaaaaaa"

# Tasks submitted to an executor should avoid waiting on that executor's other
# futures; otherwise it is very easy to deadlock.
# With max_workers=2 both tasks start right away, then each waits forever on
# the other's future.
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_A)
b = executor.submit(wait_B)

"""
通过 debug + 打断点,可以直观看到两个线程都在互相等待对方执行完毕
debug -
	ThreadPoolExecutor-0_0 正在运行
	ThreadPoolExecutor-0_1 正在运行
"""

任务内部提交并等待新任务导致的死锁(只有一个工作线程)

def wait_future():
    """Deadlock demo: submit ``abs(-5)`` to ``executor`` and wait for its result.

    ``executor`` is the module-level single-worker pool that this function
    itself runs on, so the inner task can never be scheduled.
    """
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    f = executor.submit(abs, -5)
    print(f.result())

# Only one worker: wait_future occupies it and then waits on a second task
# that can never start, so the program hangs until it is stopped by hand.
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_future)
"""
手动强制停止:
    [Done] exited with code=1 in 25.838 seconds
"""

超时直接抛异常

  • call_with_timeout 是一个自定义的超时调用方法:在规定时间内未完成会抛出 TimeoutError
def call_with_timeout(func, *args, timeout=3):
    """Run ``func(*args)`` in a worker thread and return its result.

    Args:
        func: the callable to run.
        *args: positional arguments forwarded to ``func``.
        timeout: seconds to wait for the result (default 3).

    Returns:
        Whatever ``func(*args)`` returns.

    Raises:
        concurrent.futures.TimeoutError: if ``func`` has not finished within
            ``timeout`` seconds.
        Any exception raised by ``func`` itself.

    The Executor API cannot cancel a call that is already running;
    ``future.cancel()`` only cancels calls that have not started. After a
    timeout, ``shutdown(wait=False)`` returns immediately but the worker
    thread keeps running ``func`` (and the interpreter still waits for it at
    exit). Code that needs to abort a call abruptly has to use something
    other than ThreadPoolExecutor.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        # Forward *args: previously they were accepted but silently dropped.
        future = executor.submit(func, *args)
        return future.result(timeout=timeout)
    finally:
        executor.shutdown(wait=False)

# Expected to raise TimeoutError: wait_A sleeps 8 s, longer than the 3 s timeout.
call_with_timeout(wait_A, timeout=3)
"""
控制台输出:
	Traceback (most recent call last):
	  File "concurrent.futures - submit.py", line 91, in <module>
		call_with_timeout(wait_A)
	  File "concurrent.futures - submit.py", line 80, in call_with_timeout
		result = future.result(timeout=timeout)
	  File "C:python36libconcurrentfutures\_base.py", line 434, in result
		raise TimeoutError()
	concurrent.futures._base.TimeoutError

	[Done] exited with code=1 in 8.67 seconds
"""

map 方法

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Every argument after the function is an *iterable*; map() walks them in
    # lockstep and calls fn(x1, y1, z1), fn(x2, y2, z2), ... Results come back
    # lazily and in input order, so consume the iterator to collect them (and
    # to surface any exception raised inside the function).
    results = list(executor.map(方法名字, 参数序列1, 参数序列2, 参数序列3))
  • 例子
import os
from concurrent.futures import ThreadPoolExecutor

def test(n):
    """Print the id of the current process and the value *n*.

    Note: os.getpid() is the process id, shared by every thread in the pool.
    """
    pid = os.getpid()
    print(f"当前运行线程 - {pid} | - {n} ")

with ThreadPoolExecutor(max_workers=5) as executor:
    # map() returns a lazy iterator; consuming it with list() re-raises any
    # exception from test() instead of silently discarding it. Leaving the
    # with-block already performs shutdown(wait=True), so no explicit call
    # is needed.
    list(executor.map(test, range(10)))
"""
打印:
	当前运行线程 - 5848 | - 0 
	当前运行线程 - 5848 | - 1 
	当前运行线程 - 5848 | - 2 
	当前运行线程 - 5848 | - 3 
	当前运行线程 - 5848 | - 4 
	当前运行线程 - 5848 | - 5 
	当前运行线程 - 5848 | - 6 
	当前运行线程 - 5848 | - 7 
	当前运行线程 - 5848 | - 8 
	当前运行线程 - 5848 | - 9 

	[Done] exited with code=0 in 0.661 seconds
"""
原文地址:https://www.cnblogs.com/BenLam/p/12721420.html