Socket网络编程-IO各种概念及多路复用

            Socket网络编程-IO各种概念及多路复用

                                   作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

一.操作系统相关知识

1>.同步和异步 

  函数或方法被调用的时候,调用者是否得到最终结果的。 

  直接得到最终结果结果的,就是同步调用。

  不直接得到最终结果的,就是异步调用。

2>.阻塞和非阻塞

  函数或方法调用的时候,是否立刻返回。

  立即返回就是非阻塞调用;
  不立即返回就是阻塞调用。

3>.同步,异步,阻塞,非阻塞之间的区别

  同步、异步,与阻塞、非阻塞不相关。 
  同步、异步强调的是,是否得到(最终的)结果;
  阻塞、非阻塞强调是时间,是否等待。
  同步与异步区别在于:调用者是否得到了想要的最终结果。   同步就是一直要执行到返回最终结果;   异步就是直接返回了,但是返回的不是最终结果。调用者不能通过这种调用得到结果,以后可以通过被调用者提供的某种方式(被调用着通知调用者、调用者反复查询、回调),来取回最终结果。
  阻塞与非阻塞的区别在于,调用者是否还能干其他事。   阻塞,调用者就只能干等;   非阻塞,调用者可以先去忙会别的,不用一直等。

4>.同步,异步,阻塞,非阻塞之间的联系

  同步阻塞,我啥事不干,就等你打饭打给我。打到饭是结果,而且我啥事不干一直等,同步加阻塞。

  同步非阻塞,我等着你打饭给我,饭没好,我不等,但是我无事可做,反复看饭好了没有。打饭是结果,但是我不一直等。
  异步阻塞,我要打饭,你说等叫号,并没有返回饭给我,我啥事不干,就干等着饭好了你叫我。例如,取了号什么不干就等叫自己的号。
  异步非阻塞,我要打饭,你给我号,你说等叫号,并没有返回饭给我,我去看电视、玩手机,饭打好了叫我。

5>.x86 CPU的工作级别

在386之前,CPU工作在实模式下,之后,开始支持保护模式,对内存进行了划分。 

我们知道计算机的运行就是运行指定的。指令还分特权指令级别和非特权指令级别。了解过计算机的朋友可能知道X86的CPU架构大概分成了四个层次,由内之外共有四个环,分别为Ring0、Ring1、Ring2、Ring3 
  Ring0级,可以执行特权指令,可以访问所有级别数据,可以访问IO设备等 
  Ring3级,级别最低,只能访问本级别数据 
  内核代码运行在Ring0,用户代码运行在Ring3
  Ring1和Ring2未使用,一般来讲,特权指令级别是指操作硬件,控制总线等等。

6>.用户态和内核态  

  现代操作系统采用虚拟存储器,理论上,对于32位系统来说,进程对虚拟内存地址的内存寻址空间为 4G(232)。64位操作系统理论上最大内存寻址空间(264)。

  操作系统中,内核程序独立且运行在较高的特权级别上,它们驻留在被保护的内存空间上,拥有访问硬 件设备的所有权限,这部分内存称为内核空间(内核态,最高地址1G)。

  普通应用程序运行在用户空间(用户态)。

  应用程序想访问某些硬件资源就需要通过操作系统提供的系统调用,系统调用可以使用特权指令运行在 内核空间,此时进程陷入内核态运行。系统调用完成,进程将回到用户态执行用户空间代码。

  操作系统运行时为了呢能够实现协调多任务,操作系统被分割成了2段,其中接近于硬件一段具有特权权限的叫做内核空间,而进程运行在用户空间当中。所以说,应用程序需要使用特权指令或是要访问硬件资源时需要系统调用。
  
  只要是被开发成应用程序的,不是作为操作系统本身的一部分而存在的,我们称之为用户空间的程序。他们运行状态称之为用户态。
  
  需要在内核(我们可以认为是操作系统)空间运行的程序,我们称之他们运行在内核空间,他们运行的状态为用户态,也叫核心态。注意:内核不负责完成具体工作。在内核空间可用执行任何特权操作。
  
  每一个程序要想真正运行起来,它最终是向内核发起系统调用来完成的,或者有一部分的程序不需要内核的参与,有我们的应用程序就能完成。我们打个比方,你要计算2的32次方的结果,是否需要运行在内核态呢?答案是否定的,我们知道内核是不负责完成具体工作的,我们只是想要计算一个运算结果,也不需要调用任何的特权模式,因此,如果你写了一些关于计算数值的代码,只需要把这个代码交给CPU运行就可以了。
  
  如果一个应用程序需要调用内核的功能而不是用户程序的功能的话,应用程序会发现自己需要做一个特权操作,而应用程序自身没有这个能力,应用程序会向内核发申请,让内核帮忙完成特权操作。内核发现应用程序是有权限使用特权指令的,内核会运行这些特权指令并把执行结果返回给应用程序,然后这个应用程序拿到特权指令的执行结果后,继续后续的代码。这就是模式转换。
  
  因此一个程序员想要让你的程序具有生产力,就应该尽量让你的代码运行在用户空间,如果你的代码大多数都运行在内核空间的话,估计你的应用程序并不会给你打来太大的生产力哟。因为我们知道内核空间不负责产生生产力。

  博主推荐阅读:
    https://www.cnblogs.com/yinzhengjie/p/6957726.html

