Python协程(gevent+asyncio)模块

前言

如何在使用1个线程的前提下,提网站的并发性,使用协程?

如果要使用协程首先要解决2个问题:

1.如何检测到代码中遇到了IO操作?(XX)

2.如何在线程代码里上下切换?(Greelet模块)

而Gvent模块封装好了以上2种功能,可以让我们在python中优雅的使用协程;

一、Gvent是什么?

:版本1cmdb_rbac_arya>pip show gevent
Name: gevent
Version: 1.4.0
Summary: Coroutine-based network library      #基于协程的网站库
Home-page: http://www.gevent.org/
Author: Denis Bilenko
Author-email: denis.bilenko@gmail.com          #以后使用过程出现任何问题 给 Denis Bilenko发邮箱
License: MIT
Location: d:python3.6.1libsite-packages
Requires: greenlet, cffi                       #需要依赖的库

二、Linux网络IO模型

网络IO模型是指:Linux服务端如何接收、处理客户端请求的形式;(智慧、创新的程序们设计那么多的IO模型,为1个目的 让我们的网页访问更快。。。)

由于同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?

不同的上下文环境里,所表达的意义有所不同,开篇前先框定今天的topic,我们今天主要讲得是Linux网络IO模型;

Linux系统服务器1次网络IO发生时涉及的对象和阶段?

对于一个Linux系统服务器的network IO (这里我们以read举例,也就是python的socket模块执行socket.receive()),它会涉及到2个对象,

对象1.调用这个IO的process (or thread)【我们在服务器端写得web程序】

对象2.系统内核(kernel)

当我们在Linux服务端执行socket.receive()方法接收客户端发送过来的消息时,该操作其实会经历2个过程:

阶段1.等待客户端的数据传输到服务端的内核空间 (Waiting for the data to be ready):等待客户端执行sen(b'hello')把数据传输到服务端的操作系统(内核空间);

阶段2.将数据从内核空间拷贝到用户空间(Copying the data from the kernel to the process): 我们socket程序工作在用户空间,So阶段2是等待数据从内核空间复制到用户空间;

经历了以上2个过程之后才 服务端的socket.receive()才得到了用户send(b'hello')消息;

ps:

严格应该是4个步骤

1.socket.reveive 等待用户分组发完数据

2.数据由用户空间到内核空间

3.等待内核准备数据

4.内核准备玩据,数据由内核空间回到用户空间

因为步骤2和步骤4的耗时很快,是可以忽略不计的。

记住这两点真的真的很重要,IO模型的演变,都是围绕着 尽可能规避这2个过程,为中心来展开的!

 o

阻塞/非阻塞 和 同步/异步 的区别?

阻塞、非阻塞是描述得是 程序在发起系统调用之后等待IO模型返回消息时的状态

比如socket在执行recive()方法的时候,当前线程由活跃---->阻塞状态,直到消息返回(执行以上2个过程之后), 程序由阻塞--->就绪---->活跃状态,如果socket.setblocking(False),socket在执行recive()方法向操作系统发起系统调用之后, 将不会进入阻塞状态,所以非阻塞IO可以大大提升进程对CPU的利用效率 ;

同步、异步描述得是程序从IO模型 获得消息的机制

程序发起系统调用之后,最终消息是通过1种什么样的机制 回调、通知到程序的;

阻塞、非阻塞 和 同步、异步和 的组合?

下面说说的:阻塞(blocking) IO模型,非阻塞(non-blocking )IO模型,IO 多路复用(multiplexing)都属于同步(synchronous) IO这一类;

而 异步(asynchronous) I/O属于异步IO模型。

二、各种IO模型介绍

 1.阻塞IO(blocking IO)模型

 我们写平时写的socket默认都是阻塞IO;

A.客户端连接服务端

B.服务端发起系统调用

C.等待客户端send()消息到操作系统(内核空间)(此时socket进程进入阻塞状态,不能接收新的连接请求,你什么也不可以做!)

D.等待内核把数据从内核空间copy到用户空间 (此时socket进程进入阻塞状态,还是不能接受新的连接请求,你什么也不可以做!)

