进程池、进程池和多进程的性能测试、进程池的其他机制、进程池的回调函数

进程池

如果有多少个任务,就开启多少个进程,实际上并不划算
由于计算机的cpu个数是非常有限的
因此开启的进程数量完全和cpu个数成比例

# 没有时间延迟时

from multiprocessing import Pool
import os

def func(i):
    print(i, os.getpid())

if __name__ == "__main__":
    p = Pool(4)
    for i in range(10):
        p.apply_async(func, args=(i,))  # async异步的提交任务
    p.close()  # 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
    p.join()  # 阻塞,直到池子中的任务都执行完毕

# 0 7352
# 1 7352
# 2 7352
# 3 7352
# 4 7352
# 5 7352
# 6 7352
# 7 7352
# 8 7352
# 9 7352

# 如果没有时间延迟,那么结果显示只有一个进程池


# 设置时间延迟时

from multiprocessing import Pool
import os
import time

def func(i):
    time.sleep(0.1)
    print(i, os.getpid())

if __name__ == "__main__":
    p = Pool(4)  # 一般设置成cpu的个数+1
    for i in range(10):
        p.apply_async(func, args=(i,))  # async异步的提交任务
    p.close()  # 关闭池子,不是回收池子中的进程,而是阻止继续提交任务
    p.join()  # 阻塞,直到池子中的任务都执行完毕

# 0 9364
# 1 2664
# 2 11456
# 3 7312
# 4 9364
# 5 2664
# 6 11456
# 7 7312
# 8 9364
# 9 2664

# 运行结果发现就4个进程池反复进行
# 起多进程的意义
#   1.为了更好的利用cpu,所以程序中都是网络IO或文件IO就不适合用多进程
#       比如多人聊天,如果1000人同时聊天,服务器不可能开启1000个进程池,不然挂掉
#   2.为了数据的隔离,如果程序中总是要数据共享,那么就不适合使用多进程
#   3.超过了cpu个数的任务数,都应该使用进程池来解决问题,而不能无限开启子进程

# 进程池和多进程的性能测试

import os
import time
from multiprocessing import Process, Pool

def func(i):
    print(i, os.getpid())

if __name__ == "__main__":
    start = time.time()
    p_lst = []
    for i in range(5):
        p = Process(target=func, args=(i,))
        p.start()
        p_lst.append(p)
    for p in p_lst:
        p.join()
    end = time.time()
    pro_time = end - start
    start = time.time()
    p = Pool(4)
    for i in range(5):
        p.apply_async(func, args=(i,))  # async异步的提交任务
    p.close()  # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务
    p.join()  # 阻塞,直到池子中的任务都执行完毕
    end = time.time()
    pool_time = end - start
    print(pro_time, pool_time)

# 0 7280
# 1 12636
# 2 14180
# 3 8772
# 4 4312
# 0 3120
# 1 3120
# 2 3120
# 3 3120
# 4 3120
# 0.37152719497680664 0.458310604095459
# 进程池的其他机制

import os
import time
from multiprocessing import Pool

def func(i):
    time.sleep(0.1)
    print(i,os.getpid())

if __name__ == '__main__':
    p = Pool(4)   # cpu个数 + 1/cpu的个数
    p.map(func,range(5))

# 0 3104
# 1 12028
# 2 7620
# 3 692
# 4 3104
# 因此可以把上一个的for循环换成这个map()



# 进程池里面可以使用get()得到func()的返回值 import os import time from multiprocessing import Pool def func(i): time.sleep(1) print(i,os.getpid()) return i**i if __name__ == '__main__': p = Pool(4) # cpu个数 + 1/cpu的个数 ret_lst = [] for i in range(10): ret = p.apply_async(func,args=(i,)) # async异步的提交任务 ret_lst.append(ret) p.close() # 关闭池子,不是要回收池子中的进程,而是阻止继续向池子中提交任务 p.join() # 阻塞,直到池子中的任务都执行完毕 for ret in ret_lst: print(ret.get()) import os import time from multiprocessing import Pool def func(i): time.sleep(0.1) print(i,os.getpid()) return i ** i if __name__ == '__main__': p = Pool(4) # cpu个数 + 1/cpu的个数 ret = p.map(func,range(10)) for r in ret: print(r)
# 进程池的回调函数

import time
import random
from multiprocessing import Process,Pool
def get(i):    # 进程池的子进程执行的
    time.sleep(random.random())
    print('从网页获取一个网页的内容', i)
    return i,'网页的内容'*i

def call_back(content):   # 主进程执行的
    print(content)

if __name__ == '__main__':
    # p = Process(target=get)
    # p.start()
    p = Pool(5)
    ret_l = []
    # for i in range(10):
    #     ret = p.apply_async(get,args=(i,))
    #     ret_l.append(ret)
    # for ret in ret_l:
    #     content = ret.get()
    #     print(len(content))
    for i in range(10):
        p.apply_async(get,args=(i,),callback=call_back)
    p.close()
    p.join()


# 将n个任务交给n个进程去执行
# 每一个进程在执行完毕之后会有一个返回值,这个返回值会直接交给callback参数指定的那个函数去进行处理
# 这样的话 所有的进程 哪一个执行的最快,哪一个就可以先进性统计工作
# 能在最短的时间内得到结果
原文地址:https://www.cnblogs.com/shawnhuang/p/10323728.html