ThreadPoolExecutor多线程异步执行

https://www.cnblogs.com/pdev/p/10685093.html

1. 以下为第一种,函数级的异步执行:

import time
from concurrent.futures import ThreadPoolExecutor


def task1_fn(url):
    time.sleep(10)
    return (url + " FINISHED")


def task2_fn(url):
    time.sleep(3)
    return (url + " FINISHED")


def RunPool():
    pool = ThreadPoolExecutor(1)  # 启动一个线程池
    task1 = pool.submit(task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
    task2 = pool.submit(task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()

        print("task1:%s" % task1.done())
        print("task2:%s" % task2.done())
        if task1.done():
            print('task1 is over')

        if task1.done() and task2.done():
            print('all is over.')
            break
        time.sleep(1)
        cnt += 2

    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())

    print("bye")


if __name__ == '__main__':
    RunPool()

2. 类级函数的的异步执行,添加了线程强制中断 pool.shutdown

import time
from concurrent.futures import ThreadPoolExecutor

class A():
    def __init__(self):
        pass

    def test_ret(self):
        return 'hello world'

    def task1_fn(self, url):
        time.sleep(10)
        return (url + " FINISHED" + self.test_ret())

    def task2_fn(self, url):
        time.sleep(3)
        return (url + " FINISHED" + self.test_ret())


def RunPool():
    a = A()
    pool = ThreadPoolExecutor(2)    # 启动一个线程池
    task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
    task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 

    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()

        print("task1:%s" % task1.done())
        print("task2:%s" % task2.done())
        if task1.done():
            print('task1 is over')
        else:
            task1.cancel()
            pool.shutdown(wait=False)
            print('task1 result AAAAAAAA %s' % task1.result())
            break

        # if task1.done() and task2.done():
        #     print('all is over.')
        #     break
        time.sleep(1)
        cnt += 2

    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())

    print("bye")

if __name__ == '__main__':
      RunPool()

3. 第一个任务一旦完成,则强制终止线程

应用场景:如果某个任务一直处于执行中,无法退出,此时就需要强制退出,而强制退出一般需要重新线程run方法,但 模块 from concurrent.futures import ThreadPoolExecutor 重写run方法较麻烦,故采用另一种方式

wait(f_lst, return_when='FIRST_COMPLETED')
f_lst中存放的是提交到线程池中的线程对象列表
import time
import inspect
import ctypes
from concurrent.futures import ThreadPoolExecutor, wait

class A():
    def __init__(self):
        pass

    def test_ret(self):
        return 'hello world'

    def task1_fn(self, url):
        time.sleep(200)
        return (url + " FINISHED " + self.test_ret() + ' ' + str(var))

    def task2_fn(self, url):
        time.sleep(5)
        return (url + " FINISHED " + self.test_ret())


def RunPool():
    a = A()
    pool = ThreadPoolExecutor(2)  # 启动一个线程池
    task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
    task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 

    f_lst = [task1, task2]
    wait(f_lst, return_when='FIRST_COMPLETED')

    try:
        print('task status, task1: %s, task2: %s' % (task1.done(), task2.done()))
        # print('task1 result:%s' % task1.result())
        print('task2 result:%s' % task2.result())
    except Exception as e:
        print('error: %s' % e)
    print("bye")


if __name__ == '__main__':
    var = 123
    RunPool()

注意:只有先完成的任务才可以使用 task.result() 方法,否则会一直卡着,等待 结果返回,而此时线程已被杀死,无法返回结果

4. 关键点说明:

1) task.cancel()当线程未运行之前可以取消,但是线程在线程池中启动后,无法再通过此方式取消
2) pool.shutdown()函数入参 wait=True表示中断线程前,等待直到线程执行完成才会中断, wait=False 表示 线程未执行完成,强制中断

3)传参方式和位置参数一样,不用使用元组

5. 适用业务:

多进程下的多线程,某个任务可能需要等待很长时间,此时就需要中断此任务,或者去执行其他的任务,如果要求线性处理,中断超时任务,就需要用此方法
针对上述情况下的单个进程,可以使用异步进程池 multiprocessing的apply_async方式,主进程和子进程同时执行,互不影响。

https://www.cnblogs.com/pdev/p/10685093.html
**1. 以下为第一种,函数级的异步执行:**`import timefrom concurrent.futures import ThreadPoolExecutor

import timefrom concurrent.futures import ThreadPoolExecutor
def task1_fn(url):
    time.sleep(10)
    return (url + " FINISHED")

def task2_fn(url):
    time.sleep(3)
    return (url + " FINISHED")

def RunPool():
    pool = ThreadPoolExecutor(1)  # 启动一个线程池
    task1 = pool.submit(task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
    task2 = pool.submit(task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()
    print("task1:%s" % task1.done())
    print("task2:%s" % task2.done())
    if task1.done():
        print('task1 is over')
    if task1.done() and task2.done():
        print('all is over.')
        break
        time.sleep(1)
        cnt += 2
    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())
    print("bye")

if __name__ == '__main__':      
    RunPool()


**2. 类级函数的的异步执行,添加了线程强制中断 pool.shutdown**

import timefrom concurrent.futures import ThreadPoolExecutor
class A():
    def __init__(self):
        pass
    def test_ret(self):
        return 'hello world'
    def task1_fn(self, url):
        time.sleep(10)
        return (url + " FINISHED" + self.test_ret())
    def task2_fn(self, url):
        time.sleep(3)
        return (url + " FINISHED" + self.test_ret())

def RunPool():
    a = A()
    pool = ThreadPoolExecutor(2)    # 启动一个线程池
    task1 = pool.submit(a.task1_fn, "CPU1")  # 在另外的线程中运行RunBenchmark() 
    task2 = pool.submit(a.task2_fn, "CPU2")  # 在另外的线程中运行RunBenchmark() 
    cnt = 0
    while True:
        if cnt == 10:
            task1.cancel()
    print("task1:%s" % task1.done())
    print("task2:%s" % task2.done())
    if task1.done():
        print('task1 is over')
    else:
        task1.cancel()
        pool.shutdown(wait=False)
        print('task1 result AAAAAAAA %s' % task1.result())
        break
    # if task1.done() and task2.done():
    #     print('all is over.')
    #     break
    time.sleep(1)
    cnt += 2
    print('task1 result:%s' % task1.result())
    print('task2 result:%s' % task2.result())
    print("bye")

if __name__ == '__main__':      
    RunPool()

**> 关键点说明:**1. task.cancel()当线程未运行之前可以取消,但是线程在线程池中启动后,无法再通过此方式取消2. pool.shutdown()函数入参 wait=True表示中断线程前,等待直到线程执行完成才会中断, wait=False 表示 线程未执行完成,强制中断
**> 适用业务:**      多进程下的多线程,某个任务可能需要等待很长时间,此时就需要中断此任务,或者去执行其他的任务,如果要求线性处理,中断超时任务,就需要用此方法      针对上述情况下的单个进程,可以使用异步进程池 multiprocessing的apply_async方式,主进程和子进程同时执行,互不影响。

总结:

#shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前
原文地址:https://www.cnblogs.com/zhanghaibin16/p/13322076.html