concurrent.futures模块 -----进程池 ---线程池 ---回调

concurrent.futures模块提供了高度封装的异步调用接口,它内部有关的两个池

ThreadPoolExecutor:线程池,提供异步调用,其基础就是老版的Pool

ProcessPoolExecutor: 进程池,提供异步调用

方法

ProcessPoolExecutor(n):n表示池里面存放多少个进程,之后的连接最大就是n的值

submit(fn,*args,**kwargs) 异步提交任务 map(func, *iterables, timeout=None, chunksize=1) 取代for循环submit的操作 shutdown(wait=True) 相当于进程池的pool.close()+pool.join()操作 wait=True,等待池内所有任务执行完毕回收完资源后才继续,--------》默认 wait=False,立即返回,并不会等待池内的任务执行完毕 但不管wait参数为何值,整个程序都会等到所有任务执行完毕 submit和map必须在shutdown之前
result(timeout
=None) #取得结果 add_done_callback(fn) #回调函数
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random,os

def task(n):
    print('%s is running'% os.getpid())
    time.sleep(random.randint(1,3))
    return n**2
def handle(res):
    res=res.result()
    print("handle res %s"%res)

if __name__ == '__main__':
    # #同步调用
    # pool=ProcessPoolExecutor(8)
    #
    # for i in range(13):
    #     pool.submit(task, i).result() #变成同步调用,串行了,等待结果
    # # pool.shutdown(wait=True) #关门等待所有进程完成
    # pool.shutdown(wait=False)#默认wait就等于True
    # # pool.submit(task,3333) #shutdown后不能使用submit命令
    #
    # print('主')

    #异步调用
    pool=ProcessPoolExecutor(8)
    for i in range(13):
         obj=pool.submit(task,i)
         obj.add_done_callback(handle) #这里用到了回调函数
    pool.shutdown(wait=True) #关门等待所有进程完成
    print('')
ProcessPoolExecutor
#提交任务的两种方式
#同步调用:提交完任务后,就在原地等待,等待任务结束,拿到任务的返回值,才能继续下一行代码,导致程序串行执行
#异步调用+回调机制:提交完任务后,不在原地等待,任务一旦执行完毕就会触发回调函数的执行,程序是并发执行

#同步有可能是计算任务而在等待
#ProcessPoolExcutor基于pool开发的

#进程的执行状态
#阻塞:遇到i/o进入的一种状态,等待
#非阻塞:
进程其他说明
from concurrent.futures import ThreadPoolExecutor
from urllib import request
from threading import current_thread
import time

def get(url):
    print('%s get %s'%(current_thread().getName(),url))
    response=request.urlopen(url)
    time.sleep(2)
    # print(response.read().decode('utf-8'))
    return{'url':url,'content':response.read().decode('utf-8')}

def parse(res):
    res=res.result()
    print('parse:[%s] res:[%s]'%(res['url'],len(res['content'])))

# get('http://www.baidu.com')
if __name__ == '__main__':
    pool=ThreadPoolExecutor(2)

    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',
        'https://www.openstack.org',

    ]

    for url in urls:
        pool.submit(get,url).add_done_callback(parse)
ThreadPoolExecutor
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor

import os,time,random
def task(n):
    print('%s is runing' %os.getpid())
    time.sleep(random.randint(1,3))
    return n**2

if __name__ == '__main__':

    executor=ThreadPoolExecutor(max_workers=3)

    # for i in range(11):
    #     future=executor.submit(task,i)

    executor.map(task,range(1,12)) #map取代了for+submit
map用法

回调 略

原文地址:https://www.cnblogs.com/mmyy-blog/p/9453453.html