chapter11、4concurrent.future模块

concurrent 包  future 模块

异步并行任务编程模块,提供了一个高级的异步可执行的便利接口。

提供了两个池执行器
 

ThreadPoolExectutor

ThreadPoolExectutor(max_works=1)        池中至多创建max_workers个线程来同时异步执行,返回Executor实例。

submit(fn,*args,*kwargs)        提交执行函数级器参数,返回Future类的实例

shutdown(wait=Ture)       清理池

方法:

done()        如果调用被成功的取消或者执行完成,返回True

cancelled()        如果调用被成功的取消,返回True

running()        如果正在运行,且不能被取消,返回True

cancel()        尝试取消调用,如果已经执行且不能被取消,返回False,否则返回True

result(timeout=None)        取返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常

exception(timeout=None)        取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError 异常

 
import threading
from concurrent import futures
import logging
import time

FORMAT = '%(asctime)-15s	 [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))

executor = futures.ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(3):
    future = executor.submit(worker, i)
    fs.append(future)

while True:
    time.sleep(2)
    logging.info(threading.enumerate())

    flag = True
    for f in fs:
        logging.info(f.down())
        flag = flag and f.done()
    print("__" * 30)
    if flag:
        executor.shutdown()
        logging.info(threading.enumerate())
        break

import threading
from concurrent import futures
import logging
import time

FORMAT = '%(asctime)-15s	 [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))

executor = futures.ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(3):
    future = executor.submit(worker, i)

while True:
    time.sleep(2)
    logging.info(threading.enumerate())

    flag = True
    for f in fs:
        logging.info(f.down())
        flag = flag and f.done()
    print("__" * 30)
    if flag:
        executor.shutdown()
        logging.info(threading.enumerate())
        break
import threading
from concurrent import futures
import logging
import time

FORMAT = '%(asctime)-15s	 [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))

executor = futures.ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(3):
    future = executor.submit(worker, i)

while True:
    time.sleep(2)
    logging.info(threading.enumerate())

    flag = True
    for f in fs:
        logging.info(f.done())
        flag = flag and f.done()
    print("__" * 30)
    if flag:
        executor.shutdown()
        logging.info(threading.enumerate())
        break

线程一旦创建就不要轻易删除

ProcessPoolExecutor对象

方法和多线程一样

 
import threading
from concurrent import futures
import logging
import time
#输出格式定义
FORMAT = '%(asctime)-15s	 [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))

if __name__ == '__main__':
    #进程池,容量3
    executor = futures.ProcessPoolExecutor(max_workers=3)
    fs = []
    for i in range(3):
        future = executor.submit(worker, i)
        fs.append(future)

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())

        flag = True
        for f in fs:
            logging.info(f.done)
            flag = flag and f.done()
        print("__" * 30)
        if flag:
            executor.shutdown()
            logging.info(threading.enumerate())
            break

支持上下文管理

concurrent.futures.ProcessPoolExecutor继承自concurrent.futures._base.Executor,而父类有__enter__ 、__exit__方法,支持上下文管理。可以使用with语句。

__exit__方法本质还是调用的shutdown(wait=True),就是一直阻塞到所有运行的任务完成

 
import threading
from concurrent import futures
import logging
import time
#输出格式定义
FORMAT = '%(asctime)-15s	 [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))
    return n + 100

if __name__ == '__main__':
    #进程池,容量3
    executor = futures.ProcessPoolExecutor(max_workers=3)
    with executor:
        fs = []
        for i in range(3):
            future = executor.submit(worker, i)
            fs.append(future)

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())

            flag = True
            for f in fs:#判断是否还有未完成任务
                logging.info(f.done)
                flag = flag and f.done()
                if f.done():#如果做完,查看结果
                    logging.info("result = {}".format(f.result()))
            print("__" * 30)
            if flag:
               
                break
#executor.shutdown() # 上下文清理资源,不需要再关闭
logging.info(threading.enumerate())

是Python的简单思想的哲学的体现

池约束使用量,复用,不会无限制扩张

concurrent就是为了解决高并发,虽然无法设置线程的名称,但影响不大。

原文地址:https://www.cnblogs.com/rprp789/p/9900657.html