进程池,线程池,协程,gevent模块,协程实现单线程服务端与多线程客户端通信,IO模型

一、进程池与线程池知识点:
线程不可能无限制的开下去,总要消耗和占用资源

进程池线程池概念:硬件有极限,为了减轻硬件压力,所以有了池的概念

- concurrent.futures模块导入
- 线程池创建(默认线程数=cpu核数*5左右)
- submit提交任务(提交任务的两种方式)
- 异步提交的submit返回值对象
- shutdown关闭池并等待所有任务运行结束
- 对象获取任务返回值  
- 进程池的使用,验证进程池在创建的时候里面固定有指定的进程数
- 异步提交回调函数的使用  add.done.callback()

  

二、进程池与线程池的使用

特点:
    开线程与开进程都会消耗资源,开线程消耗的资源比开进程要少
开启了这些池:是为了减缓计算机硬件的压力,避免了计算机硬件崩溃
                虽然减轻了计算机硬件的压力,但是在一定程度上降低了程序的效率
开启了这些池,会限制开启的线程数和进程数,从而保证计算机硬件的安全

  

线程池:

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time

pool = ThreadPoolExecutor(5) #创建了一个池子,池子里有20个线程,默认值为cpu核数*5个
def task(n):
    print(n,'----------')
    time.sleep(2)
    return n**2

# 提交任务的方式:
#     同步:提交完任务以后,原地等待任务的返回结果,再继续执行下一步代码
#     异步:提交完任务以后,不等待任务的返回结果,直接执行下一步操作

t_list= []
for i in range(20):
    future = pool.submit(task,i)  #异步提交任务
    # print(future.result())   #此处的future是一个对象, .result()可以拿到返回值  加上这步,变成了同步提交
    t_list.append(future)

for p in t_list:
    print('返回值:',p.result())

# 此处的输出结果逻辑,前面5个线程从task里出来后,将其的返回值打印出来,
# 然后这五个线程从池子里出来,后面的线程在去竞争池子里的5个份额,如此往复。
print('主')


#shutdown的用法

from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time

pool = ThreadPoolExecutor(5)

def task(n):
    print('hello',n)
    time.sleep(2)
    return n**2

t_list = []
for i in range(20):
    future = pool.submit(task,i)
    t_list.append(future)

#加上了shutdown,会导致下面的返回值在终端打印时是最后打印,不会与上述例子一样看起来无序
# pool.shutdown()  #关闭池子的进入通道,并且等待池子里所有的任务运行完毕

for p in t_list:
    print('>>>:',p.result())

print('主')

  

进程池

'''
与线程池存在一个大的区别:线程池里的参数,代表最多可以同时进去这个参数的值,
其它的线程需要在池完等着里面结束:进程池也是类似,但是,进程池里的参数是'事先存在的',
无论有多少的进程被创建,使用的都是这里面这些参数的进程,也就是始终是这些池子里的参数帮忙处理任务。
这样做是为了节省资源。
'''

from concurrent.futures import ProcessPoolExecutor
import time,os
pool = ProcessPoolExecutor(5) #进程池里存在5个进程
def task(n):
    print('%s say hello!'%n,'该进程的pid是%s'%os.getpid())
    time.sleep(2)
    return n**2

#回调函数:异步提交之后一旦任务有返回结果,自动交给另外一个去执行
def call_back(m):     #此处的参数m是函数task的返回值
    print('我拿到了结果%s'%m.result())

if __name__ == '__main__':   #需要加上这一步,线程创建可有可无
    l_list = []
    for i in range(20):
        future = pool.submit(task,i).add_done_callback(call_back)  #异步提交
        #此处task函数的返回结果交给call_back去执行
    print('主')

  

三、协程
- 进程:资源单位
- 线程:执行单位
- 协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省IO),这里注意区分操作系统的切换+保存状态是针对多个线程而言,而我们现在是想在单个线程下自己手动实现操作系统的切换+保存状态的功能

