线程池--进程池--回调函数

什么是池?
在程序开始的时候,还没提交任务先创建几个线程或者进程
放在一个池子里,这就是池

为什么要用池?
如果先开好进程或者线程,那么有任务之后就可以直接使用池中的数据

开好的线程或进程会一直存在池中,可以被多个任务反复利用,即谁先执行完谁先被复用
这样极大的减少了开启、关闭、调度的时间开销
池中的线程或进程个数决定了操作系统需要调度的任务个数,控制池中的单位,
有利于提高操作系统的效率,减轻负担

1.线程池
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from threading import current_thread


def func(a, b):
    print(current_thread().ident, 'start', a, b)
    time.sleep(1)

    return a*b


tp = ThreadPoolExecutor(4)
for i in range(20):  # 提交任务时是异步非阻塞的
    ret = tp.submit(func, i, i+1)  # 函数func的返回值形成的Future对象
    # print(ret.result())  # 在这里打印的话会造成堵塞,必须等待func的返回结果

2.进程池

import time, random
import os
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


# def func(a, b):
#     print(os.getpid(), 'start', a, b)
#     time.sleep(random.randint(1, 4))
#     print(os.getpid(), 'end')
#     return a * b
#
#
# if __name__ == '__main__':
#
#     pp = ProcessPoolExecutor(4)
#     future_ls = []
#     for i in range(20):  # 提交任务时是异步非阻塞的
#         ret = pp.submit(func, i, i+1)  # 函数func的返回值形成的Future对象
#         future_ls.append(ret)
#
#     for i in range(len(future_ls)):  # 获取返回值的时候是同步阻塞的
#         print(i, future_ls[i].result())


# map
# def func(a):
#     b = a + 1
#     print(os.getpid(), 'start', a, b)
#     time.sleep(random.randint(1, 4))
#     print(os.getpid(), 'end')
#     return a * b
#
#
# if __name__ == '__main__':
#
#     pp = ProcessPoolExecutor(4)
#     ret = pp.map(func, range(20))  # 可迭代类型参数
#     for k in ret:  # 同步阻塞的
#         print(k)


# 回调函数:效率最高
def func(a):
    b = a + 1
    print(os.getpid(), 'start', a, b)
    time.sleep(random.randint(1, 4))
    print(os.getpid(), 'end')
    return a * b


def print_func(ret):
    print(ret.result())


if __name__ == '__main__':

    pp = ProcessPoolExecutor(4)
    for i in range(20):
        ret = pp.submit(func, i)
        ret.add_done_callback(print_func)
            # 异步阻塞,回调函数,给ret对象绑定一个回调函数,等待ret对应的任务有了返回结果后
            # 立即调用print_func函数并将ret对象作为参数传入,就可以对结果立即进行处理,而不用按照顺序接收结果再处理结果

3.回调函数的例子

 
from concurrent.futures import ThreadPoolExecutor
import requests
import os


def get_page(url):    # 访问网页,获取网页源代码   线程池中的线程来操作
    print('<进程%s> get %s' %(os.getpid(),url))
    response=requests.get(url, )
    if response.status_code == 200:
        return {'url':url,'text':response.text}


def parse_page(res):   # 获取到字典结果之后,计算网页源码的长度,把https://www.baidu.com : 1929749729写到文件里   线程任务执行完毕之后绑定回调函数
    res=res.result()
    print('<进程%s> parse %s' %(os.getpid(),res['url']))
    parse_res='url:<%s> size:[%s]
' %(res['url'],len(res['text']))
    with open('db.txt','a') as f:
        f.write(parse_res)


if __name__ == '__main__':
    urls=[
        'https://www.baidu.com',
        'https://www.python.org',
        'https://www.openstack.org',
        'http://www.sina.com.cn/'
    ]
    tp = ThreadPoolExecutor(3)  # 获取线程池对象
    for url in urls:  # 循环网站列表,开启任务
        ret = tp.submit(get_page, url)
        ret.add_done_callback(parse_page)  # 开启回掉函数,哪个网页先返回结果,就先执行哪个网页的回调函数


"""
进程池:
    一般用于高计算的场景,没有io(无 文件操作/数据库操作/网络操作/input)
    cpu_count * 1 < 一般进程数 < cpu_count * 2

线程池:
    一般根据 io的比例定制
    线程数一般设置为 cpu_count * 5
    

"""



原文地址:https://www.cnblogs.com/GOD-L/p/13781755.html