进程池与线程池

进程池与线程池

属性介绍

concurrent.futures模块提供了高度封装的异步调用接口
ThreadPoolExecutor:线程池,提供异步调用
ProcessPoolExecutor: 进程池,提供异步调用

基本方法

1、submit(fn, *args, **kwargs)
异步提交任务

2、map(func, *iterables, timeout=None, chunksize=1) 
取代for循环submit的操作

3、shutdown(wait=True) 
相当于进程池的pool.close()+pool.join()操作
wait=True,等待池内所有任务执行完毕回收完资源后才继续
wait=False,立即返回,并不会等待池内的任务执行完毕
但不管wait参数为何值,整个程序都会等到所有任务执行完毕
submit和map必须在shutdown之前

4、result(timeout=None)
取得结果

5、add_done_callback(fn)
回调函数

进程池与线程池用法

示例代码:

# 进程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import random,os,time

def task(name):
    print('name:%s pid:%s' %(name,os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    p = ProcessPoolExecutor(2) # 进程池
    # t = ThreadPoolExecutor(2)# 线程池
    for i in range(10):
        p.submit(task,'egon%s'%i)
        p.shutdown() # 与join相同
    print('主')

map方法

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import random,os,time

def task(name):
    print('name:%s pid:%s' %(name,os.getpid()))
    time.sleep(random.randint(1,3))


if __name__ == '__main__':
    p = ProcessPoolExecutor(2) # 进程池
    # t = ThreadPoolExecutor(2)# 线程池
    # for i in range(10):
    #     p.submit(task,'egon%s'%i)
    p.map(task,range(1,12)) #map取代了for+submit
    p.shutdown()
    print('主')

回调函数

可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数

提交任务方式有两种:同步调用与异步调用

同步调用:提交完任务后,就在原地等待任务执行完毕,拿到结果,再执行下一行代码,导致程序是串行执行

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' %name)
    time.sleep(random.randint(3,5))
    res=random.randint(7,13)*'#'
    return {'name':name,'res':res}

def weigh(shit):
    name=shit['name']
    size=len(shit['res'])
    print('%s 拉了 《%s》kg' %(name,size))

if __name__ == '__main__':
    pool=ThreadPoolExecutor(13)

    shit1=pool.submit(la,'alex').result()
    weigh(shit1)
   shit2=pool.submit(la,'wupeiqi').result()
    weigh(shit2)
    shit3=pool.submit(la,'yuanhao').result()
    weigh(shit3)

异步调用:提交完任务后,不地等待任务执行完毕

from concurrent.futures import ThreadPoolExecutor
import time
import random

def la(name):
    print('%s is laing' %name)
    time.sleep(random.randint(3,5))
    res=random.randint(7,13)*'#'
    return {'name':name,'res':res}


def weigh(shit):
    shit=shit.result()
    name=shit['name']
    size=len(shit['res'])
    print('%s 拉了 《%s》kg' %(name,size))


if __name__ == '__main__':
    pool=ThreadPoolExecutor(13)

    pool.submit(la,'alex').add_done_callback(weigh)

    pool.submit(la,'wupeiqi').add_done_callback(weigh)

    pool.submit(la,'yuanhao').add_done_callback(weigh)

进程池与线程池练习

from concurrent.futures import ThreadPoolExecutor
import time
import requests

def get(url):
    print('url:%s'%url)
    res = requests.get(url)
    time.sleep(3)
    return {'url':url,'content':res}


def pares(res):
    res = res.result()
    print('%s pares res is %s'%(res['url'],res['content']))

if __name__ == '__main__':
    urls = [
        'http://www.cnblogs.com/linhaifeng',
        'https://www.python.org',
        'https://www.openstack.org',
        'http://www.baidu.com'
    ]
    pool = ThreadPoolExecutor(2)
    for url in urls:
        pool.submit(get,url).add_done_callback(pares)
原文地址:https://www.cnblogs.com/yjiu1990/p/9263267.html