50.IO模型

协程:遇到IO操作就切换,但是什么时候切回去呢?怎么确定IO操作?

       很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。

     这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。但是,“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题

传统的编程是如下线性模式的:

开始--->代码块A--->代码块B--->代码块C--->代码块D--->......--->结束

每一个代码块里是完成各种各样事情的代码,但编程者知道代码块A,B,C,D...的执行顺序,唯一能够改变这个流程的是数据。输入不同的数据,根据条件语句判断,流程或许就改为A--->C--->E...--->结束。每一次程序运行顺序或许都不同,但它的控制流程是由输入数据和你编写的程序决定的。如果你知道这个程序当前的运行状态(包括输入数据和程序本身),那你就知道接下来甚至一直到结束它的运行流程。

 对于事件驱动型程序模型,它的流程大致如下:

开始--->初始化--->等待

 与上面传统编程模式不同,事件驱动程序在启动之后,就在那等待,等待什么呢?等待被事件触发。传统编程下也有“等待”的时候,比如在代码块D中,你定义了一个input(),需要用户输入数据。但这与下面的等待不同,传统编程的“等待”,比如input(),你作为程序编写者是知道或者强制用户输入某个东西的,或许是数字,或许是文件名称,如果用户输入错误,你还需要提醒他,并请他重新输入。事件驱动程序的等待则是完全不知道,也不强制用户输入或者干什么。只要某一事件发生,那程序就会做出相应的“反应”。这些事件包括:输入信息、鼠标、敲击键盘上某个键还有系统内部定时器触发。

所以下面先说一下事件驱动模型

事件驱动模型

一、事件驱动模型介绍

通常,我们写服务器处理模型的程序时,有以下几种模型:

