异步编程(三)----------线程池、进程池

线程池和进程池在python中代码的编写基本上是一致的,调用 concurrent.futures 模块下的ThreadPoolExecutor,ProcessPoolExecutor。ThreadPoolExecutor是线程模块,ProcessPoolExecutor是进程模块。

下边感受下“速度与激情”:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
def CallPhon(name):
    print("准备给-%s-打电话"%name)
    time.sleep(2)
    print("-%s-通话结束"%name)

if __name__ == '__main__':

    start_time = time.time()
    name_list = ['王栋轩', '黄sir', '军哥', 'teacher婷婷', '刘宇girl']
    pool = ThreadPoolExecutor(max_workers = 3)
    pool.map(CallPhon,name_list)
    pool.shutdown(wait=True)
    end_time = time.time()
    print('time:',end_time-start_time)

大体思路是:通过ThreadPoolExecutor(max_workers = )创建线程池,根据参数max_workers创建几个线程,上边代码中max_workers =3,意味池中有3个线程。pool.map()和python中的map功能基本一致,依次将name_list中的元素作为实参传入CallPhon执行。

输出:

准备给-王栋轩-打电话
准备给-黄sir-打电话
准备给-军哥-打电话
-王栋轩-通话结束
准备给-teacher婷婷-打电话
-军哥-通话结束
准备给-刘宇girl-打电话
-黄sir-通话结束
-teacher婷婷-通话结束
-刘宇girl-通话结束
time: 4.0012288093566895

从输出中看出,开始先执行三个打电话,后边一旦有一个打电话结束,会立即开始给第四个人打电话。打电话结束意味着本次线程工作结束,线程会马上交给另外一个新任务。

最后消耗时间是4.00s。时间上很明显是节省很多了,如果不采用线程池的并发执行,时间需要10s多一点。

再来解析上刚才代码:

1、pool.shutdown(wait=True),功能是等所有线程个工作结束之后,再执行主线程。如果没有这行代码,最后end_time = time.time(),print('time:',end_time-start_time) 两行代码会在线程执行中间执行。相当于在threading模块下的jion()方法。

2、再来说运行时间,用线程池消耗的时间是4.00s,但是如果改为进程池,4.73s,很明显啊,这类计算进程还是略慢点,进程和进程之间切换确实比线程线程之间切换开销大。

3、再仔细看输出,发现输出既有规律也没有规律。没有规律点在于:三个线程同时并发执行,三个线程结束时间或顺序并不固定。从输出来看,“准备给-黄sir-打电话”进程在“准备给-军哥-打电话”前边,但是给军哥电话却比黄sir先结束,也就是说A比B开始的早,A比B结束的晚,这不符合常理啊!!!这就是的异步,谁也不等谁,各自干各自的,哈哈!再来解决这个问题。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
def CallPhon(name):
    print("准备给-%s-打电话"%name)
    time.sleep(2)
    print("-%s-通话结束"%name)

if __name__ == '__main__':

    start_time = time.time()
    name_list = ['王栋轩', '黄sir', '军哥', 'teacher婷婷', '刘宇girl']
    pool = ThreadPoolExecutor(max_workers = 3)
    for i in name_list:
        pool.submit(CallPhon,i)
    pool.shutdown(wait=True)
    end_time = time.time()
    print('time:',end_time-start_time)

输出:

准备给-王栋轩-打电话
准备给-黄sir-打电话
准备给-军哥-打电话
-王栋轩-通话结束
准备给-teacher婷婷-打电话
-军哥-通话结束
准备给-刘宇girl-打电话
-黄sir-通话结束
-teacher婷婷-通话结束
-刘宇girl-通话结束
time: 4.0012288093566895

通过输出,发现刚才那个问题解决了。A一旦比B开始的任务早,A一定是比B早执行完。

那有同学说了,submint()和map()只有这一点区别吗?并不是。map要求每一个任务的函数是一样的,submit不是。map的返回值是一个list,需要用一个列表去接受,submit需要调用result()方法接收。还有一点不是很常用,submit可以执行程序异常之后的情况,map会直接报错。下边上代码演示一下:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
def CallPhon(name):
    print("准备给-%s-打电话"%name)
    time.sleep(2)
    print("-%s-通话结束"%name)
    return name