注意协程这个概念完全是程序员自己想出来的东西,它对于操作系统来说根本不存在。操作系统只知道进程和线程。并且需要注意的是并不是单个线程下实现切换+保存状态就能提升效率,因为你可能是没有遇到io也切,那反而会降低效率

再回过头来想之前的socket服务端实现并发的例子,单个线程服务端在建立连接的时候无法去干通信的活,在干通信的时候也无法去干连接的活。这两者肯定都会有IO,如果能够实现通信io了我就去执行建连接任务,建连接io了我就执行通信任务,那其实我们就可以实现单线程下实现并发。

将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>> 实现高并发!!!

  

#串行执行

import time
def func1():
    for i in range(10000000):
        i + 1
def func2():
    for i in range(10000000):
        i+1
start = time.time()
func1()
func2()
stop = time.time()
print(stop - start)  #2.845162868499756


#基于yield来实现并发执行

import time
def fun1():
    while True:
        1000000 + 1
        yield

def fun2():
    g = fun1()
    for i in range(1000000):
        i + 1
        next(g)
start = time.time()
fun2()
stop = time.time()
print(stop - start)  #0.33101892471313477

  

gevent模块:
    
#yield却不能自动捕获io行为,可以利用gevent模块来实现
#gevent本身识别不了time,sleep等不属于该模块的io操作
from gevent import monkey;monkey.patch_all() #监测所有的IO行为
from gevent import spawn,joinall  #jional列表里放置多个对象,实现join效果
import time

def play(name):
    print('%s play 1'%name)
    time.sleep(5)
    print('%s play 2'%name)

def eat(name):
    print('%s eat 1'%name)
    time.sleep(3)
    print('%s eat 2'%name)

start = time.time()
'''
g1=gevent.spawn(func,1,,2,3,x=4,y=5)创建一个协程对象g1,spawn括号内第一个参数是函数名,
如eat,后面可以有多个参数,可以是位置实参或关键字实参,都是传给函数eat的
'''

t1 = spawn(play,'michael')
t2 = spawn(eat,'michael')
joinall([t1,t2])
print('主',time.time() - start)

'''
michael play 1
michael eat 1
michael eat 2
michael play 2
主 5.01228666305542
'''

 

四、协程实现服务端与客户端通信(服务端单线程下实现并发)

链接和通信都是io密集型操作,我们只需要在这两者之间来回切换就可以实现并发的效果

服务端监测链接和通信任务,客户端起多线程的同时连接服务端

#服务端:
from gevent import monkey;monkey.patch_all()
from gevent import spawn
import socket

def communicate(conn):
    while True:
        try:
            data = conn.recv(1024)
            if len(data) == 0:break
            print(data.decode('utf-8'))
            conn.send(data.upper())
        except ConnectionResetError:
            break
    conn.close()


def server():
    server = socket.socket()
    server.bind(('127.0.0.1',8030))
    server.listen(5)
    while True:
        conn, addr = server.accept()
        spawn(communicate,conn)

if __name__ == '__main__':
    s1 = spawn(server)
    s1.join()


客户端:

import socket
from threading import Thread,current_thread
def client():
    client = socket.socket()
    client.connect(('127.0.0.1',8030))
    n = 1
    while True:
        data = '%s  %s'%(current_thread().name,   n)
        n += 1
        client.send(data.encode('utf-8'))
        info = client.recv(1024)
        print(info)

if __name__ == '__main__':
    for i in range(500):
        t = Thread(target=client)
        t.start()

效果:原本服务端需要开启500个线程才能跟500个客户端通信,现在只需要一个线程就可以抗住500个客户端
    进程下面开多个线程,线程下面再开多个协程,最大化提升软件的运行效率

  

IO模型:待续

 

原文地址:https://www.cnblogs.com/changwenjun-666/p/10840296.html