二.IO模型

1>.IO两个阶段 

IO过程分两阶段: 
  1、数据准备阶段。从设备读取数据到内核空间的缓冲区(淘米,把米放饭锅里煮饭)
  2、内核空间复制回用户空间进程缓冲区阶段(盛饭,从内核这个饭锅里面把饭装到碗里来)
系统调用——read函数、recv函数等

2>.同步IO

  同步IO模型包括 阻塞IO、非阻塞IO、IO多路复用。
  
  阻塞IO如下图所示,进程等待(阻塞),直到读写完成。(全程等待)

  非阻塞IO如下图所示。

  进程调用recvfrom操作,如果IO设备没有准备好,立即返回ERROR,进程不阻塞。用户可以再次发起 系统调用(可以轮询),如果内核已经准备好,就阻塞,然后复制数据到用户空间。

  第一阶段数据没有准备好,可以先忙别的,等会再来看看。检查数据是否准备好了的过程是非阻塞的。

  第二阶段是阻塞的,即内核空间和用户空间之间复制数据是阻塞的。

  淘米、蒸饭我不阻塞等,反复来询问,一直没有拿到饭。盛饭过程我等着你装好饭,但是要等到盛好饭 才算完事,这是同步的,结果就是盛好饭。

  IO多路复用也称Event-driven IO,工作原理如下图所示。

  所谓IO多路复用,就是同时监控多个IO,有一个准备好了,就不需要等了开始处理,提高了同时处理IO的能力。
  select几乎所有操作系统平台都支持,poll是对的select的升级。

  epoll,Linux系统内核2.
5+开始支持,对select和poll的增强,在监视的基础上,增加回调机制。BSD、 Mac平台有kqueue,Windows有iocp。   以select为例,将关注的IO操作告诉select函数并调用,进程阻塞,内核“监视”select关注的文件描述符 fd,被关注的任何一个fd对应的IO准备好了数据,select返回。再使用read将数据复制到用户进程。
  select举例:
    食堂供应很多菜(众多的IO),你需要吃某三菜一汤,大师傅(操作系统)说要现做,需要等,你只好 等待大师傅叫。其中一样菜好了,大师傅叫你,说你点的菜有好的了,你得自己遍历找找看哪一样才好了,请服务员把做好的菜打给你。

  epoll是有菜准备好了,大师傅喊你去几号窗口直接打菜,不用自己找菜了。
  一般情况下,select最多能监听1024个fd(可以修改,但不建议改),但是由于select采用轮询的方 式,当管理的IO多了,每次都要遍历全部fd,效率低下。

  epoll没有管理的fd的上限,且是回调机制,不需遍历,效率很高。

3>.信号驱动IO

  进程在IO访问时,先通过sigaction系统调用,提交一个信号处理函数,立即返回。进程不阻塞。

  当内核准备好数据后,产生一个SIGIO信号并投递给信号处理函数。可以在此函数中调用recvfrom函数 操作数据从内核空间复制到用户空间,这段过程进程阻塞。

  工作原理如下图所示。
 

4>.异步IO

  同步IO,因为核心操作recv函数调用时,进程阻塞直到拿到最终结果为止。 而异步IO进程全程不阻塞。 

  进程发起异步IO请求,立即返回。内核完成IO的两个阶段,内核给进程发一个信号。 

  举例1:
    来打饭,跟大师傅说饭好了叫你,饭菜准备好了,窗口服务员把饭盛好了打电话叫你。两阶段都 是异步的。在整个过程中,进程都可以忙别的,等好了才过来。 

  举例2:
    今天不想出去到饭店吃饭了,点外卖,饭菜在饭店做好了(第一阶段),快递员从饭店送到你家 门口(第二阶段)。

  Linux的aio的系统调用,内核从版本2.6开始支持

  工作原理如下图所示。

