进程池、线程池、异步调用(取返回值)

进程池vs线程池

为什么要用“池”:
池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务

池子内什么时候装进程:并发的任务属于计算密集型
池子内什么时候装线程:并发的任务属于IO密集型
concurrent:并发的,一致的,同时发生的  Executor执行者 

'''
#1、阻塞与非阻塞指的是程序的两种运行状态
阻塞:遇到IO就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源
非阻塞(就绪态或运行态):没有遇到IO操作,或者通过某种手段让程序即便是遇到IO操作也不会停在原地,执行其他操作,力求尽可能多的占有CPU


#2、同步与异步指的是提交任务的两种方式:
同步调用:提交完任务后,就在原地等待,直到任务运行完毕后,拿到任务的返回值,才继续执行下一行代码
异步调用:提交完任务后,不在原地等待,直接执行下一行代码,等到任务有返回值后自动触发后调函数

'''
 
#进程池
from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import time,os,random

def task(x):
    print('%s 接客' %os.getpid())
    time.sleep(random.randint(2,5))
    return x**2


if __name__ == '__main__':
    # p=ProcessPoolExecutor(3) # 不指定默认开启的进程数是cpu的核数
    # print(os.cpu_count())查看CPU的核数
    p=ProcessPoolExecutor(max_workers=2) # 默认开启的进程数是cpu的核数,也就是干活的人数

    #简写成for循环模拟6个任务执行
    for i in range(6):
        p.submit(task,i)  #submit只是负责往池子里面丢任务,刚开始2个人同时接客


#1.submit往里面丢任务干活的人就是你指定的ProcessPoolExecutor(2)后面的人数
#2.刚一造个进程池,立马给你预备2个人干活,无论你丢多少任务,干活的永远是那2个人(谁先完了腾出手再接下一个)
#线程池
def task(x):
    print('%s is running'%x)
    time.sleep(random.randint(2, 5))
    # return x**2

if __name__ == '__main__':
    # p=ProcessPoolExecutor(max_workers=2)
    p=ThreadPoolExecutor(20)   #默认不写开启的是CPU核数*5

    for i in range(25):
        p.submit(task,i)

result功能(可以通过每次线程或者进程对象.result(),拿到返回值)

#线程池(result功能)
def task(x):
    print('%s is running'%x)
    time.sleep(random.randint(2, 5))
    return x**2

if __name__ == '__main__':
    # p=ProcessPoolExecutor(max_workers=2)
    p=ThreadPoolExecutor(3)   #默认不写开启的是CPU核数*5

    for i in range(6):
        obj=p.submit(task,i)
        print(obj.result())     #重点:每个线程打印 is running后,obj.result()都回去等着拿return x**2的返回值,所以输出结果会是串行的输出

    print('')

'''结果:因为每次
1 is running
1
2 is running
4
3 is running
9
4 is running
16
5 is running
25
主


'''

 线程池、进程池下异步调用分析:

版本1:

#线程池下分析异步调用(取返回值),分析版本1
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,os,random
#线程池为例:
def task(x):
    print('%s is running'%x)
    time.sleep(random.randint(2, 5))
    return x**2

if __name__ == '__main__':
    异步调用:submit只负责提交任务,不会等待拿结果不会拿return
    p=ThreadPoolExecutor(4)
    obj_l=[]
    for i in range(10):
        obj=p.submit(task,i)   #每个返回obj对象里面,其实都封装了一个result方法
        # print(obj.result()) #可以通过result拿到每次return的返回值
        obj_l.append(obj)
        # print(obj_l)
    # p.close()
    # p.join() #
    p.shutdown(wait=True)  #关闭进程池入口 等着取一个数目就减1

    #实现异步调用拿结果:如何取结果:
    print(obj_l[0].result())  #第一个任务的结果 即return 0**2
    print(obj_l[1].result())  #第二个:1**2
    print('')

    同步调用:每次都拿到返回结果
    p=ThreadPoolExecutor(4)
    for i in range(10):
        obj=p.submit(task,i)
        print(obj.result())   #每次等待返回值后再执行下一个
    print('')

版本2:

#线程池下分析异步调用(取返回值),分析版本2
def task(x):
    print('%s is running' % x)
    time.sleep(10)
    return x ** 2

def parse(res):
    print('.......')

if __name__ == '__main__':
    # 异步调用:
    p=ThreadPoolExecutor(4)
    obj_l=[]
    for i in range(10):
        obj=p.submit(task,i)
        obj_l.append(obj)

    #实现目的:为了保证进程池里面任务全部运行完,后再统一拿返回值
    p.shutdown(wait=True)  #1.关闭进程池入口(确保统计时进程池不会再有数据进入),等着取一个数目就减1
    # print(obj_l)  #第一个任务的结果 即return 0**2
    for future in obj_l:
        # print(future.result())
        parse(future.result())  #4个任务一次10s + 4个任务(每个1s)=共14s
    print('')

最终版本(引用回调函数)

#线程池、进程池异步调用取返回值最终版(引用回调函数add_done_callback())
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time, os, random
import os

def task(x):
    print('%s is running' %x)
    # time.sleep(1)
    return x ** 2
    # parse(x**2)  #产生结果后立马被处理

def parse(future):
    # time.sleep(1)
    res=future.result()
    print('%s 处理了 %s'%(os.getpid(),res))  #统一是主进程的一个Pid
if __name__ == '__main__':
    poo1=ProcessPoolExecutor(4)  #进程为9s左右
    # poo1=ThreadPoolExecutor(4)   #线程为6s左右,线程效率更高
    start=time.time()
    for i in range(1,5):
        future=poo1.submit(task,i)
        #回调函数就是有结果之后立马执行结果,回调是主进程去处理运行结果,所以parse函数os.getpid()pid都是主进程一个PID
        #提交完之后给对象绑定了回调函数,parse会在future有返回值是立即触发,并且将future当做参数传给parse
        future.add_done_callback(parse) #绑定回调函数
    poo1.shutdown(wait=True) #关闭进程池入口,确保进程池里面不会再有数据进入
    stop=time.time()

    print('',(stop-start))

'''
总结:根据以上执行
    回调函数:
        1.在线程池环境下,处理回调函数是线程池里面的运行的,是线程池里面的线程去执行的
        (还是I/O密集型)
        
        2.在进程池环境下,处理回调函数是主进程去处理的
        
        
'''

线程池与进程池里面没有join()这个方法,

原文地址:https://www.cnblogs.com/yangzhizong/p/9321369.html