E.直到数据由内核空间---->socket进程,socket开始响应客户端

F.开始接收新的客户端连接请求(循环A-F 的步骤)

解决之道:

每当1个客户端连接到服务端,服务端开启1个线程,来处理客户端请求,制造并发;

利:

大道至简,这样程序处理逻辑最简单;

弊:

即便1个线程维护1个客户端连接,但是每1个线程依然规避不了C、D这2个过程中的程序阻塞;

2.非阻塞IO(non-blocking IO)模型

我们写平时写的socket中调用socket.setblocking(False)之后,此时socket就是非阻塞式的了;

A.客户端连接服务端

B.服务端socket发起系统调用

C.内核说:滚回去,不阻塞你了,你的数据还没来呢!(此时 您的进程可以 print(‘fuck’)、可以接收其它用户的连接.....You can do everything);

D.服务端socket再次发起系统调用

E.内核说:还给我滚回去,不阻塞你了,你的数据还没来呢!(此时 您的进程可以 print(‘fuck’)、可以接收其它用户的连接.....You can do everything);

F.服务端socket不断发起系统调用进行轮询内核,一直到数据从内核空间准备到了用户空间,进程才拿到了数据;

解决之道:

socket.setblocking(False)

利:

规避了进程等待消息从client端------>服务端socket的过程;

进程根本不会再进入阻塞状态,所以拥有了可以处理多个客户端请求的能力;

弊:

不断得发起系统调用轮询内核

虽然进程不需要进入阻塞状态,也规避了等待client端发送消息过来了的过程;

copy data from kenel to user的过程,依然强硬的存在着

代码实现

import socket
sk=socket.socket()
sk.bind(('127.0.0.1',8080))
sk.setblocking(False)#把socket中所有需要阻塞的方法都改变成非阻塞:recv() recvfrom() accept()
sk.listen()
conn_list=[]#用来存储所有来请求server端的conn连接
del_conn_list=[]#用来存储所有已经和server端断开连接的conn连接

while True:  # 在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
    try:
        conn,ipaddr=sk.accept()#去问问内核有人连接我不? 设置socket不阻塞之后,这里会报错,因为这不会阻塞住等待数据 由内核-->用户空间
        print('建立连接了')
        conn_list.append(conn)
    except BlockingIOError:
        for con in conn_list:
            try:
                msg=con.recv(1024)#去问问内核连接过我的人中,有人给我发消息不?
                if msg == b'':#如果client端和server端断开了连接,server端就会收到b''空消息
                    del_conn_list.append(con)
                    continue
                print(msg)
                con.send(b'Byebye')
            except BlockingIOError: pass
        else:
            for con in del_conn_list:#从正在连接的socket列表删除已经断开连接的conn
                conn_list.remove(con)
                con.close()
            del_conn_list.clear()#把删除列表的scnn清空





#
server.py
import socket
import time
import threading
def func():
    sk=socket.socket()
    sk.connect(('127.0.0.1',8080))
    sk.send(b'hello')
    time.sleep(1)
    print(sk.recv(1024))
    sk.close()

for i in range(20):
    threading.Thread(target=func).start()
client.py

3.IO多路复用/事件驱动IO(event driven IO)模型

 虽然上面的异步IO模型在使用了单线程并切在没有进行协程切换的前提下实现了并发,但是也并不完美,因为while ture会非常耗费内存,不断得向Linux内核

.accept() .recv()轮询,也会造成CPU负载过大;

IO多路复用出现了!

IO多路复用模型工作机制

IO多路复用中的IO依然是阻塞的,但是操作系统给我们提供了1个代理程序(Linux系统是epllo、Windows是select),这个代理程序去循环监听1个socket列表;

该列表中监控2类socket:

conn,ipaddr=sk.accept()是否来了客户端连接?

监听msg=conn.recv(1024)客户端连接是否发来了新数据?

这就意味着 此时我们Python程序的1只手被解放了!!为什么是1只手被解放了?而不是双手?