三.Python中IO多路复用

1>.IO多路复用方案

  大多数操作系统都支持select和poll 

  Linux 2.5+ 支持epoll

  BSD、Mac支持kqueue

  Solaris实现了/dev/poll

  Windows的IOCP

2>.开发中的选择

开发中的选择
  1、完全跨平台,使用select、poll。但是性能较差 
  2、针对不同操作系统自行选择支持的技术,这样做会提高IO处理的性能 Python的select库实现了select、poll系统调用,这个基本上操作系统都支持。对Linux内核2.5+支持了epoll。 select维护一个文件描述符数据结构,单个进程使用有上限,通常是1024,线性扫描这个数据结构。效率低。
pool和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才知道哪个设备就绪了。
epoll使用事件通知机制,使用回调机制提高效率。 select
/poll还要从内核空间复制消息到用户空间,而epoll通过内核空间和用户空间共享一块内存来减少复制。

3>.selectors库 

  3.4版本提供selectors库,高级IO复用库。
    类层次结构
    BaseSelector
      +-- SelectSelector      实现select 
      +-- PollSelector      实现poll 
      +-- EpollSelector      实现epoll 
      +-- DevpollSelector    实现devpoll
      +-- KqueueSelector     实现kquue

  selectors.DefaultSelector返回当前平台最有效、性能最高的实现。 但是,由于没有实现Windows下的IOCP,所以,Windows下只能退化为select。

  在selects模块源码最下面有如下代码
    # Choose the best implementation, roughly:
    # epoll|kqueue|devpoll > poll > select.
    # select() also can't accept a FD > FD_SETSIZE (usually around 1024) 
    if 'KqueueSelector' in globals():
      DefaultSelector = KqueueSelector     elif 'EpollSelector' in globals():       DefaultSelector = EpollSelector     elif 'DevpollSelector' in globals():       DefaultSelector = DevpollSelector     elif 'PollSelector' in globals():       DefaultSelector = PollSelector     else:      DefaultSelector = SelectSelector   事件注册     class SelectSelector(_BaseSelectorImpl):        """Select-based selector."""        def register(fileobj, events, data=None) -> SelectorKey: pass   为selector注册一个文件对象,监视它的IO事件。返回SelectKey对象。   fileobj 被监视文件对象,例如socket对象   events 事件,该文件对象必须等待的事件   data 可选的与此文件对象相关联的不透明数据,例如,关联用来存储每个客户端的会话ID,关联 方法。通过这个参数在关注的事件产生后让selector干什么事。   EVENT_READ     可读 0b01,内核已经准备好输入设备,可以开始读了   EVENT_WRITE     可写 0b10,内核准备好了,可以往里写了       selectors.SelectorKey 有4个属性:     1. fileobj 注册的文件对象     2. fd 文件描述符     3. events 等待上面的文件描述符的文件对象的事件
    4. data 注册时关联的数据 

4>.IO多路复用TCP Server

 1 #!/usr/bin/env python
 2 #_*_conding:utf-8_*_
 3 #@author :yinzhengjie
 4 #blog:http://www.cnblogs.com/yinzhengjie
 5 
 6 import selectors
 7 import threading
 8 import socket
 9 import logging
10 import time
11 
12 FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
13 logging.basicConfig(format=FORMAT, level=logging.INFO)
14 
15 # 构建本系统最优Selector
16 selector = selectors.DefaultSelector()
17 
18 sock = socket.socket() # TCP Server
19 sock.bind(('127.0.0.1', 9999))
20 sock.listen()
21 logging.info(sock)
22 
23 sock.setblocking(False) # 非阻塞
24 
25 # 回调函数,sock的读事件
26 # 形参自定义
27 def accept(sock:socket.socket, mask):
28     """mask:事件的掩码"""
29     conn, raddr = sock.accept()
30     conn.setblocking(False) # 非阻塞
31     logging.info('new client socket {} in accept.'.format(conn))
32     key = selector.register(conn, selectors.EVENT_READ, read)
33     logging.info(key)
34 
35 # 回调函数
36 def read(conn:socket.socket, mask):
37     data = conn.recv(1024)
38     msg = "Your msg = {} ~~~~".format(data.decode())
39     logging.info(msg)
40     conn.send(msg.encode())
41 
42 # 注册sock的被关注事件,返回SelectorKey对象
43 # key记录了fileobj, fileobj的fd, events, data
44 key = selector.register(sock, selectors.EVENT_READ, accept)
45 logging.info(key)
46 
47 
48 # 开始循环
49 while True:
50     # 监听注册的对象的事件,发生被关注事件则返回events
51     events = selector.select()
52     print(events) # [(key, mask)]
53     # 表示那个关注的对象的某事件发生了
54     for key, mask in events:
55         # key.data => accept; key.fileobj => sock
56         callback = key.data
57         callback(key.fileobj, mask)

