进程池vs线程池
为什么要用“池”:
池子使用来限制并发的任务数目,限制我们的计算机在一个自己可承受的范围内去并发地执行任务
池子内什么时候装进程:并发的任务属于计算密集型
池子内什么时候装线程:并发的任务属于IO密集型
(
concurrent:并发的,一致的,同时发生的 Executor执行者
)
'''
#1、阻塞与非阻塞指的是程序的两种运行状态
阻塞:遇到IO就发生阻塞,程序一旦遇到阻塞操作就会停在原地,并且立刻释放CPU资源
非阻塞(就绪态或运行态):没有遇到IO操作,或者通过某种手段让程序即便是遇到IO操作也不会停在原地,执行其他操作,力求尽可能多的占有CPU
#2、同步与异步指的是提交任务的两种方式:
同步调用:提交完任务后,就在原地等待,直到任务运行完毕后,拿到任务的返回值,才继续执行下一行代码
异步调用:提交完任务后,不在原地等待,直接执行下一行代码,等到任务有返回值后自动触发后调函数
'''
#进程池 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor import time,os,random def task(x): print('%s 接客' %os.getpid()) time.sleep(random.randint(2,5)) return x**2 if __name__ == '__main__': # p=ProcessPoolExecutor(3) # 不指定默认开启的进程数是cpu的核数 # print(os.cpu_count())查看CPU的核数 p=ProcessPoolExecutor(max_workers=2) # 默认开启的进程数是cpu的核数,也就是干活的人数 #简写成for循环模拟6个任务执行 for i in range(6): p.submit(task,i) #submit只是负责往池子里面丢任务,刚开始2个人同时接客 #1.submit往里面丢任务干活的人就是你指定的ProcessPoolExecutor(2)后面的人数 #2.刚一造个进程池,立马给你预备2个人干活,无论你丢多少任务,干活的永远是那2个人(谁先完了腾出手再接下一个)
#线程池 def task(x): print('%s is running'%x) time.sleep(random.randint(2, 5)) # return x**2 if __name__ == '__main__': # p=ProcessPoolExecutor(max_workers=2) p=ThreadPoolExecutor(20) #默认不写开启的是CPU核数*5 for i in range(25): p.submit(task,i)
result功能(可以通过每次线程或者进程对象.result(),拿到返回值)
#线程池(result功能) def task(x): print('%s is running'%x) time.sleep(random.randint(2, 5)) return x**2 if __name__ == '__main__': # p=ProcessPoolExecutor(max_workers=2) p=ThreadPoolExecutor(3) #默认不写开启的是CPU核数*5 for i in range(6): obj=p.submit(task,i) print(obj.result()) #重点:每个线程打印 is running后,obj.result()都回去等着拿return x**2的返回值,所以输出结果会是串行的输出 print('主') '''结果:因为每次 1 is running 1 2 is running 4 3 is running 9 4 is running 16 5 is running 25 主 '''
线程池、进程池下异步调用分析:
版本1:
#线程池下分析异步调用(取返回值),分析版本1 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time,os,random #线程池为例: def task(x): print('%s is running'%x) time.sleep(random.randint(2, 5)) return x**2 if __name__ == '__main__': 异步调用:submit只负责提交任务,不会等待拿结果不会拿return p=ThreadPoolExecutor(4) obj_l=[] for i in range(10): obj=p.submit(task,i) #每个返回obj对象里面,其实都封装了一个result方法 # print(obj.result()) #可以通过result拿到每次return的返回值 obj_l.append(obj) # print(obj_l) # p.close() # p.join() # p.shutdown(wait=True) #关闭进程池入口 等着取一个数目就减1 #实现异步调用拿结果:如何取结果: print(obj_l[0].result()) #第一个任务的结果 即return 0**2 print(obj_l[1].result()) #第二个:1**2 print('主') 同步调用:每次都拿到返回结果 p=ThreadPoolExecutor(4) for i in range(10): obj=p.submit(task,i) print(obj.result()) #每次等待返回值后再执行下一个 print('主')
版本2:
#线程池下分析异步调用(取返回值),分析版本2 def task(x): print('%s is running' % x) time.sleep(10) return x ** 2 def parse(res): print('.......') if __name__ == '__main__': # 异步调用: p=ThreadPoolExecutor(4) obj_l=[] for i in range(10): obj=p.submit(task,i) obj_l.append(obj) #实现目的:为了保证进程池里面任务全部运行完,后再统一拿返回值 p.shutdown(wait=True) #1.关闭进程池入口(确保统计时进程池不会再有数据进入),等着取一个数目就减1 # print(obj_l) #第一个任务的结果 即return 0**2 for future in obj_l: # print(future.result()) parse(future.result()) #4个任务一次10s + 4个任务(每个1s)=共14s print('主')
最终版本(引用回调函数)
#线程池、进程池异步调用取返回值最终版(引用回调函数add_done_callback()) from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor import time, os, random import os def task(x): print('%s is running' %x) # time.sleep(1) return x ** 2 # parse(x**2) #产生结果后立马被处理 def parse(future): # time.sleep(1) res=future.result() print('%s 处理了 %s'%(os.getpid(),res)) #统一是主进程的一个Pid if __name__ == '__main__': poo1=ProcessPoolExecutor(4) #进程为9s左右 # poo1=ThreadPoolExecutor(4) #线程为6s左右,线程效率更高 start=time.time() for i in range(1,5): future=poo1.submit(task,i) #回调函数就是有结果之后立马执行结果,回调是主进程去处理运行结果,所以parse函数os.getpid()pid都是主进程一个PID #提交完之后给对象绑定了回调函数,parse会在future有返回值是立即触发,并且将future当做参数传给parse future.add_done_callback(parse) #绑定回调函数 poo1.shutdown(wait=True) #关闭进程池入口,确保进程池里面不会再有数据进入 stop=time.time() print('主',(stop-start)) ''' 总结:根据以上执行 回调函数: 1.在线程池环境下,处理回调函数是线程池里面的运行的,是线程池里面的线程去执行的 (还是I/O密集型) 2.在进程池环境下,处理回调函数是主进程去处理的 '''
线程池与进程池里面没有join()这个方法,