即便IO多路复用网络模型规避了等待client端发送消息过来的过程,但依旧还是没有规避等待消息从内核空间copy到用户空间的过程;

1.等待client端发送消息过来

2.等待消息从内核空间copy到用户空间

Python程序不用去等待client端发送消息过来,而是由这个操作系统提供代理程序(select/epoll)去等待;

一旦有新的conn 或者conn有新的消息发送过来,代理程序立即回调、通知Python程序;

此时Python程序执行 socket.accept() conn.receive() ,恰到好处得接收到了新的client请求、conn连接发送过来的数据;

IO多路复用模型流程图

 

解决之道:

找个代理监控socket、conn连接,如果有新的客户端连接、或者新消息,回调Python;

规避了等待client端发送消息过来的过程

利:

规避了等待client端发送消息过来的过程消息回调通知机制,不用向非阻塞IO一样,不断得轮询了;

弊:

虽然IO多路复用模型,有效的规避了等待client客户端发送消息到server端的过程,并解决了非阻塞IO模型不断轮询操作系统的问题;

但copy data from kenel to user的过程,依然强硬的存在着;

建议:

因为在操作系统和程序之间加了中间代理回调,所以网站处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。

应用场景:

select的优势在于可以处理多个连接,不适用于单个连接;

 代码实现

import select
import socket

sk=socket.socket()
sk.bind(('127.0.0.1',8080))
sk.setblocking(False)
sk.listen()

read_list=[sk]

while True:#循环监听 sk 和连接sk的client
    r_list, w_list, x_list = select.select(read_list, [], [])
    for i in r_list:
        if i is sk:
            conn,addr=i.accept()
            read_list.append(conn)#select不仅可以支持对sk对象的监控,还支持对conn对象的监听
        else:
            msg=i.recv(1024)
            if msg == b'':#client端和server端断开连接了
                print('断开')
                read_list.remove(i)
                i.close()
                continue
            print(msg)
            i.send(b'byebye!!')
server.py
import socket
import time
import threading
import time
def func():
    sk=socket.socket()
    sk.connect(('127.0.0.1',8080))
    sk.send(b'hello')
    time.sleep(1)
    print(sk.recv(1024))
    sk.close()

for i in range(10):
    threading.Thread(target=func).start()
client.py

不同操作系统中IO多路复用模型介绍

select机制 :IO多路复用模型得以实现得核心:就是操作系统 监控1个[sk......conn,]列表,不断轮询每1个sk/conn/是否可以accpet/revive,随着监控列表的增加,效率会递减;

支持操作系统:linux/windows

poll机制:和select机制一样,但是支持监控的socket会比select多

支持操作系统:linux

epoll机制:和select机制完全不一样

1.epoll很高级,epoll不会去再通过操作循环检查监控的socket列表中,那些socket出现了读操作,而是给需要监听的socket 1--1绑定1个回调函数;

2.检测的socket中 有1个soket出现了读操作,直接执行调用那个和该sk/con绑定的回调函数执行sk.accpet() 和conn.receve()

epoll的优势:

回调函数取代了循环轮询的反馈方式,所以性能大大提升;

比如epoll监控了1个长度为3000的socket列表,1993_socket和2019_socket同时出现了读操作,那么这2个soket的回调函数同时执行,去响应客户端;

省去了0---1993---->2019循环的过程;

支持操作系统:linux

如果在Linux操作系统我肯定会选择epoll而不是select啊;所以如何让自己的代码兼容不同平台,自动选择最佳的IO多路复用代理?

Python中的selectors模块就是帮我们自动选择最佳IO多路复用代理的;

#服务端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept

while True:
    events=sel.select() #检测所有的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

基于selectors模块实现聊天
selectors模块

4.异步IO(Asynchronous I/O)模型

异步框架最完美,程序发起系统调用之后,无需阻塞等待 client端发送数据过来,也不需要等待数据从内核到用户空间;

而是我发起系统调用之后,没有任何的等待,有消息过来,我马上得知;

可以从上面看出来:

异步IO框架完全没有了wait_data和copy_data的过程,所以它可以响应更多的请求;