(1)每收到一个请求,创建一个新的进程,来处理该请求; 
(2)每收到一个请求,创建一个新的线程,来处理该请求; 
(3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求

第三种就是协程、事件驱动的方式,一般普遍认为第(3)种方式是大多数网络服务器采用的方式 

事件驱动之鼠标点击事件注册

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>

</head>
<body>

<p onclick="fun()">点我呀</p>


<script type="text/javascript">
    function fun() {
          alert('约吗?')
    }
</script>
</body>

</html>

在UI编程中,常常要对鼠标点击进行相应,首先如何获得鼠标点击呢? 两种方式:

1创建一个线程循环检测是否有鼠标点击

 那么这个方式有以下几个缺点:

    1.CPU资源浪费,可能鼠标点击的频率非常小,但是扫描线程还是会一直循环检测,这会造成很多的CPU资源浪费;如果扫描鼠标点击的接口是阻塞的呢?
    2.如果是堵塞的,又会出现下面这样的问题,如果我们不但要扫描鼠标点击,还要扫描键盘是否按下,由于扫描鼠标时被堵塞了,那么可能永远不会去扫描键盘;
    3.如果一个循环需要扫描的设备非常多,这又会引来响应时间的问题; 
    所以,该方式是非常不好的。

2 就是事件驱动模型 

目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:

1.有一个事件(消息)队列;
2.鼠标按下时,往这个队列中增加一个点击事件(消息);
3.有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
4.事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数; 

 如图:

事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。
另外两种常见的编程范式是(单线程)同步以及多线程编程。

模型概念

事件驱动模型一般是由事件收集器、事件发送器和事件处理器三部分组成基本单元组成。

让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。

最初的问题:怎么确定IO操作完了切回去呢?通过回调函数

模型总结

1:要理解事件驱动和程序,就需要与非事件驱动的程序进行比较。实际上,现代的程序大多数是事件驱动的,比如多线程的程序,肯定是事件驱动的。早期则存在许多非事件驱动的程序,这样的程序,在需要等待某个条件触发时,会不断的检查这个条件,直到条件满足,这是很浪费CPU时间的,而事件驱动的程序,则有机会释放CPU从而进入睡眠态(注意是有机会,当然程序也可以自行决定不释放CPU),当事件触发时被操作系统唤醒,这样就能更加有效地使用CPU。

2:再说什么是事件驱动的程序,一个典型的事件驱动的程序,就是一个死循环,并以一个线程的形式存在,这个死循环包括两个部分,第一个部分是按照一定的条件接受并选择一个要处理的事件,第二个部分就是事件的处理过程,程序的执行过程就是选择事件和处理事件,而当没有任何事件触发时,程序就会因为查询事件队列失败而进入到睡眠状况,从而释放CPU。

3:事件驱动的程序,必定会直接或者简介拥有一个事件队列,用于存储未能及时处理的事件。

4:事件驱动的程序行为,完全受外部输出的事件控制,所以,事件驱动的系统中,存在大量的这样的程序,并以事件作为主要的通信方式。

5:事件驱动的程序,还有一个最大的好处,就是可以按照一定得顺序处理队列中的事件,而这个顺序则是由事件的触发顺序决定的,这一特性往往被用于保证某些过程的原子化。

6:目前windows,linux,nucleus,vxworks都是事件驱动的,只有一些单片机可能是非事件驱动的。

 7:事件驱动的监听事件是由操作系统调用的cpu来完成的

模型应用

 

一、select库

  

select库是各个版本的linux和windows平台都支持的基本事件驱动模型库,并且在接口的定义上也基本相同,只是部分参数的含义略有差异。

  使用select库的一般步骤:创建所关注事件的描述集合。对于一个描述符,可以关注其上面的读事件、写事件以及异常发生事件,所以要创建三类事件描述符集合,分别用来收集读事件的描述符、写事件的描述符和异常事件的描述符。

  其次,调用底层提供的select()函数,等待事件的发生。select的阻塞与是否设置非阻塞的IO是没有关系的。

  然后,轮询所有事件描述符集合中的每一个事件描述符,检查是否有响应的时间发生,如果有,则进行处理。

nginx服务器在编译过程中如果没有为其指定其他高性能事件驱动模型库,它将自动编译该库。

可以使用--with-select_module和--without-select_module两个参数,强制nginx是否编译该库。

举一个EchoServer的例子,客户端发送任何内容,服务端会原模原样返回。

# !/usr/bin/env python
# coding:utf8

import socket import select from Queue import Queue #AF_INET指定使用IPv4协议,如果要用更先进的IPv6,就指定为AF_INET6。 #SOCK_STREAM指定使用面向流的TCP协议,如果要使用面向数据包的UCP协议,就指定SOCK_DGRAM。 server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) #设置监听的ip和port server_address = ('localhost', 1234) server.bind(server_address) #设置backlog为5,client向server发起connect,server accept后建立长连接, #backlog指定排队等待server accept的连接数量,超过这个数量,server将拒绝连接。 server.listen(5) #注册在socket上的读事件 inputs = [server] #注册在socket上的写事件 outputs = [] #注册在socket上的异常事件 exceptions = [] #每个socket有一个发送消息的队列 msg_queues = {} print "server is listening on %s:%s." % server_address while inputs: #第四个参数是timeout,可选,表示n秒内没有任何事件通知,就执行下面代码 readable, writable, exceptional = select.select(inputs, outputs, exceptions) for sock in readable: #client向server发起connect也是读事件,server accept后产生socket加入读队列中 if sock is server: conn, addr = sock.accept() conn.setblocking(False) inputs.append(conn) msg_queues[conn] = Queue() print "server accepts a conn." else: #读取client发过来的数据,最多读取1k byte。 data = sock.recv(1024) #将收到的数据返回给client if data: msg_queues[sock].put(data) if sock not in outputs: #下次select的时候会触发写事件通知,写和读事件不太一样,前者是可写就会触发事件,并不一定要真的去写 outputs.append(sock) else: #client传过来的消息为空,说明已断开连接 print "server closes a conn." if sock in outputs: outputs.remove(sock) inputs.remove(sock) sock.close() del msg_queues[sock] for sock in writable: if not msg_queues[sock].empty(): sock.send(msg_queues[sock].get_nowait()) if msg_queues[sock].empty(): outputs.remove(sock) for sock in exceptional: inputs.remove(sock) if sock in outputs: outputs.remove(sock) sock.close() del msg_queues[sock]

 select有3个缺点:

1. 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
2. 每次调用select后,都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大。
这点从python的例子里看不出来,因为python select api更加友好,直接返回就绪的socket列表。事实上linux内核select api返回的是就绪socket数目:
int select (int n, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
3. fd数量有限,默认1024。

二、poll库

  poll库,作为linux平台上的基本事件驱动模型,Windows平台不支持poll库。

  使用poll库的一般过程是:与select的基本工作方式是相同的,都是先创建一个关注事件的描述符集合,再去等待这些事件的发生,然后在轮询描述符集合,检查有没有事件发生,如果有,就进行处理。

  与select的主要区别是select需要为读事件、写事件、异常事件分别创建一个描述符的集合,因此在轮询的时候,需要分别轮询这三个集合。而poll库只需创建一个集合,在每个描述符对应的结构上分别设置读事件,写事件和异常事件,最后轮询的时候可以同时检查这三种事件是否发生。是select库优化的实现。

nginx服务器在编译过程中如果没有为其指定其他高性能事件驱动模型库,它将自动编译该库。

可以使用--with-poll_module和--without-poll_module两个参数,强制nginx是否编译该库。

poll解决了select的第三个缺点,fd数量不受限制,但是失去了select的跨平台特性

# !/usr/bin/env python
# coding:utf8
import select import socket import sys import Queue server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setblocking(False) server_address = ('localhost', 1234) server.bind(server_address) server.listen(5) print 'server is listening on %s port %s' % server_address msg_queues = {} timeout = 1000 * 60 #POLLIN: There is data to read #POLLPRI: There is urgent data to read #POLLOUT: Ready for output #POLLERR: Error condition of some sort #POLLHUP: Hung up #POLLNVAL: Invalid request: descriptor not open READ_ONLY = select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR READ_WRITE = READ_ONLY | select.POLLOUT poller = select.poll() #注册需要监听的事件 poller.register(server, READ_ONLY) #文件描述符和socket映射 fd_to_socket = { server.fileno(): server} while True: events = poller.poll(timeout) for fd, flag in events: sock = fd_to_socket[fd] if flag & (select.POLLIN | select.POLLPRI): if sock is server: conn, client_address = sock.accept() conn.setblocking(False) fd_to_socket[conn.fileno()] = conn poller.register(conn, READ_ONLY) msg_queues[conn] = Queue.Queue() else: data = sock.recv(1024) if data: msg_queues[sock].put(data) poller.modify(sock, READ_WRITE) else: poller.unregister(sock) sock.close() del msg_queues[sock] elif flag & select.POLLHUP: poller.unregister(sock) sock.close() del msg_queues[sock] elif flag & select.POLLOUT: if not msg_queues[sock].empty(): msg = msg_queues[sock].get_nowait() sock.send(msg) else: poller.modify(sock, READ_ONLY) elif flag & select.POLLERR: poller.unregister(sock) sock.close() del msg_queues[sock]

三、epoll库

epoll库是Nginx服务器支持的高性能事件之一,它是公认的非常优秀的时间驱动模型,和poll和select有很大的不同,属于poll库的一个变种,

他们的处理方式都是创建一个待处理事件列表,然后把这个事件列表发送给内核,返回的时候,再去轮询检查这个列表,以判断事件是否发生。

如果这样的描述符在比较多的应用中,效率就显得低下了,epoll是描述符列表的管理交给内核负责,一旦某种事件发生,内核会把发生事件的描述符列表通知给进程,这样就避免了轮询整个描述符列表,epoll库得到事件列表,就开始进行事件处理了。

epoll的用法与poll几乎一样

epoll解决了select的三个缺点,是目前最好的IO多路复用解决方案。

四、其他事件驱动模型

kqueue模型 用于FreeBSD 4.1及以上版本 OpenBSD2.9、NetBSD2.0及Mac os X平台上。都是通过避免轮询操作提供效率。该模型同时支持条件触发(也叫水平触发,只要满足条件就触发一个事件)和边缘触发(每个状态变化时,就触发一个事件)

/dev/poll 主要用在unix衍生平台的高效事件驱动模型,主要在solaris7 11/99及以上版本 HP/UX11.22以上版本等

eventport 模型,用于支持solaris 10及以上版本平台的高效事件驱动模型。

IO多路复用

前面是用协程实现的IO阻塞自动切换,那么协程又是怎么实现的,在原理是是怎么实现的。如何去实现事件驱动的情况下IO的自动阻塞的切换,这个学名叫什么呢? => IO多路复用 
比如socketserver,多个客户端连接,单线程下实现并发效果,就叫多路复用。 
  
同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的。所以先限定一下本文的上下文。

 IO模型准备

在进行解释之前,首先要说明几个概念:

    用户空间和内核空间
    进程切换
    进程的阻塞
    文件描述符
    缓存 I/O

用户空间与内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。 
操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。 
为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操心系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。 
针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间。 

进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换,这种切换是由操作系统来完成的。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。 
从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化:

保存处理机上下文,包括程序计数器和其他寄存器。

更新PCB信息。

把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。

选择另一个进程执行,并更新其PCB。

更新内存管理的数据结构。

恢复处理机上下文。 
注:总而言之就是很耗资源的

进程的阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。

文件描述符fd

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。 
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

缓存 I/O

缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。用户空间没法直接访问内核空间的,内核态到用户态的数据拷贝 

思考:为什么数据一定要先到内核区,直接到用户内存不是更直接吗?
缓存 I/O 的缺点: 

数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。

 首先,内核不能信任任何用户空间的指针。必须对用户空间的指针指向的数据进行验证。如果只做验证不做拷贝的话,那么在随后的运行中要随时受到其它进/线程可能修改用户空间数据的威胁。所以必须做拷贝。(有人提到在 copy 过程中数据依然可以被修改。是的,但是这种修改不能称为「篡改」。因为这种修改是在「合法性检查」之前发生的,影响的是用户进程的正确性,而不是内核对数据的验证。copy 只保证最后被使用的数据是被验证的数据,至于有没有 race 去破坏被传入的数据本身的正确性不在内核责任之内。要注意,「合法性」不等于「正确性」。)其次,上面说的是 Linux 的具体实现。Linux 中内核空间和用户空间共享线性地址,也就是 cr3 不变。但是从逻辑上来说,内核和用户空间完全可以是不同的两套地址空间。OS X 就是用这种方式让 32-bit kernel 运行 64-bit app 的。参见 地址空间划分(一)。所以从设计角度来说,应该让代码能通用兼顾这种逻辑上的考虑。如果考虑考内核和用户空间不能共享线性地址,那么数据拷贝就是必要的

IO模型介绍

为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞

 同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,我先限定一下本文的上下文。

本文讨论的背景是Linux环境下的network IO。

 本文最重要的参考文献是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不懂。本文中的流程图也是截取自参考文献。

  Stevens在文章中一共比较了五种IO Model:

   * blocking IO           阻塞IO
    * nonblocking IO      非阻塞IO
    * IO multiplexing      IO多路复用
    * signal driven IO     信号驱动IO
    * asynchronous IO    异步IO

由于signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。     再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段

#1)等待数据准备 (Waiting for the data to be ready)
#2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

阻塞IO(blocking IO)

在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

 当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

    而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
    所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图

    ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

 实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

    一个简单的解决方案:

 

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

   该方案的问题是:

#开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,
而且线程与进程本身也更容易进入假死状态。

  改进方案: 

#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。
“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,
如websphere、tomcat和各种数据库等。

  改进后方案其实也存在着问题:

#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,
“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

  对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

非阻塞IO(non-blocking IO)

Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子

 如图:

 从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。

从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,

于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

    也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

 

 所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

注意:

在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不一样,”非阻塞将大的整片时间的阻塞分成N多的小的阻塞, 所以进程不断地有机会 ‘被’ CPU光顾”。
即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是可以做其他事情的,也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,
内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,
循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,
进程仍然是属于阻塞的状态。

非阻塞IO实例

#服务端
from socket import *
import time
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
conn_l=[]
del_l=[]
while True:
    try:
        conn,addr=s.accept()
        conn_l.append(conn)
    except BlockingIOError:
        print(conn_l)
        for conn in conn_l:
            try:
                data=conn.recv(1024)
                if not data:
                    del_l.append(conn)
                    continue
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                del_l.append(conn)

        for conn in del_l:
            conn_l.remove(conn)
            conn.close()
        del_l=[]

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8080))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

