No.013-Python-学习之路-Day10-事件驱动及异步IO

事件驱动

通常写服务器处理模型的程序时,有以下几种模型:

1.每收到一个请求,创建一个新的进程,如socketserver.ForkingTCPServer->开销大

2.每收到一个请求,创建一个新的线程,如socketserver.ForkingTCPServer->线程同步问题

3.每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求->实现复杂,最优

事件驱动模型简介

事件驱动编程是一种常见的网络编程范式<还有单线程及多线程编程2种>,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。

事件驱动模型大体思路如下:

1.首先存在一个事件列表;

2.某事件触发时,向事件列表中添加相应的事件;

3.使用循环不断的从事件列表中取出事件,并根据事件中的存储的处理函数指针调用不同的处理函数。

IO多路复用

同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文;

本文讨论的背景是Linux环境下的network IO。

需要明确的概念:

1.用户空间和内核空间

现在操作系统都是采用虚拟存储器。那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间

针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间

2.进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行,这种行为被称为进程切换。

3.进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的

4.文件描述符

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符,而进程可将文件描述符

交给OS查询记录表,从而获取相应的文件文件句柄。

在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

5.缓存I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

socket的粘包现象就来源于此,目的减少内核态到用户态的转变;

缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

IO模式

刚才说了,对于一次IO访问(以read举例),数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。所以说,当一个read操作发生时,它会经历两个阶段:
1. 等待数据准备 (Waiting for the data to be ready)
2. 将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)

正因为这两个阶段,linux系统产生了下面五种网络模式的方案
- 阻塞 I/O(blocking IO)
- 非阻塞 I/O(nonblocking IO)
- I/O 多路复用( IO multiplexing)
- 信号驱动 I/O( signal driven IO)
- 异步 I/O(asynchronous IO)

注:由于signal driven IO在实际中并不常用,所以我这只提及剩下的四种IO Model。

阻塞 I/O(blocking IO)

blocking IO的特点就是在IO执行的两个阶段都被block了。

非阻塞 I/O(nonblocking IO)

linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

所以,nonblocking IO的特点是用户进程需要不断的主动询问kernel数据好了没有。

I/O 多路复用( IO multiplexing)

IO multiplexing就是我们说的select,poll,epoll,有些地方也称这种IO方式为event driven IO<这三种模式都是通过事件驱动实现的>。select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select,poll,epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。

所以,I/O 多路复用的特点是通过一种机制一个进程能同时等待多个文件描述符,而这些文件描述符(套接字描述符)其中的任意一个进入读就绪状态,select()函数就可以返回。

在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

异步 I/O(asynchronous IO)-Python中的asyncio

linux下的asynchronous IO其实用得很少。先看一下它的流程:

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

总结
blocking和non-blocking的区别

调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

synchronous IO和asynchronous IO的区别

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。POSIX的定义是这样子的:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;

两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。

各个IO Model的比较如图所示:

I/O 多路复用之select、poll、epoll详解

select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

select-win支持

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

Select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点;

缺点:

linux打开文件数量等同于用户打开最多的文件数量,默认1024(可更改)

select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长,扫描开销增大。

poll-<过度版本>

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

epoll-<最流行的>-win不支持

直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

1.同时监控多个io操作 2.有活跃的通知程序并返回活跃文件描述符。

2.同时支持水平触发<触发多次,直到被取用>及边缘触发<只触发一次>。

epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

使用select实现的socketServer:

import select
import socket
import queue

server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server.bind(('localhost', 9000))
server.listen(5)

# 不阻塞的模式,才能实行IO多路复用-才能用select
# 没有链接时accept会报BlockingIOError
# So 如果没有链接就不要accept
server.setblocking(False)
#server.accept()