if __name__ == '__main__':

    start_time = time.time()
    name_list = ['王栋轩', '黄sir', '军哥', 'teacher婷婷', '刘宇girl']
    pool = ThreadPoolExecutor(max_workers = 3)
    list = pool.map(CallPhon,name_list)
    pool.shutdown(wait=True)
    end_time = time.time()
    print('time:',end_time-start_time)
    for _ in list:
        print(_)

输出:

D:anacondapython.exe C:/Users/W/PycharmProjects/test/class/线程池.py
准备给-王栋轩-打电话
准备给-黄sir-打电话
准备给-军哥-打电话
-黄sir-通话结束
准备给-teacher婷婷-打电话-王栋轩-通话结束
-军哥-通话结束
准备给-刘宇girl-打电话

-刘宇girl-通话结束
-teacher婷婷-通话结束
time: 4.0012288093566895
王栋轩
黄sir
军哥
teacher婷婷
刘宇girl

Process finished with exit code 0

最后map返回的列表和任务顺序是一一对应的,并没有乱。

再来看submit:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
def CallPhon(name):
    print("准备给-%s-打电话"%name)
    time.sleep(2)
    print("-%s-通话结束"%name)
    return name

if __name__ == '__main__':
    list = []
    start_time = time.time()
    name_list = ['王栋轩', '黄sir', '军哥', 'teacher婷婷', '刘宇girl']
    pool = ThreadPoolExecutor(max_workers = 3)
    for i in name_list:
        p = pool.submit(CallPhon,i)
        list.append(p.result())
    pool.shutdown(wait=True)
    end_time = time.time()
    print('time:',end_time-start_time)

输出:

准备给-王栋轩-打电话
-王栋轩-通话结束
准备给-黄sir-打电话
-黄sir-通话结束
准备给-军哥-打电话
-军哥-通话结束
准备给-teacher婷婷-打电话
-teacher婷婷-通话结束
准备给-刘宇girl-打电话
-刘宇girl-通话结束
time: 10.000572204589844

确实可以收到返回值,但是问题又来了,好好的并发,变成了串行,凎!!!好像一夜回到解放前。。。如何操作?来看下一步。

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time
def CallPhon(name):
    print("准备给-%s-打电话"%name)
    time.sleep(2)
    print("-%s-通话结束"%name)
    return name
list = []
def recive(res):
    res = res.result()
    list.append(res)

if __name__ == '__main__':
    start_time = time.time()
    name_list = ['王栋轩', '黄sir', '军哥', 'teacher婷婷', '刘宇girl']
    pool = ThreadPoolExecutor(max_workers = 3)
    for i in name_list:
        pool.submit(CallPhon,i).add_done_callback(recive)
    pool.shutdown(wait=True)
    end_time = time.time()
    print('time:',end_time-start_time)
    print(list)

输出:

准备给-王栋轩-打电话
准备给-黄sir-打电话
准备给-军哥-打电话
-黄sir-通话结束-王栋轩-通话结束
准备给-teacher婷婷-打电话

准备给-刘宇girl-打电话
-军哥-通话结束
-teacher婷婷-通话结束
-刘宇girl-通话结束
time: 4.0012288093566895
['王栋轩', '黄sir', '军哥', 'teacher婷婷', '刘宇girl']

完美解决问题。关键步骤是利用了一个回调机制,add_done_callback。回调机制是啥?就是回调函数。回调函数是啥?我个人感觉就是C语言中的,形参是函数名称。或者说和中断一样,先执行回调函数中形参名称的函数,结束后返回中断点继续执行。在代码中不难看出,形参名是recive,recive定义的是将一个对象的result加入到某列表中。这个类就是pool.submit(CallPhon,i)的返回对象Future,在这里边Future传入的过程是看不到了,记住就好。。。。

还有一个 叫做 multiprocessing 的模块,也是可以创建线程池、进程池,原理上和Thread、concurrent.futures基本一致。multiprocessing 实现的功能,在Thread、concurrent.futures也都能实现。。。烦了,不再写代码演示了,反正都差不多。。。

原文地址:https://www.cnblogs.com/lgwdx/p/14289112.html