但是非阻塞IO模型绝不被推荐。

    我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

    但是也难掩其缺点:

#1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
#2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

  此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。

多路复用IO(IO multiplexing)

IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

    强调:

    1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

    2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

    结论: select的优势在于可以处理多个连接,不适用于单个连接

select网络IO模型

服务端:

在 python 3中运行:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from socket import *
import select

s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8081))
s.listen(5)
s.setblocking(False) #设置socket的接口为非阻塞
read_l=[s,]
while True:
    r_l,w_l,x_l=select.select(read_l,[],[])
    print(r_l)
    for ready_obj in r_l:
        if ready_obj == s:
            conn,addr=ready_obj.accept() #此时的ready_obj等于s
            read_l.append(conn)
        else:
            try:
                data=ready_obj.recv(1024) #此时的ready_obj等于conn
                if not data:
                    ready_obj.close()
                    read_l.remove(ready_obj)
                    continue
                ready_obj.send(data.upper())
            except ConnectionResetError:
                ready_obj.close()
                read_l.remove(ready_obj)

客户端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8081))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

 select监听fd变化的过程分析:

#用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
#用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,
这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。

该模型的优点:

#相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。
如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

  该模型的缺点:

#首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。
#很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。
#如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,
#所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
#其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

异步IO(Asynchronous I/O)

Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:

 用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

IO模型比较分析

到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,

synchronous IO和asynchronous IO的区别在哪。


    先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

  再说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:

    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked; 


  两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型可以分为两大类,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。

    有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

 

各个IO Model的比较如图所示:

 经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

五种IO模型比较:

 IO多路复用介绍

IO复用:

