进程池
如果有多少个任务,就开启多少个进程,实际上并不划算
由于计算机的cpu个数是非常有限的
因此开启的进程数量完全和cpu个数成比例
# 没有时间延迟时
from multiprocessing import Pool
import os
def func(i):
print(i, os.getpid())
if __name__ == "__main__":
p = Pool(4)
for i in range(10):
p.apply_async(func, args=(i,)) # async异步的提交任务
p.close() # 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
p.join() # 阻塞,直到池子中的任务都执行完毕
# 0 7352
# 1 7352
# 2 7352
# 3 7352
# 4 7352
# 5 7352
# 6 7352
# 7 7352
# 8 7352
# 9 7352
# 如果没有时间延迟,那么结果显示只有一个进程池
# 设置时间延迟时
from multiprocessing import Pool
import os
import time
def func(i):
time.sleep(0.1)
print(i, os.getpid())
if __name__ == "__main__":
p = Pool(4) # 一般设置成cpu的个数+1
for i in range(10):
p.apply_async(func, args=(i,)) # async异步的提交任务
p.close() # 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
p.join() # 阻塞,直到池子中的任务都执行完毕
# 0 9364
# 1 2664
# 2 11456
# 3 7312
# 4 9364
# 5 2664
# 6 11456
# 7 7312
# 8 9364
# 9 2664
# 运行结果发现就4个进程池反复进行
# 起多进程的意义
# 1.为了更好的利用cpu,所以程序中都是网络IO或文件IO就不适合用多进程
# 比如多人聊天,如果1000人同时聊天,服务器不可能开启1000个进程池,不然挂掉
# 2.为了数据的隔离,如果程序中总是要数据共享,那么就不适合使用多进程
# 3.超过了cpu个数的任务数,都应该使用进程池来解决问题,而不能无限开启子进程
# 进程池和多进程的性能测试
import os
import time
from multiprocessing import Process, Pool
def func(i):
print(i, os.getpid())
if __name__ == "__main__":
start = time.time()
p_lst = []
for i in range(5):
p = Process(target=func, args=(i,))
p.start()
p_lst.append(p)
for p in p_lst:
p.join()
end = time.time()
pro_time = end - start
start = time.time()
p = Pool(4)
for i in range(5):
p.apply_async(func, args=(i,)) # async异步的提交任务
p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务
p.join() # 阻塞,直到池子中的任务都执行完毕
end = time.time()
pool_time = end - start
print(pro_time, pool_time)
# 0 7280
# 1 12636
# 2 14180
# 3 8772
# 4 4312
# 0 3120
# 1 3120
# 2 3120
# 3 3120
# 4 3120
# 0.37152719497680664 0.458310604095459
# 进程池的其他机制
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(0.1)
print(i,os.getpid())
if __name__ == '__main__':
p = Pool(4) # cpu个数 + 1/cpu的个数
p.map(func,range(5))
# 0 3104
# 1 12028
# 2 7620
# 3 692
# 4 3104
# 因此可以把上一个的for循环换成这个map()
# 进程池里面可以使用get()得到func()的返回值
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(1)
print(i,os.getpid())
return i**i
if __name__ == '__main__':
p = Pool(4) # cpu个数 + 1/cpu的个数
ret_lst = []
for i in range(10):
ret = p.apply_async(func,args=(i,)) # async异步的提交任务
ret_lst.append(ret)
p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务
p.join() # 阻塞,直到池子中的任务都执行完毕
for ret in ret_lst:
print(ret.get())
import os
import time
from multiprocessing import Pool
def func(i):
time.sleep(0.1)
print(i,os.getpid())
return i ** i
if __name__ == '__main__':
p = Pool(4) # cpu个数 + 1/cpu的个数
ret = p.map(func,range(10))
for r in ret:
print(r)
# 进程池的回调函数
import time
import random
from multiprocessing import Process,Pool
def get(i): # 进程池的子进程执行的
time.sleep(random.random())
print('从网页获取一个网页的内容', i)
return i,'网页的内容'*i
def call_back(content): # 主进程执行的
print(content)
if __name__ == '__main__':
# p = Process(target=get)
# p.start()
p = Pool(5)
ret_l = []
# for i in range(10):
# ret = p.apply_async(get,args=(i,))
# ret_l.append(ret)
# for ret in ret_l:
# content = ret.get()
# print(len(content))
for i in range(10):
p.apply_async(get,args=(i,),callback=call_back)
p.close()
p.join()
# 将n个任务交给n个进程去执行
# 每一个进程在执行完毕之后会有一个返回值,这个返回值会直接交给callback参数指定的那个函数去进行处理
# 这样的话 所有的进程 哪一个执行的最快,哪一个就可以先进性统计工作
# 能在最短的时间内得到结果