进程池

进程池:

1.进程池初识,2.效率比较,3.同步和异步,4.进程池的返回值和回调函数,5.socket并发的服务端


为什么要有进程池?进程池的概念。

在程序实际处理问题过程中,忙时会有成千上万的任务需要被执行,闲时可能只有零星任务。那么在成千上万个任务需要被执行的时候,我们就需要去创建成千上万个进程么?首先,创建进程需要消耗时间,销毁进程也需要消耗时间。第二即便开启了成千上万的进程,操作系统也不能让他们同时执行,这样反而会影响程序的效率。因此我们不能无限制的根据任务开启或者结束进程。那么我们要怎么做呢?

在这里,要给大家介绍一个进程池的概念,定义一个池子,在里面放上固定数量的进程,有需求来了,就拿一个池中的进程来处理任务,等到处理完毕,进程并不关闭,而是将进程再放回进程池中继续等待任务。如果有很多任务需要执行,池中的进程数量不够,任务就要等待之前的进程执行任务完毕归来,拿到空闲进程才能继续执行。也就是说,池中进程的数量是固定的,那么同一时间最多有固定数量的进程在运行。这样不会增加操作系统的调度难度,还节省了开闭进程的时间,也一定程度上能够实现并发效果。

multiprocess.Pool模块:

Pool([numprocess  [,initializer [, initargs]]]):创建进程池
1 numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
2 initializer:是每个工作进程启动时要执行的可调用对象,默认为None
3 initargs:是要传给initializer的参数组
参数介绍
p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
'''需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()'''

p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。
'''此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。'''
   
p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成

P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

主要方法
主要方法
方法apply_async()和map_async()的返回值是AsyncResul的实例obj。实例具有以下方法
obj.get():返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready():如果调用完成,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout]):等待结果变为可用。
obj.terminate():立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
其他方法(了解)

进程池初识:

#进程池
#开辟一个空间,固定放着固定数目的进程,比如5个。现在有50个任务,每次任务一执行,就去池子里找这5个进程,一次执行五个任务,
#执行完的任务,把进程放回进程池,供其他任务继续使用。从而大大减少了进程的开启和销毁的内存占用,提高了效率

#更高级的进程池  #···python里没有
#假设 进程池最少的时候有n个进程,最多是m个
#当我们访问网站,用户量多的时候,进程池的进程,就会根据算法,慢慢增加,增加到m个
#当访问量,或者任务下降,进程池就会 减减减  减到最少 n个


#信号量:是同一段代码,只能同一时间由几个进程执行,但是这些进程数都是要悉数开启和销毁的,如果有200个进程,这些进程都会占用内存
#进程池:进程都是固定的,不用每次都开启和销毁

进程池和普通多进程的效率比较:

from multiprocessing import Pool,Process
import time

def func(n):
    for i in range(10):
        print(n+1)

if __name__ == '__main__':
    pool = Pool(5) #一般进程数是 cpu核数+1
    start = time.time()
    pool.map(func,range(100)) #开启了一百个任务,map是自带join,close的
    #map的功能最多就只能这样了,如果想传更多参数,可以iterable里可以传tuple,list..
    t1 = time.time() - start

    start2 = time.time()
    p_lst = []
    for i in range(100):
        p = Process(target=func,args=(i,))
        p.start()
        p_lst.append(p)
    [p.join() for p in p_lst]
    t2 = time.time() - start2
    print(t1,t2) # 0.19 VS  2.61 进程池完胜
进程池和多进程-效率比较

同步: apply   异步: apply_async(func, args=(args1,args2,...),callback=func2):

import os
import time
from multiprocessing import Pool,Process

def func(n):
    print('%s 开始了~~~'%n,os.getpid())
    time.sleep(1)
    print('%s 结束了'%n,os.getpid())

if __name__ == '__main__':
    pool = Pool(5)
    for i in range(10):
        # pool.apply(func,args=(i,))  #apply 进程池的同步调用(自然不需要close,join),要等待进程中的任务完结,才会执行下一个任务.基本上不会用
        pool.apply_async(func,args=(i,))#apply_async异步,  一但有任务结束了,它所使用的进程(pid)马上会被其他任务使用
                                        #需要close和join,不然会导致主代码的结束,从而结束进程中的任务
    pool.close() #结束进程池接收任务
    pool.join()  #感知所有任务的结束,然后在执行主程序接下来的代码
同步和异步-进程池