inputs = []
outputs = []
# 使用selcet检测,最少要有一个链接,如上列表均是空的,会报错:
# TypeError: select() takes no keyword arguments
# 这里检测server自己,是否有数据可读,或报错
inputs.append(server)
msg_dic = {}
while True:
    # rlist 内放入要检测的100个链接,如果有可读数据,就返回
    # xlist 内放入要检测的100个链接,如果哪个出现报错,就返回
    # 三个返回值,可读的,可写的,报错的
    readable, writeable, exceptions = select.select(inputs, outputs, inputs)
    # 输出如下[<socket.socket fd=496, ...] [] [] fd即文件描述符
    # select原理上说的是只要有活动的,就返回所有的fd,然后让用户程序自己去里面找
    # 但是这里没有,这里返回的只有活动的那个fd,原因是python select内部做了封装的。
    print(readable, writeable, exceptions)
    for r in readable:
        if r is server:
            conn, addr = server.accept()
            print("New connect [%s:%s] connected" % addr)
            inputs.append(conn) # 将新建的IO,加入select的监测列表中
            msg_dic[conn] = queue.Queue() # 为新conn初始化一个对话,存发送给conn的数据
        else:
            data = b""
            try:
                data = r.recv(1024)
            except Exception as e:
                print(repr(e))
            if data:
                # 因为是非阻塞模式,无论对面收不收都会发,如果对端未收,数据会丢弃;
                #r.send(read)
                msg_dic[r].put(data) # 使用队列来保存要数据
                if r not in outputs:
                    outputs.append(r) # select监测是否有writeable,即对端是否收
            else:
                print("客户端断开了", r)
                if r in outputs:
                    outputs.remove(r)
                inputs.remove(r)
                if r in msg_dic:
                    del msg_dic[r]

    for w in writeable: # 如果对端有收的,就取出数据向外发送
        w.send(msg_dic[w].get()) # 从队列中取数据,发送
        outputs.remove(w) # 因为这次数据已经发了,不再监测这个链接

    for e in exceptions:
        print("异常->{}".format(e))
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        if e in msg_dic:
            del msg_dic[e]

Python中对epoll及select的封装模块selectors

selector模块默认会使用epoll方式来实现IO的多路复用,如果设备不支持epoll<比如win下>,则会自动切换到select方式.

模块的整体结构如下:

image

官方举例:

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

multiconn-server

import selectors
import socket
import types

sel = selectors.DefaultSelector()
host = "localhost"
port = 9977

lsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
lsock.bind((host, port))
lsock.listen()
print('listening on', (host, port))
lsock.setblocking(False)
sel.register(lsock, selectors.EVENT_READ, data=None)

def accept_wrapper(sock):
    conn, addr = sock.accept()  # Should be ready to read
    print('accepted connection from', addr)
    conn.setblocking(False)
    data = types.SimpleNamespace(addr=addr, inb=b'', outb=b'')
    events = selectors.EVENT_READ | selectors.EVENT_WRITE
    sel.register(conn, events, data=data)

def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            data.outb += recv_data
        else:
            print('closing connection to', data.addr)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if data.outb:
            print('echoing', repr(data.outb), 'to', data.addr)
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]


while True:
    events = sel.select(timeout=None)
    for key, mask in events:
        if key.data is None:
            accept_wrapper(key.fileobj)
        else:
            service_connection(key, mask)

multiconn-client

import selectors
import types
import socket

sel = selectors.DefaultSelector()
messages = [b'Message 1 from client.', b'Message 2 from client.']


def start_connections(host, port, num_conns):
    server_addr = (host, port)
    for i in range(0, num_conns):
        connid = i + 1
        print('starting connection', connid, 'to', server_addr)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setblocking(False)
        sock.connect_ex(server_addr)
        events = selectors.EVENT_READ | selectors.EVENT_WRITE
        data = types.SimpleNamespace(connid=connid,
                                     msg_total=sum(len(m) for m in messages),
                                     recv_total=0,
                                     messages=list(messages),
                                     outb=b'')
        sel.register(sock, events, data=data)
        
def service_connection(key, mask):
    sock = key.fileobj
    data = key.data
    if mask & selectors.EVENT_READ:
        recv_data = sock.recv(1024)  # Should be ready to read
        if recv_data:
            print('received', repr(recv_data), 'from connection', data.connid)
            data.recv_total += len(recv_data)
        if not recv_data or data.recv_total == data.msg_total:
            print('closing connection', data.connid)
            sel.unregister(sock)
            sock.close()
    if mask & selectors.EVENT_WRITE:
        if not data.outb and data.messages:
            data.outb = data.messages.pop(0)
        if data.outb:
            print('sending', repr(data.outb), 'to connection', data.connid)
            sent = sock.send(data.outb)  # Should be ready to write
            data.outb = data.outb[sent:]
协程与selectors的关系

1.协程Gevent在linux的底层也是使用libevent.so模块实现的;

2.epoll在linux的底层使用libevent.so模块实现的;

3.epoll更多关注的I/O层面,Gevent更多关注于事件之间的切换,Gevent默认就是I/O多路复用。


end

参考:

教程->https://www.cnblogs.com/alex3714/articles/5248247.html

IO多路复用->https://www.cnblogs.com/alex3714/articles/5876749.html

Select poll epoll->https://www.cnblogs.com/alex3714/p/4372426.html

epoll实现socketServer->https://www.cnblogs.com/alex3714/articles/5876749.html

Selectors官方文档->https://docs.python.org/3/library/selectors.html#selectors.DefaultSelector

socket编程->https://segmentfault.com/a/1190000016501735

原文地址:https://www.cnblogs.com/FcBlogPythonLinux/p/12561708.html