一、进程池线程池介绍
1.开线程和开进程都是要消耗资源的,只不过线程消耗的资源较少一点
2.概念:硬件有极限,为了减轻硬件压力,所以有了池的概念
3.什么是池?
在保证计算机硬件安全的情况下最大限度的利用计算机
池其实是降低了程序的运行效率,但是保证了计算机硬件的安全
(硬件的发展跟不上软件的速度)
4.操作步骤
(1)concurrent.futures模块导入
(2)线程池创建(线程数=cpu核数*5左右)
(3)submit提交任务(提交任务的两种方式)
(4)异步提交的submit返回值对象
(5)shutdown关闭池并等待所有任务运行结束
(6)对象获取任务返回值
(7)进程池的使用,验证进程池在创建的时候里面固定有指定的进程数
(8)异步提交回调函数的使用
5.pool = ThreadPoolExecutor(5) 括号内可以传参数指定线程池内的线程个数
也可以不传,默认为当前所在计算机的cpu个数乘5
pool = ProcessPoolExecutor() 默认是当前计算机cpu的个数
add_done_callback() 绑定一个回调函数
6.提交任务的方式:
同步:提交任务之后,原地等待任务的返回结果,期间不做任何事
异步:提交任务之后,不等待任务的返回结果,直接执行下一行代码
7.怎么拿异步的结果?
异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行
8.代码操作
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor import time import os # pool = ThreadPoolExecutor(5) # 括号内可以传参数指定线程池内的线程个数 # # 也可以不传 不传默认是当前所在计算机的cpu个数乘5 pool = ProcessPoolExecutor() # 默认是当前计算机cpu的个数 """ 池子中创建的进程/线程创建一次就不会再创建了 至始至终用的都是最初的那几个 这样的话节省了反复开辟进程/线程的资源 """ def task(n): print(n,os.getpid()) # 查看当前进程号 time.sleep(2) return n**2 def call_back(n): print('拿到了异步提交任务的返回结果:',n.result()) """ 提交任务的方式 同步:提交任务之后 原地等待任务的返回结果 期间不做任何事 异步:提交任务之后 不等待任务的返回结果(异步的结果怎么拿???) 直接执行下一行代码 """ # pool.submit(task,1) # 朝线程池中提交任务 异步提交 # print('主') """ 异步回调机制:当异步提交的任务有返回结果之后,会自动触发回调函数的执行 """ if __name__ == '__main__': t_list = [] for i in range(20): res = pool.submit(task,i).add_done_callback(call_back) # 提交任务的时候 绑定一个回调函数 一旦该任务有结果 立刻执行对于的回调函数 # print(res.result()) # 原地等待任务的返回结果 t_list.append(res) # pool.shutdown() # 关闭池子 等待池子中所有的任务执行完毕之后 才会往下运行代码 # for p in t_list: # print('>>>:',p.result())
二、协程
进程:资源单位
线程:执行单位
协程:单线程下实现并发(能够在多个任务之间切换和保存状态来节省IO),是程序员自己想出来的东西,对于操作系统来说根本不存在
并发的条件:
多道技术
空间上的复用
时间上的复用
切换+保存状态
程序员自己通过代码自己检测程序中的IO,一旦遇到IO自己通过代码切换,给操作系统的感觉是你这个线程没有任何的IO
ps:欺骗操作系统,让它误以为你这个程序一直没有IO,从而保证程序在运行态和就绪态来回切换,提升代码的运行效率
切换+保存状态就一定能够提升效率吗?
当你的任务是IO密集型的情况下,提升效率
如果你的任务是计算密集型的,降低效率
将单个线程的效率提升到最高,多进程下开多线程,多线程下用协程>>>实现高并发
三者都是实现并发的手段
yield能够实现保存上次运行状态,但是无法识别遇到IO才切换
串行执行
import time def func1(): for i in range(10000000): i+1 def func2(): for i in range(10000000): i+1 start = time.time() func1() func2() stop = time.time() print(stop - start)
基于yield并发执行
import time def func1(): while True: 10000000+1 yield def func2(): g=func1() for i in range(10000000): # time.sleep(100) # 模拟IO,yield并不会捕捉到并自动切换 i+1 next(g) start=time.time() func2() stop=time.time() print(stop-start)
gevent模块
一个spawn就是一个帮你管理任务的对象
gevent模块不能识别它本身以外的所有的IO行为,但是它内部封装了一个模块,能够帮助我们识别所有的IO行为
from gevent import monkey;monkey.patch_all() # 检测所有的IO行为 from gevent import spawn,joinall # joinall列表里面放多个对象,实现join效果 import time def play(name): print('%s play 1' %name) time.sleep(5) print('%s play 2' %name) def eat(name): print('%s eat 1' %name) time.sleep(3) print('%s eat 2' %name) start=time.time() g1=spawn(play,'刘清正') g2=spawn(eat,'刘清正') # g1.join() # g2.join() joinall([g1,g2]) print('主',time.time()-start) # 单线程下实现并发,提升效率
协程实现服务端客户端通信
服务端
from gevent import monkey;monkey.patch_all() from socket import * from gevent import spawn def communicate(conn): while True: try: data = conn.recv(1024) if len(data) == 0: break conn.send(data.upper()) except ConnectionResetError: break conn.close() def server(ip, port, backlog=5): server = socket(AF_INET, SOCK_STREAM) server.bind((ip, port)) server.listen(backlog) while True: # 链接循环 conn, client_addr = server.accept() print(client_addr) # 通信 spawn(comunicate,conn) if __name__ == '__main__': g1=spawn(server,'127.0.0.1',8080) g1.join()
客户端
from threading import Thread, current_thread from socket import * def client(): client = socket(AF_INET, SOCK_STREAM) client.connect(('127.0.0.1', 8080)) n = 0 while True: msg = '%s say hello %s' % (current_thread().name, n) n += 1 client.send(msg.encode('utf-8')) data = client.recv(1024) print(data.decode('utf-8')) if __name__ == '__main__': for i in range(500): t = Thread(target=client) t.start() # 原本服务端需要开启500个线程才能跟500个客户端通信,现在只需要一个线程就可以扛住500客户端 # 进程下面开多个线程,线程下面再开多个协程,最大化提升软件运行效率
三、IO模型
阻塞IO
非阻塞IO(服务端通信针对accept用s.setblocking(False)加异常捕获,cpu占用率过高)
在只检测一个套接字的情况下,他的效率连阻塞IO都比不上。因为select这个中间人增加了环节。
但是在检测多个套接字的情况下,就能省去wait for data过程
异步IO