为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解还是有些抽象,为此,咱们来理解下复用在通信领域的使用,
在通信领域中为了充分利用网络连接的物理介质,往往在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,
到这里我们就基本上理解了复用的含义,即公用某个“介质”来尽可能多的做同一类(性质)的事,那IO复用的“介质”是什么呢?为此我们首先来看看服务器编程的模型,
客户端发来的请求服务端会产生一个进程来对其进行服务,每当来一个客户请求就产生一个进程来服务,然而进程不可能无限制的产生,因此为了解决大量客户端访问的问题,
引入了IO复用技术,即:一个进程可以同时对多个客户请求进行服务。也就是说IO复用的“介质”是进程(准确的说复用的是select和poll,
因为进程也是靠调用select和poll来实现的),复用一个进程(select和poll)来对多个IO进行服务,虽然客户端发来的IO是并发的但是IO所需的读写数据多数情况下是
没有准备好的,因此就可以利用一个函数(select和poll)来监听IO所需的这些数据的状态,一旦IO有数据可以进行读写了,进程就来对这样的IO进行服务。

 理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系

select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),
能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,
也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

select

select最早于1983年出现在4.2BSD中,它通过一个select()系统调用来监视多个文件描述符的数组,当select()返回后,该数组中就绪的文件描述符便会被内核修改标志位,使得进程可以获得这些文件描述符从而进行后续的读写操作。

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点,事实上从现在看来,这也是它所剩不多的优点之一。

select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,不过可以通过修改宏定义甚至重新编译内核的方式提升这一限制。另外,select()所维护的存储大量文件描述符的数据结构,随着文件描述符数量的增大,其复制的开销也线性增长。同时,由于网络响应时间的延迟使得大量TCP连接处于非活跃状态,但调用select()会对所有socket进行一次线性扫描,所以这也浪费了一定的开销。

select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),位数组的每一位代表其对应的描述符是否需要被检查。第二三四参数表示需要关注读、写、错误事件的文件描述符位数组,这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,所以每次调用select前都需要重新初始化fdset。timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。

 select的调用步骤如下:

1)使用copy_from_user从用户空间拷贝fdset到内核空间

(2)注册回调函数__pollwait

