39.IO多路复用

一、IO多路复用

I/O多路复用指:通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

Linux中的 select,poll,epoll 都是IO多路复用的机制

1、select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

2、poll

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

  poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

  另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。

3、epoll

直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

  epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。

  epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

  另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll

注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化

4、sellect、poll、epoll三者的区别

select

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。

poll

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

epoll

poll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);


创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。


2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操作。
– epfd:是epoll_create()的返回值。
– op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
– fd:是需要监听的fd(文件描述符)
– epoll_event:是告诉内核需要监听什么事


3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。

二、select IO多路复用

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样会造成效率的降低

1、select语法:

select(rlist, wlist, xlist, timeout=None)

select()方法接收并监控3个通信列表, 第一个rlist监控所有要进来的输入数据,第二个wlist是监控所有要发出去的输出数据,第三个监控异常错误数据,第四个设置指定等待时间,如果想立即返回,设为null即可,最后需要创建2个列表来包含输入和输出信息来传给select(),让select方法通过内核去监控,然后生成三个实例。

 
#建立两个列表,比如想让内核去检测50个连接,需要传给它一个列表,就是这个inputs(列表中里面存放的是需要被内核监控的链接),然后交给select,就相当于交给内核了
inputs = [server,] #输入列表,监控所有输入数据

outputs = []  #输入列表,监控所有输出数据

#把两个列表传给select方法通过内核去监控,生成三个实例
readable,writeable,exceptional = select.select(inputs,outputs,inputs)  # 这里select方法的第三个参数同样传入input列表是因为,input列表中存放着所有的链接,比如之前放入的50被监控链接中有5个断了,出现了异常,就会输入到exceptional里面,但这5链接本身是放在inputs列表中
 

 

对于select方法:

句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)
 
参数: 可接受四个参数(前三个必须)
返回值:三个列表
 
select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
   当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。

 

2、select服务端代码实例:

 
 
 
 
import select,socket,queue
server = socket.socket()
server.bind(("localhost",9000))
server.listen(1000)
server.setblocking(False) #设置为非阻塞
msg_dic = dict() #定义一个队列字典
inputs = [server,]  #由于设置成非阻塞模式,accept和recive都不阻塞了,没有值就会报错,因此最开始需要最开始需要监控服务端本身,等待客户端连接
outputs = [] 
while True:
    #exceptional表示如果inputs列表中出现异常,会输出到这个exceptional中
    readable,writeable,exceptional = select.select(inputs,outputs,inputs)#如果没有任何客户端连接,就会阻塞在这里 
    for r in readable:# 没有个r代表一个socket链接
        if r is server:  #如果这个socket是server的话,就说明是是新客户端连接了
            conn,addr = r.accept() #新连接进来了,接受这个连接,生成这个客户端实例
            print("来了一个新连接",addr)
            inputs.append(conn)#为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
            #就会被交给select去监听
            msg_dic[conn] = queue.Queue() #初始化一个队列,后面存要返回给这个客户端的数据
        else: #如果不是server,就说明是之前建立的客户端来数据了
            data = r.recv(1024)
            print("收到数据:",data)
            msg_dic[r].put(data)#收到的数据先放到queue里,一会返回给客户端
            outputs.append(r)#为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
            # r.send(data)
            # print("send done....")
    for w in writeable:  #要返回给客户端的链接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client)   #返回给客户端的源数据
        outputs.remove(w)  #确保下次循环的时候writeable,不返回这个已经处理完的这个连接了
 
    for e in exceptional:  #处理异常的连接
        if e in outputs:   #因为e不一定在outputs,所以先要判断
            outputs.remove(e)
        inputs.remove(e)   #删除inputs中异常连接
        del msg_dic[e]   #删除此连接对应的队列
 

三、epoll IO多路复用

epoll的方式,这种效率更高,但是这种方式在Windows下不支持,在Linux是支持的,selectors模块就是默认使用就是epoll,但是如果在windows系统上使用selectors模块,就会找不到epoll,从而使用select。

python2没有这个模块

1、selectors语法:

 
#定义一个对象
sel = selectors.DefaultSelector()

#注册一个事件
sel.register(server,selectors.EVENT_READ,accept) #注册事件,只要来一个连接就调accept这个函数,就相当于之前select的用法,sel.register(server,selectors.EVENT_READ,accept) ==  inputs=[server,],readable,writeable,exceptional = select.select(inputs,outputs,inputs)意思是一样的。
 

 2、selectors代码实例:

 
import selectors,socket
 
sel = selectors.DefaultSelector()
 
def accept(sock,mask):
    "接收客户端信息实例"
    conn,addr = sock.accept()
    print("accepted",conn,'from',addr)
    conn.setblocking(False)
    sel.register(conn,selectors.EVENT_READ,read)  #新连接注册read回调函数
 
def read(conn,mask):
    "接收客户端的数据"
    data = conn.recv(1024)
    if data:
        print("echoing",repr(data),'to',conn)
        conn.send(data)
    else:
        print("closing",conn)
        sel.unregister(conn)
        conn.close()
 
server = socket.socket()
server.bind(('localhost',9999))
server.listen(500)
server.setblocking(False)
sel.register(server,selectors.EVENT_READ,accept)  #注册事件,只要来一个连接就调accept这个函数,
#sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,]
 
while True:
    events = sel.select()  #这个select,看起来是select,有可能调用的是epoll,看你操作系统是Windows的还是Linux的
                           #默认阻塞,有活动连接就返回活动连接列表
    print("事件:",events)
    for key,mask in events:
        callback = key.data #相当于调accept了
        callback(key.fileobj,mask)  #key.fileobj=文件句柄
 