虽然异步框架代码实现起来比较困难,但是我们可以直接使用别人写好的框架例如:Tornado/Twisted....需要说明的是Django是阻塞型IO Web框架


IO模型总结:

只有对各种IO模型有1定了解之后,你才能知道,在什么样的场景中需要使用什么样的框架,使用什么手段提升你网站的并发; 

Asyncio模块

gevent模块虽然可以实现协程但是需要在使用前给所有io操作相关的模块加上 monkey path会影响模块那些模块原本的功能,在一次面试中有人建议我使用 asyncio模块,于是浅尝一下。

import asyncio

#执行1个协程
async def func(name):#定义1个协程
    print(name,'start')
    #await 关键字出现在可能发生IO阻塞的前面
    #await 必须卸载asyncio的函数里
    await asyncio.sleep(1)
    print('end')

loop=asyncio.get_event_loop()#创建1个事件循环
loop.run_until_complete(func('张根'))


#执行多个协程
async def func1(name):#async语法用于定义1个协程函数
    print(name,'start')
    #await 关键字出现在可能发生IO阻塞的前面
    #await 必须卸载asyncio的函数里
    await asyncio.sleep(1)
    print('end')

loop=asyncio.get_event_loop()#创建1个事件循环
loop.run_until_complete(asyncio.wait([func('张根'),func1('小米')]))
asyncio模块简单使用

 Asyncio模块实现的原理

如果想要实现协程就必须首先做到 可以在1个线程里 切换执行代码,Python里面什么能做到 代码切换执行呢?

没错   yield, yield的切换 就是实现 async模块最最基本的前提。

yield切换功能

如下面的代码,func函数里面有yield关键字就成了生成器函数,调用生成器函数得了1个生成器对象。既然是生成器对象那我们就可以1点1点得执行生成器里的代码了。

在函数外面next(g)开始调用这个生成器

第1次next(g)

print('2.我切到函数外面了')

print('1.我在函数里面') 代码执行

第二次next(g)

print('3.我切回函数里面来了') 代码执行

print('4.我又切到函数外面了')

这就是代码切换。

def func():
    print('1.我在函数里面')
    yield
    print('3.我切回函数里面来了')

g=func()
next(g)
print('2.我切到函数外面了')
try:
    next(g)
except StopIteration:pass  #捕捉生成器 最后的StopIteration异常
print('4.我又切到函数外面了')

#代码执行的流程
# 1.我在函数里面
# 2.我切到函数外面了
# 3.我切回函数里面来了
# 4.我又切到函数外面了

yield from是什么?

 顾英文名思义就是可以在另1个函数把其他生成器 yiled 出来,把当前函数做成了1个汇聚型的生成器;

def coroutine1(n):
    print('我这coroutine1里面')
    yield 'coroutine1' 


def coroutine2(n):
    print('我这coroutine2里面')
    yield 'coroutine2'


def func(n):
    print('我在func里面')
    yield from coroutine1(n) #yield from 相当于1个中间件,可以在1个函数里面 直接 把其他生成器 yield出来
    yield from coroutine2(n)

g=func(2)
ret1=next(g)
ret2=next(g)

print(ret1,ret2)

 使用 yield from 实现1个简单的async模块

import time
def coroutine(n):
    print('start sleep')
    yield time.time()+n #返回需要切换回此处的时间
    print('end sleep')



def func(n):
    print('1.我在函数里面')
    yield from coroutine(n)

g1=func(8)
g2=func(8)

ret1=next(g1)
ret2=next(g2)

#
time_dict={ret1:g1,ret2:g2}
while time_dict:
    min_time=min(time_dict)         #获取最近的时间
    time.sleep(min_time-time.time()) #执行IO耗时操作
    try:
        next(time_dict[min_time])#切回到 coroutine 继续执行
    except StopIteration:pass
    del time_dict[min_time]     #删除

asnycio模块

阻塞/非阻塞和同步/异步的区别

IO模型

协程yeild原理

 

原文地址:https://www.cnblogs.com/sss4/p/11124917.html