(3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

(5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,其等待队列是sk->sk_sleep
  (注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,
   这时current便被唤醒了。 (6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。 (7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。
  当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,
  则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。 (8)把fd_set从内核空间拷贝到用户空间。

Python的select()方法直接调用操作系统的IO接口,它监控sockets,open files, and pipes(所有带fileno()方法的文件句柄)何时变成readable 和writeable, 或者通信错误,select()使得同时监控多个连接变的简单,并且这比写一个长循环来等待和监控多客户端连接要高效,因为select直接通过操作系统提供的C的网络接口进行操作,而不是通过Python的解释器。

select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。调用后select函数会阻塞,直到有描述符就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。



总结下select的几大缺点:

1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

(3)select支持的文件描述符数量太小了,默认是1024

poll

poll在1986年诞生于System V Release 3,它和select在本质上没有多大差别,但是poll没有最大文件描述符数量的限制。

  poll和select同样存在一个缺点就是,包含大量文件描述符的数组被整体复制于用户态和内核的地址空间之间,而不论这些文件描述符是否就绪,它的开销随着文件描述符数量的增加而线性增大。

  另外,select()和poll()将就绪的文件描述符告诉进程后,如果进程没有对其进行IO操作,那么下次调用select()和poll()的时候将再次报告这些文件描述符,所以它们一般不会丢失就绪的消息,这种方式称为水平触发(Level Triggered)。



poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。

 poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。

int poll (struct pollfd *fds, unsigned int nfds, int timeout);

不同与select使用三个位图来表示三个fdset的方式,poll使用一个 pollfd的指针实现。

struct pollfd {
    int fd; /* file descriptor */
    short events; /* requested events to watch */
    short revents; /* returned events witnessed */
};

pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式。同时,pollfd并没有最大数量限制(但是数量过大后性能也是会下降)。 和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符。

从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。
一般也不用它,相当于过渡阶段
 

epoll



直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。windows不支持

epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,

那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。epoll同样只告知那些就绪的文件描述符,

而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,

你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,

这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。

在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,

一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。



 

epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,

我们先看一下epoll 和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。

而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注 册要监听的事件类型;

epoll_wait则是等待事件的产生。

对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把所有的fd拷贝进内核,
而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。 对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,
而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,
而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,
判断一会的效果,和select实现中的第7步是类似的)。 对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,
具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

poll是在2.6内核中提出的,是之前的select和poll的增强版本。相对于select和poll来说,epoll更加灵活,没有描述符限制。epoll使用一个文件描述符管理多个描述符,将用户关系的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

一 epoll操作过程

epoll操作过程需要三个接口,分别如下:

int epoll_create(int size);//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

1. int epoll_create(int size);


创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大,这个参数不同于select()中的第一个参数,给出最大监听的fd+1的值,参数size并不是限制了epoll所能监听的描述符最大个数,只是对内核初始分配内部数据结构的一个建议
当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。


2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数是对指定描述符fd执行op操作。
– epfd:是epoll_create()的返回值。
– op:表示op操作,用三个宏来表示:添加EPOLL_CTL_ADD,删除EPOLL_CTL_DEL,修改EPOLL_CTL_MOD。分别添加、删除和修改对fd的监听事件。
– fd:是需要监听的fd(文件描述符)
– epoll_event:是告诉内核需要监听什么事


3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
等待epfd上的io事件,最多返回maxevents个事件。
参数events用来从内核得到事件的集合,maxevents告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。该函数返回需要处理的事件数目,如返回0表示已超时。



总结:

1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait不断轮询就绪链表,
  期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。
  虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就行了,
  这节省了大量的CPU时间,这就是回调机制带来的性能提升。 (2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,
  而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内 部定义的等待队列),
  这也能节省不少的开销。

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll 从而实现IO多路复用。

 
Windows Python:
    提供: select
Mac Python:
    提供: select
Linux Python:
    提供: select、poll、epoll
 

注意:网络操作、文件操作、终端操作等均属于IO操作,对于windows只支持Socket操作,其他系统支持其他IO操作,但是无法检测 普通文件操作 自动上次读取是否已经变化

sellect、poll、epoll三者的区别

select与epoll

首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。
 不管是文件,还是套接字,还是管道,我们都可以把他们看作流。


  之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。现在假定一个情形,我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),这时候该怎么办?
         

阻塞。阻塞是个什么概念呢?比如某个时候你在等快递,但是你不知道快递什么时候过来,而且你没有别的事可以干(或者说接下来的事要等快递来了才能做);那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打个电话(假定一定能叫醒你)。

非阻塞忙轮询。接着上面等快递的例子,如果用忙轮询的方法,那么你需要知道快递员的手机号,然后每分钟给他挂个电话:“你到了没?”

很明显一般人不会用第二种做法,不仅显很无脑,浪费话费不说,还占用了快递员大量的时间。
  大部分程序也不会用第二种做法,因为第一种方法经济而简单,经济是指消耗很少的CPU时间,如果线程睡眠了,就掉出了系统的调度队列,暂时不会去瓜分CPU宝贵的时间片了。

  为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。

缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用(你知道它很慢的),当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。
假设有一个管道,进程A为管道的写入方,B为管道的读出方。
       

假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,

A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。
      

假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
     

也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。
      

这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满(注都是说的内核缓冲区,且这四个术语都是我生造的,仅为解释其原理而造)。这四个I/O事件是进行阻塞同步的根本。(如果不能理解“同步”是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。

    

然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多
个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。
  于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了(把一个流从阻塞模式切换到非阻塞模式再此不予讨论):

while true {
for i in stream[]; {
if i has data
read until unavailable
}
}

我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。

  为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)。代码长这样:

while true {
select(streams[])
for i in streams[] {
if i has data
read until unavailable
}
}

     于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。
     

但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,每一次无差别轮询时间就越长。再次说了这么多,终于能好好解释epoll了
    

epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。
    

在讨论epoll的实现细节之前,先把epoll的相关操作列出:
   epoll_create 创建一个epoll对象,一般epollfd = epoll_create()
epoll_ctl (epoll_add/epoll_del的合体),往epoll对象中增加/删除某一个流的某一个事件
比如

epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//有缓冲区内有数据时epoll_wait返回
epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//缓冲区可写入时epoll_wait返回
epoll_wait(epollfd,...)等待直到注册的事件发生


(注:当对一个非阻塞流的读写发生缓冲区满或缓冲区空,write/read会返回-1,并设置errno=EAGAIN。
而epoll只关心缓冲区非满和缓冲区非空事件)。
一个epoll模式的代码大概的样子是:

while true {
active_stream[] = epoll_wait(epollfd)
for i in active_stream[] {
read or write till unavailable
}
}




举个例子:
select:
班里三十个同学在考试,谁先做完想交卷都要通过按钮来活动,他按按钮作为老师的我桌子上的灯就会变红.
一旦灯变红,我(select)我就可以知道有人交卷了,但是我并不知道谁交的,所以,我必须跟个傻子似的轮询
地去问:嘿,是你要交卷吗?然后我就可以以这种效率极低地方式找到要交卷的学生,然后把它的卷子收上来.


epoll:
这次再有人按按钮,我这不光灯会亮,上面还会显示要交卷学生的名字.这样我就可以直接去对应学生那收卷就
好了.当然,同时可以有多人交卷.

IO多路复用的触发方式

 在linux的IO多路复用中有水平触发,边缘触发两种模式,这两种模式的区别如下:

水平触发:如果文件描述符已经就绪可以非阻塞的执行IO操作了,此时会触发通知.允许在任意时刻重复检测IO的状态,
 没有必要每次描述符就绪后尽可能多的执行IO.select,poll就属于水平触发.

 边缘触发:如果文件描述符自上次状态改变后有新的IO活动到来,此时会触发通知.在收到一个IO事件通知后要尽可能
 多的执行IO操作,因为如果在一次通知中没有执行完IO那么就需要等到下一次新的IO活动到来才能获取到就绪的描述
 符.信号驱动式IO就属于边缘触发.

 epoll既可以采用水平触发,也可以采用边缘触发.

大家可能还不能完全了解这两种模式的区别,我们可以举例说明:一个管道收到了1kb的数据,epoll会立即返回,此时
 读了512字节数据,然后再次调用epoll.这时如果是水平触发的,epoll会立即返回,因为有数据准备好了.如果是边
 缘触发的不会立即返回,因为此时虽然有数据可读但是已经触发了一次通知,在这次通知到现在还没有新的数据到来,
 直到有新的数据到来epoll才会返回,此时老的数据和新的数据都可以读取到(当然是需要这次你尽可能的多读取).

 下面我们还从电子的角度来解释一下:

     水平触发:也就是只有高电平(1)或低电平(0)时才触发通知,只要在这两种状态就能得到通知.上面提到的只要
 有数据可读(描述符就绪)那么水平触发的epoll就立即返回.
 
     边缘触发:只有电平发生变化(高电平到低电平,或者低电平到高电平)的时候才触发通知.上面提到即使有数据
 可读,但是没有新的IO活动到来,epoll也不会立即返回.

select IO多路复用

select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点。select的一 个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样会造成效率的降低

1、select语法:

select(rlist, wlist, xlist, timeout=None)

select()方法接收并监控3个通信列表, 第一个rlist监控所有要进来的输入数据,第二个wlist是监控所有要发出去的输出数据,第三个监控异常错误数据,第四个设置指定等待时间,如果想立即返回,设为null即可,最后需要创建2个列表来包含输入和输出信息来传给select(),让select方法通过内核去监控,然后生成三个实例。

 
 
#建立两个列表,比如想让内核去检测50个连接,需要传给它一个列表,就是这个inputs(列表中里面存放的是需要被内核监控的链接),然后交给select,就相当于交给内核了
inputs = [server,] #输入列表,监控所有输入数据

outputs = []  #输入列表,监控所有输出数据

#把两个列表传给select方法通过内核去监控,生成三个实例
readable,writeable,exceptional = select.select(inputs,outputs,inputs)  # 这里select方法的第三个参数同样传入input列表是因为,input列表中存放着所有的链接,比如之前放入的50被监控链接中有5个断了,出现了异常,就会输入到exceptional里面,但这5链接本身是放在inputs列表中
 
 

 

对于select方法:

 
句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间)
 
参数: 可接受四个参数(前三个必须)
返回值:三个列表
 
select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间 未设置,则select会一直阻塞,直到监听的句柄发生变化
   当 超时时间 = 1时,那么如果监听的句柄均无任何变化,则select会阻塞 1 秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
 

 

2、select服务端代码实例:

 
 
 
 
 
import select,socket,queue
server = socket.socket()
server.bind(("localhost",9000))
server.listen(1000)
server.setblocking(False) #设置为非阻塞
msg_dic = dict() #定义一个队列字典
inputs = [server,]  #由于设置成非阻塞模式,accept和recive都不阻塞了,没有值就会报错,因此最开始需要最开始需要监控服务端本身,等待客户端连接
outputs = [] 
while True:
    #exceptional表示如果inputs列表中出现异常,会输出到这个exceptional中
    readable,writeable,exceptional = select.select(inputs,outputs,inputs)#如果没有任何客户端连接,就会阻塞在这里 
    for r in readable:# 没有个r代表一个socket链接
        if r is server:  #如果这个socket是server的话,就说明是是新客户端连接了
            conn,addr = r.accept() #新连接进来了,接受这个连接,生成这个客户端实例
            print("来了一个新连接",addr)
            inputs.append(conn)#为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接
            #就会被交给select去监听
            msg_dic[conn] = queue.Queue() #初始化一个队列,后面存要返回给这个客户端的数据
        else: #如果不是server,就说明是之前建立的客户端来数据了
            data = r.recv(1024)
            print("收到数据:",data)
            msg_dic[r].put(data)#收到的数据先放到queue里,一会返回给客户端
            outputs.append(r)#为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端
            # r.send(data)
            # print("send done....")
    for w in writeable:  #要返回给客户端的链接列表
        data_to_client = msg_dic[w].get()
        w.send(data_to_client)   #返回给客户端的源数据
        outputs.remove(w)  #确保下次循环的时候writeable,不返回这个已经处理完的这个连接了
 
    for e in exceptional:  #处理异常的连接
        if e in outputs:   #因为e不一定在outputs,所以先要判断
            outputs.remove(e)
        inputs.remove(e)   #删除inputs中异常连接
        del msg_dic[e]   #删除此连接对应的队列
 
 

epoll IO多路复用

epoll的方式,这种效率更高,但是这种方式在Windows下不支持,在Linux是支持的,selectors模块就是默认使用就是epoll,但是如果在windows系统上使用selectors模块,就会找不到epoll,从而使用select。

python2没有这个模块

1、selectors语法:

 
 
#定义一个对象
sel = selectors.DefaultSelector()

#注册一个事件
sel.register(server,selectors.EVENT_READ,accept) #注册事件,只要来一个连接就调accept这个函数,就相当于之前select的用法,sel.register(server,selectors.EVENT_READ,accept) ==  inputs=[server,],readable,writeable,exceptional = select.select(inputs,outputs,inputs)意思是一样的。
 
 

 2、selectors代码实例:

 
 
import selectors,socket
 
sel = selectors.DefaultSelector()
 
def accept(sock,mask):
    "接收客户端信息实例"
    conn,addr = sock.accept()
    print("accepted",conn,'from',addr)
    conn.setblocking(False)
    sel.register(conn,selectors.EVENT_READ,read)  #新连接注册read回调函数
 
def read(conn,mask):
    "接收客户端的数据"
    data = conn.recv(1024)
    if data:
        print("echoing",repr(data),'to',conn)
        conn.send(data)
    else:
        print("closing",conn)
        sel.unregister(conn)
        conn.close()
 
server = socket.socket()
server.bind(('localhost',9999))
server.listen(500)
server.setblocking(False)
sel.register(server,selectors.EVENT_READ,accept)  #注册事件,只要来一个连接就调accept这个函数,
#sel.register(server,selectors.EVENT_READ,accept) == inputs=[server,]
 
while True:
    events = sel.select()  #这个select,看起来是select,有可能调用的是epoll,看你操作系统是Windows的还是Linux的
                           #默认阻塞,有活动连接就返回活动连接列表
    print("事件:",events)
    for key,mask in events:
        callback = key.data #相当于调accept了
        callback(key.fileobj,mask)  #key.fileobj=文件句柄
 
 

打印服务端:

 
 
 [(SelectorKey(fileobj=<socket.socket fd=436, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222)>, fd=436, events=1, data=<function accept at 0x0000022296063E18>), 1)]
accepted <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)> from ('127.0.0.1', 50281)
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'adas' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'HA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
事件: [(SelectorKey(fileobj=<socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>, fd=508, events=1, data=<function read at 0x00000222980501E0>), 1)]
echoing b'asdHA' to <socket.socket fd=508, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 2222), raddr=('127.0.0.1', 50281)>
 
 

 这样就容易明白:callback = key.data #第一次调用的是accept,第二次调用的是read    callback(key.fileobj,mask)  #key.fileobj=文件句柄