打印服务端:

 
 [(SelectorKey(fileobj=<socket.socket fd=436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222)>, fd=436, events=1, data=<function accept at 0x0000022296063E18>), 1)]
accepted <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> from ('127.0.0.1', 50281)
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'adas' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'HA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'asdHA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
 

 这样就容易明白:callback = key.data #第一次调用的是accept,第二次调用的是read    callback(key.fileobj,mask)  #key.fileobj=文件句柄

客户端代码:

在Linux端,selectors模块才能是epoll

 
import socket,sys
 
messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 9999)
 
# 创建100个 TCP/IP socket实例
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]
 
# 连接服务端
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # 发送消息至服务端
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)
 
    # 从服务端接收消息
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
 

利用select监听终端操作实例:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import threading
import sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
    if sys.stdin in readable:
        print 'select get stdin',sys.stdin.readline()

利用select实现伪同时处理多个Socket客户端请求:服务端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1,]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # 当客户端第一次连接服务端时
        if sk1 == r:
            print 'accept'
            request, address = r.accept()
            request.setblocking(0)
            inputs.append(request)
        # 当客户端连接上服务端之后,再次发送数据时
        else:
            received = r.recv(1024)
            # 当正常接收客户端发送的数据时
            if received:
                print 'received data:', received
            # 当客户端关闭程序时
            else:
                inputs.remove(r)

sk1.close()

利用select实现伪同时处理多个Socket客户端请求:客户端

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    inp = raw_input('please input:')
    sk.sendall(inp)
sk.close()

此处的Socket服务端相比与原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据。但是,如果每个请求的耗时比较长时,select版本的服务器端也无法完成同时操作。

基于select实现socket服务端

#!/usr/bin/env python
#coding:utf8

'''
 服务器的实现 采用select的方式
'''

import select
import socket
import sys
import Queue

#创建套接字并设置该套接字为非阻塞模式

server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(0)

#绑定套接字
server_address = ('localhost',10000)
print >>sys.stderr,'starting up on %s port %s'% server_address
server.bind(server_address)

#将该socket变成服务模式
#backlog等于5,表示内核已经接到了连接请求,但服务器还没有调用accept进行处理的连接个数最大为5
#这个值不能无限大,因为要在内核中维护连接队列

server.listen(5)

#初始化读取数据的监听列表,最开始时希望从server这个套接字上读取数据
inputs = [server]

#初始化写入数据的监听列表,最开始并没有客户端连接进来,所以列表为空

outputs = []

#要发往客户端的数据
message_queues = {}
while inputs:
    print >>sys.stderr,'waiting for the next event'
    #调用select监听所有监听列表中的套接字,并将准备好的套接字加入到对应的列表中
    readable,writable,exceptional = select.select(inputs,outputs,inputs)#列表中的socket 套接字  如果是文件呢? 
    #监控文件句柄有某一处发生了变化 可写 可读  异常属于Linux中的网络编程 
    #属于同步I/O操作,属于I/O复用模型的一种
    #rlist--等待到准备好读
    #wlist--等待到准备好写
    #xlist--等待到一种异常
    #处理可读取的套接字

    '''
        如果server这个套接字可读,则说明有新链接到来
        此时在server套接字上调用accept,生成一个与客户端通讯的套接字
        并将与客户端通讯的套接字加入inputs列表,下一次可以通过select检查连接是否可读
        然后在发往客户端的缓冲中加入一项,键名为:与客户端通讯的套接字,键值为空队列
        select系统调用是用来让我们的程序监视多个文件句柄(file descrīptor)的状态变化的。程序会停在select这里等待,
        直到被监视的文件句柄有某一个或多个发生了状态改变
        '''

    '''
        若可读的套接字不是server套接字,有两种情况:一种是有数据到来,另一种是链接断开
        如果有数据到来,先接收数据,然后将收到的数据填入往客户端的缓存区中的对应位置,最后
        将于客户端通讯的套接字加入到写数据的监听列表:
        如果套接字可读.但没有接收到数据,则说明客户端已经断开。这时需要关闭与客户端连接的套接字
        进行资源清理
        '''
        
    for s in readable: 
        if s is server:
            connection,client_address = s.accept()
            print >>sys.stderr,'connection from',client_address
            connection.setblocking(0)#设置非阻塞
            inputs.append(connection)
            message_queues[connection] = Queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                print >>sys.stderr,'received "%s" from %s'% 
                (data,s.getpeername())
                message_queues[s].put(data)
                if s not in outputs:
                    outputs.append(s)
            else:
                print >>sys.stderr,'closing',client_address
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
                    
    #处理可写的套接字
    '''
        在发送缓冲区中取出响应的数据,发往客户端。
        如果没有数据需要写,则将套接字从发送队列中移除,select中不再监视
        '''

    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()

        except Queue.Empty:
            print >>sys.stderr,'  ',s,getpeername(),'queue empty'
            outputs.remove(s)
        else:
            print >>sys.stderr,'sending "%s" to %s'% 
            (next_msg,s.getpeername())
            s.send(next_msg)



    #处理异常情况

    for s in exceptional:
        for s in exceptional:
            print >>sys.stderr,'exception condition on',s.getpeername()
            inputs.remove(s)
            if s in outputs:
                outputs.remove(s)
            s.close()
            del message_queues[s]

参考:https://www.cnblogs.com/Keep-Ambition/p/7598740.html

 https://www.sooele.com/1410.html

原文地址:https://www.cnblogs.com/zhongguiyao/p/11049431.html