5>.IO多路复用群聊软件

 1 #!/usr/bin/env python
 2 #_*_conding:utf-8_*_
 3 #@author :yinzhengjie
 4 #blog:http://www.cnblogs.com/yinzhengjie
 5 
 6 import selectors
 7 import threading
 8 import socket
 9 import logging
10 import time
11 
12 FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
13 logging.basicConfig(format=FORMAT, level=logging.INFO)
14 
15 class ChatServer:
16     def __init__(self, ip='127.0.0.1', port=9999):
17         self.sock = socket.socket()
18         self.addr = ip, port
19         self.event = threading.Event()
20 
21         # 构建本系统最优Selector
22         self.selector = selectors.DefaultSelector()
23 
24     def start(self):
25         self.sock.bind(self.addr)
26         self.sock.listen()
27         self.sock.setblocking(False)
28         # 注册sock的被关注事件,返回SelectorKey对象
29         # key记录了fileobj, fileobj的fd, events, data
30         self.selector.register(self.sock, selectors.EVENT_READ, self.accept)
31 
32         # 事件监听循环
33         threading.Thread(target=self.select, name='selelct', daemon=True).start()
34 
35     def select(self):
36         # 开始循环
37         while not self.event.is_set():
38             # 监听注册的对象的事件,发生被关注事件则返回events
39             events = self.selector.select()
40             print(events)  # [(key, mask)]
41             # 表示那个关注的对象的某事件发生了
42             for key, mask in events:
43                 # key.data => accept; key.fileobj => sock
44                 callback = key.data
45                 callback(key.fileobj,mask)
46 
47     # 回调函数,sock的读事件
48     # 形参自定义
49     def accept(self, sock: socket.socket, mask):
50         """mask:事件的掩码"""
51         conn, raddr = sock.accept()
52         conn.setblocking(False)  # 非阻塞
53         logging.info('new client socket {} in accept.'.format(conn))
54         key = self.selector.register(conn, selectors.EVENT_READ, self.recv)
55         logging.info(key)
56 
57     # 回调函数
58     def recv(self, conn: socket.socket, mask):
59         data = conn.recv(1024)
60         data = data.strip()
61         if data == b'quit' or data == b'':
62             self.selector.unregister(conn)
63             conn.close()
64             return
65         msg = "Your msg = {} ~~~~".format(data.decode()).encode()
66         logging.info(msg)
67 
68         for key in self.selector.get_map().values():
69             print(self.recv)  # 当前绑定的
70             print(key.data)  # 注册时注入的绑定的对象
71             print(self.recv is key.data)  # 是否一致!!!
72             print(self.recv == key.data)  # 是否一致?
73             if key.data == self.recv:
74                 key.fileobj.send(msg)
75 
76     def stop(self):  # 关闭关注的文件对象,关闭selector
77         self.event.set()
78         fobjs = []
79         for fd, key in self.selector.get_map().items():
80             fobjs.append(key.fileobj)
81 
82         for fobj in fobjs:
83              self.selector.unregister(fobj)
84              fobj.close()
85 
86         self.selector.close()
87 
88 if __name__ == '__main__':
89     cs = ChatServer()
90     cs.start()
91     while True:
92         cmd = input('>>')
93         if cmd.strip() == 'quit':
94             logging.info('quit')
95             cs.stop()
96             break
97         print(threading.enumerate())

四.总结

使用IO多路复用 +(select、epoll) 并不一定比多线程 + 同步阻塞IO性能好,其最大优势减少了大量线程,可以处理更多的连接。

多线程
+ 同步阻塞IO模式   开辟太多线程,线程开辟、销毁开销还是较大,倒是可以使用线程池;线程多,线程自己使用的内存也很可观;多线程切换时要保护现场和恢复现场,线程过多,切换会占用大量的时间。
连接较少,多线程
+ 同步阻塞IO模式比较适合,效率也不低。
如果连接非常多,对服务端程序来说,IO并发还是比较高的,这时候,开辟太多线程其实也不是很划算,这时候IO多路复用或许是更好的选择。
原文地址:https://www.cnblogs.com/yinzhengjie/p/11980602.html