#对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC 进程间通信:队列(子进程put,主进程get),管道
#但是进程池 是可以直接接收子进程的返回值的
#apply的结果就是func的返回值

#通过get方法,获取 apply_async的返回值

#对于Proces开启的进程,是不能接收子进程的返回值的,想获取,只能通过 IPC  进程间通信:队列(子进程put,主进程get),管道
#但是进程池 是可以直接接收子进程的返回值的
#apply的结果就是func的返回值

from multiprocessing import Pool
import time

def func(n):
    time.sleep(0.2)
    return n*n

if __name__ == '__main__':
    pool = Pool(5)
    ret_lst=[]
    for i in range(10):
        # ret = pool.apply(func, args=(i,))
        ret = pool.apply_async(func,args=(i,))
        #print(ret.get()) #get会阻塞着,直到拿到ret的结果。每个ret马上跟着一个get,所以会按顺序一个个print
                        #一旦异步提交了一个任务,返回一个对象,这个对象获得一个get方法,获取这个任务的返回值,所以get可以替代close和join
        ret_lst.append(ret)
    for ret in ret_lst:print(ret.get())
    # pool.close()
    # pool.join()

    # ret = pool.map(func,range(10))
    # print(ret)  #[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
    #因为map自带close和join,它会把所有的值都计算好了,再打印出来。
    #所以虽然map简单,但是apply_async可以实时提交,还是要用后者
进程池的返回值

回调函数不能传参数,唯一的参数就是注册函数的返回值

回调函数
import os
from multiprocessing import Pool
def func1(n):
    print('in func1',os.getpid())
    return n*n

def func2(nn):
    print('in func2',os.getpid())
    print(nn)

if __name__ == '__main__':
    print('主进程 :',os.getpid())
    p = Pool(5)
    for i in range(10):
        p.apply_async(func1,args=(10,),callback=func2) #回调函数不能传参数,唯一的参数就是注册函数的返回值
    p.close()                                          #回调函数是在主进程里执行的
    p.join()
进程池的回调函数
import requests
from multiprocessing import Pool

def get(url):  #让并发进程去获取url,把网络延时交给并发,把数据处理返回给主进程
    res = requests.get(url)
    if res.status_code == 200:
        return url,res.content.decode('utf8')

def call_back(args):
    url,content = args
    print(url,len(content))

if __name__ == '__main__':
    url_lst = [
        'https://www.cnblogs.com/',
        'http://www.baidu.com',
        'https://www.sogou.com/',
        'http://www.sohu.com/',
    ]
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get,args=(url,),callback=call_back)
    p.close()
    p.join()
小爬虫-回调函数
url = r'https://movie.douban.com/subject/26281899/?from=subject-page'
import requests
res = requests.get(url)
print(res) #返回为一个 <Response [200]>
print(res.status_code) #状态码:200
print(res.content)      #二进制的网页源码 (无格式版的)
print(res.text)         #网页源码

# from urllib.request import urlopen
# ret = urlopen(url)
# print(ret.read().decode('utf8')) #获得一个跟原网页源码一样格式的内容
requests模块
    # apply
        # 同步的:只有当func执行完之后,才会继续向下执行其他代码
        # ret = apply(func,args=())
        # 返回值就是func的return
    # apply_async
        # 异步的:当func被注册进入一个进程之后,程序就继续向下执行
        # apply_async(func,args=())
        # 返回值 : apply_async返回的对象obj
        #          为了用户能从中获取func的返回值obj.get()
        # get会阻塞直到对应的func执行完毕拿到结果
        # 使用apply_async给进程池分配任务,
        # 需要先close后join来保持多进程和主进程代码的同步性
复习

使用进程池来实现socket服务端的并发:

import socket
from multiprocessing import Pool

def server(conn):
    conn.send(b'hi')
    ret = conn.recv(1024).decode('utf8')
    print(ret)
    conn.send(b'hello')
    conn.close()

if __name__ == '__main__':
    pool = Pool(5)
    sk = socket.socket()
    sk.bind(('127.0.0.1',8080))
    sk.listen()
    while True:
        conn,addr = sk.accept()
        pool.apply_async(server,args=(conn,))
    sk.close()
    # pool.close()
    # pool.join()
server端
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))

print(sk.recv(1024))
msg = input('>>>> ')
sk.send(msg.encode('utf8'))
msg = sk.recv(1024).decode('utf8')
print(msg)
# sk.close()
client端
原文地址:https://www.cnblogs.com/gkx0731/p/9744461.html