客户端代码:

在Linux端,selectors模块才能是epoll

 
 
import socket,sys
 
messages = [ b'This is the message. ',
             b'It will be sent ',
             b'in parts.',
             ]
server_address = ('localhost', 9999)
 
# 创建100个 TCP/IP socket实例
socks = [ socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(100)]
 
# 连接服务端
print('connecting to %s port %s' % server_address)
for s in socks:
    s.connect(server_address)
 
for message in messages:
 
    # 发送消息至服务端
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message) )
        s.send(message)
 
    # 从服务端接收消息
    for s in socks:
        data = s.recv(1024)
        print( '%s: received "%s"' % (s.getsockname(), data) )
        if not data:
            print(sys.stderr, 'closing socket', s.getsockname() )
 
 

利用select实现伪同时处理多个Socket客户端请求:服务端

 
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1,]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # 当客户端第一次连接服务端时
        if sk1 == r:
            print 'accept'
            request, address = r.accept()
            request.setblocking(0)
            inputs.append(request)
        # 当客户端连接上服务端之后,再次发送数据时
        else:
            received = r.recv(1024)
            # 当正常接收客户端发送的数据时
            if received:
                print 'received data:', received
            # 当客户端关闭程序时
            else:
                inputs.remove(r)

sk1.close()
 

利用select实现伪同时处理多个Socket客户端请求:客户端

 
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket

ip_port = ('127.0.0.1',8002)
sk = socket.socket()
sk.connect(ip_port)

while True:
    inp = raw_input('please input:')
    sk.sendall(inp)
sk.close()
 

此处的Socket服务端相比与原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据。但是,如果每个请求的耗时比较长时,select版本的服务器端也无法完成同时操作。

简单实例

实例1(non-blocking IO):   

server端:

#!/usr/bin/env python
# coding:utf8


import time
import socket
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.setsockopt
sk.bind(('127.0.0.1',6667))
sk.listen(5)
sk.setblocking(False)
while True:
    try:
        print ('waiting client connection .......')
        connection,address = sk.accept()   # 进程主动轮询
        print("+++",address)
        client_messge = connection.recv(1024)
        print(str(client_messge,'utf8'))
        connection.close()
    except Exception as e:
        print (e)
        time.sleep(4)

client




#!/usr/bin/env python 
# coding:utf8 import time import socket sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) while True: sk.connect(('127.0.0.1',6667)) print("hello") sk.sendall(bytes("hello","utf8")) time.sleep(2) break

优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在同时执行)。

缺点:任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

实例2(IO multiplexing):

在非阻塞实例中,轮询的主语是进程,而“后台” 可能有多个任务在同时进行,人们就想到了循环查询多个任务的完成状态,只要有任何一个任务完成,就去处理它。不过,这个监听的重任通过调用select等函数交给了内核去做。IO多路复用有两个特别的系统调用select、poll、epoll函数。select调用是内核级别的,select轮询相对非阻塞的轮询的区别在于—前者可以等待多个socket,能实现同时对多个IO端口进行监听,当其中任何一个socket的数据准好了,就能返回进行可读,然后进程再进行recvfrom系统调用,将数据由内核拷贝到用户进程,当然这个过程是阻塞的。

server端:

#!/usr/bin/env python
# coding:utf8
import socket
import select

sk = socket.socket()
sk.bind(("127.0.0.1", 9904))
sk.listen(5)

while True:
    r, w, e = select.select([sk, ], [], [], 5)
    for i in r:
        # conn,add=i.accept()
        # print(conn)
        print("hello")
    print('>>>>>>')

client端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket

sk = socket.socket()

sk.connect(("127.0.0.1", 9904))

while 1:
    inp = input(">>").strip()
    sk.send(inp.encode("utf8"))
    data = sk.recv(1024)
    print(data.decode("utf8"))

请思考:为什么不调用accept,会反复print?

select属于水平触发

实例3(server端并发聊天):

server端:

#!/usr/bin/env python
# coding:utf8

import socket
import select
sk=socket.socket()
sk.bind(("127.0.0.1",8801))
sk.listen(5)
inputs=[sk,]
while True:
    r,w,e=select.select(inputs,[],[],5)
    print(len(r))

    for obj in r:
        if obj==sk:
            conn,add=obj.accept()
            print(conn)
            inputs.append(conn)
        else:
            data_byte=obj.recv(1024)
            print(str(data_byte,'utf8'))
            inp=input('回答%s号客户>>>'%inputs.index(obj))
            obj.sendall(bytes(inp,'utf8'))

    print('>>',r)

client端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
sk=socket.socket()
sk.connect(('127.0.0.1',8801))

while True:
    inp=input(">>>>")
    sk.sendall(bytes(inp,"utf8"))
    data=sk.recv(1024)
    print(str(data,'utf8')

文件描述符其实就是咱们平时说的句柄,只不过文件描述符是linux中的概念。注意,我们的accept或recv调用时即向系统发出recvfrom请求

   (1)  如果内核缓冲区没有数据--->等待--->数据到了内核缓冲区,转到用户进程缓冲区;

    (2) 如果先用select监听到某个文件描述符对应的内核缓冲区有了数据,当我们再调用accept或recv时,直接将数据转到用户缓冲区。

 如图:

实例4:

#_*_coding:utf-8_*_
__author__ = 'Alex Li'
 
import select
import socket
import sys
import queue
 
# Create a TCP/IP socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
 
# Bind the socket to the port
server_address = ('localhost', 10000)
print(sys.stderr, 'starting up on %s port %s' % server_address)
server.bind(server_address)
 
# Listen for incoming connections
server.listen(5)
 
# Sockets from which we expect to read
inputs = [ server ]
 
# Sockets to which we expect to write
outputs = [ ]
 
message_queues = {}
while inputs:
 
    # Wait for at least one of the sockets to be ready for processing
    print( '
waiting for the next event')
    readable, writable, exceptional = select.select(inputs, outputs, inputs)
    # Handle inputs
    for s in readable:
 
        if s is server:
            # A "readable" server socket is ready to accept a connection
            connection, client_address = s.accept()
            print('new connection from', client_address)
            connection.setblocking(False)
            inputs.append(connection)
 
            # Give the connection a queue for data we want to send
            message_queues[connection] = queue.Queue()
        else:
            data = s.recv(1024)
            if data:
                # A readable client socket has data
                print(sys.stderr, 'received "%s" from %s' % (data, s.getpeername()) )
                message_queues[s].put(data)
                # Add output channel for response
                if s not in outputs:
                    outputs.append(s)
            else:
                # Interpret empty result as closed connection
                print('closing', client_address, 'after reading no data')
                # Stop listening for input on the connection
                if s in outputs:
                    outputs.remove(s)  #既然客户端都断开了,我就不用再给它返回数据了,所以这时候如果这个客户端的连接对象还在outputs列表中,就把它删掉
                inputs.remove(s)    #inputs中也删除掉
                s.close()           #把这个连接关闭掉
 
                # Remove message queue
                del message_queues[s]
    # Handle outputs
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # No messages waiting so stop checking for writability.
            print('output queue for', s.getpeername(), 'is empty')
            outputs.remove(s)
        else:
            print( 'sending "%s" to %s' % (next_msg, s.getpeername()))
            s.send(next_msg)
    # Handle "exceptional conditions"
    for s in exceptional:
        print('handling exceptional condition for', s.getpeername() )
        # Stop listening for input on the connection
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
 
        # Remove message queue
        del message_queues[s]

实例5:

# select 模拟一个socket server,注意socket必须在非阻塞情况下才能实现IO多路复用。
# 接下来通过例子了解select 是如何通过单进程实现同时处理多个非阻塞的socket连接的。
#server端


import select
import socket
import queue

server = socket.socket()
server.bind(('localhost',9000))
server.listen(1000)

server.setblocking(False)  # 设置成非阻塞模式,accept和recv都非阻塞
# 这里如果直接 server.accept() ,如果没有连接会报错,所以有数据才调他们
# BlockIOError:[WinError 10035] 无法立即完成一个非阻塞性套接字操作。
msg_dic = {}
inputs = [server,]  # 交给内核、select检测的列表。
# 必须有一个值,让select检测,否则报错提供无效参数。
# 没有其他连接之前,自己就是个socket,自己就是个连接,检测自己。活动了说明有链接
outputs = []  # 你往里面放什么,下一次就出来了

while True:
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)  # 定义检测
    #新来连接                                        检测列表         异常(断开)
    # 异常的也是inputs是: 检测那些连接的存在异常
    print(readable,writeable,exceptional)
    for r in readable:
        if r is server:  # 有数据,代表来了一个新连接
            conn, addr = server.accept()
            print("来了个新连接",addr)
            inputs.append(conn)  # 把连接加到检测列表里,如果这个连接活动了,就说明数据来了
            # inputs = [server.conn] # 【conn】只返回活动的连接,但怎么确定是谁活动了
            # 如果server活动,则来了新连接,conn活动则来数据
            msg_dic[conn] = queue.Queue()  # 初始化一个队列,后面存要返回给这个客户端的数据
        else:
            try :
                data = r.recv(1024)  # 注意这里是r,而不是conn,多个连接的情况
                print("收到数据",data)
                # r.send(data) # 不能直接发,如果客户端不收,数据就没了
                msg_dic[r].put(data)  # 往里面放数据
                outputs.append(r)  # 放入返回的连接队列里
            except ConnectionResetError as e:
                print("客户端断开了",r)
                if r in outputs:
                    outputs.remove(r) #清理已断开的连接
                inputs.remove(r) #清理已断开的连接
                del msg_dic[r] ##清理已断开的连接

    for w in writeable:  # 要返回给客户端的连接列表
        data_to_client = msg_dic[w].get()  # 在字典里取数据
        w.send(data_to_client)  # 返回给客户端
        outputs.remove(w)  # 删除这个数据,确保下次循环的时候不返回这个已经处理完的连接了。

    for e in exceptional:  # 如果连接断开,删除连接相关数据
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_dic[e]


#*************************client
import socket
client = socket.socket()

client.connect(('localhost', 9000))

while True:
    cmd = input('>>> ').strip()
    if len(cmd) == 0 : continue
    client.send(cmd.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode())

client.close()

实例6:

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

selectors模块

它的功能与linux的epoll,还是select模块,poll等类似;实现高效的I/O multiplexing,  常用于非阻塞的socket的编程中; 简单介绍一下这个模块,

更多内容查看 python文档:https://docs.python.org/3/library/selectors.html

1. 模块定义了一个 BaseSelector的抽象基类, 以及它的子类,包括:SelectSelector, PollSelector, EpollSelector, DevpollSelector, KqueueSelector.    

另外还有一个DefaultSelector类,它其实是以上其中一个子类的别名而已,它自动选择为当前环境中最有效的Selector,所以平时用 DefaultSelector类就可以了,其它用不着。

 2. 模块定义了两个常量,用于描述 event Mask

    EVENT_READ :      表示可读的; 它的值其实是1;

    EVENT_WRITE:      表示可写的; 它的值其实是2;

3. 模块定义了一个 SelectorKey类, 一般用这个类的实例 来描述一个已经注册的文件对象的状态, 这个类的几个属性常用到:

    fileobj:   表示已经注册的文件对象;

    fd:          表示文件对象的描述符,是一个整数,它是文件对象的 fileno()方法的返回值;

    events:    表示注册一个文件对象时,我们等待的events, 即上面的event Mask, 是可读呢还是可写呢!!

    data:       表示注册一个文件对象是邦定的data;

4. 最后说说抽象基类中的方法;

register(fileobj, events, data=None) 作用:注册一个文件对象。

 参数: 
  fileobj:即可以是fd 也可以是一个拥有fileno()方法的对象;   events:上面的event Mask 常量; data 返回值: 一个SelectorKey类的实例;

unregister(fileobj) 作用: 注销一个已经注册过的文件对象;

 返回值:一个SelectorKey类的实例;

modify(fileobj, events, data=None) 作用:用于修改一个注册过的文件对象,比如从监听可读变为监听可写;它其实就是register() 后再跟unregister(), 但是使用modify( ) 更高效;

 返回值:一个SelectorKey类的实例;

select(timeout=None) 作用: 用于选择满足我们监听的event的文件对象;

 返回值: 是一个(key, events)的元组, 其中key是一个SelectorKey类的实例, 而events 就是 event Mask(EVENT_READ或EVENT_WRITE,或者二者的组合)

close()

作用:关闭 selector。 最后一定要记得调用它, 要确保所有的资源被释放

get_key(fileobj)

作用: 返回注册文件对象的 key;


 返回值 :一个SelectorKey类的实例;

基于selectors模块实现聊天

#服务端

#!/usr/bin/env python
# coding:utf8

from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept

while True:
    events=sel.select() #检测所有的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客户端

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))

参考:

https://www.cnblogs.com/Eva-J/articles/8324837.html

https://www.cnblogs.com/yuanchenqi/articles/5722574.html

https://pymotw.com/2/select/#module-select

https://www.cnblogs.com/yinheyi/p/8127871.html

原文地址:https://www.cnblogs.com/zhongguiyao/p/11049665.html