并发编程之IO模型

IO模型介绍

同步(synchronous) IO和异步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分别是什么,到底有什么区别?这个问题其实不同的人给出的答案都可能不同,比如wiki,就认为asynchronous IO和non-blocking IO是一个东西。这其实是因为不同的人的知识背景不同,并且在讨论这个问题的时候上下文(context)也不相同。所以,为了更好的回答这个问题,我先限定一下本文的上下文。

    本文讨论的背景是Linux环境下的network IO。本文最重要的参考文献是Richard Stevens的“UNIX® Network Programming Volume 1, Third Edition: The Sockets Networking ”,6.2节“I/O Models ”,Stevens在这节中详细说明了各种IO的特点和区别,如果英文够好的话,推荐直接阅读。Stevens的文风是有名的深入浅出,所以不用担心看不懂。本文中的流程图也是截取自参考文献。

    Stevens在文章中一共比较了五种IO Model:
    * blocking IO
    * nonblocking IO
    * IO multiplexing
    * signal driven IO
    * asynchronous IO
    由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

    再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

1、输入操作:read、readv、recv、recvfrom、recvmsg共5个函数,如果会阻塞状态,则会经理wait data和copy data两个阶段,如果设置为非阻塞则在wait 不到data时抛出异常

2、输出操作:write、writev、send、sendto、sendmsg共5个函数,在发送缓冲区满了会阻塞在原地,如果设置为非阻塞,则会抛出异常

3、接收外来链接:accept,与输入操作类似

4、发起外出链接:connect,与输出操作类似

  

阻塞IO(blocking IO)

 在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:

    当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。

    而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
    所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

    几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。如下图

    ps:所谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。

    实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

from socket import *
server = socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
print('start runnig...')
while True:
    conn,addr = server.accept()  #IO操作 在这accept的时候不能干recv的活
    print(addr)
    while True:
        try:
            data = conn.recv(1024)  #IO操作
            conn.send(data.upper())
        except Exception:
            break
    conn.close()
server.close()

# 我们以前写的这个就是阻塞的IO模型:一旦阻塞了就在那卡着
# 直到等到数据已经到了操作系统,操作系统再从内核拷贝给应用程序
# 阻塞IO在那两个阶段全都阻塞住了
服务端
from socket import *
client  = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    cmd = input('>>:').strip()
    if not cmd:continue
    client.send(cmd.encode('utf-8'))
    data = client.recv(1024)
    print('接受的是:%s'%data.decode('utf-8'))
client.close()
客户端

    一个简单的解决方案:

在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

    该方案的问题是:

开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

    改进方案:    

很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

    改进后方案其实也存在着问题:

“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

    对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

非阻塞IO(non-blocking IO)

 Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

    从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

'''
server.setblocking()   默认值为True
server.setbiocking(False)  False的话就成了非阻塞了,这只是对于socket套接字来说的
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问内核数据准备好了没有

wait data 等数据的这个阶段是不阻塞的
copy data 这个阶段还是要阻塞的
'''
# 服务端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False)
r_list=[]
w_list={}

while 1:
    try:
        conn,addr=server.accept()
        r_list.append(conn)
    except BlockingIOError:
        # 强调强调强调:!!!非阻塞IO的精髓在于完全没有阻塞!!!
        # time.sleep(0.5) # 打开该行注释纯属为了方便查看效果
        print('在做其他的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍历读列表,依次取出套接字读取内容
        del_rlist=[]
        for conn in r_list:
            try:
                data=conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
                continue
            except ConnectionResetError: # 当前套接字出异常,则关闭,然后加入删除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍历写列表,依次取出套接字发送内容
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理无用的套接字,无需再监听它们的IO操作
        for conn in del_rlist:
            r_list.remove(conn)

        for conn in del_wlist:
            w_list.pop(conn)
服务端1
#客户端
import socket
import os

client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))
客户端1
from socket import *
import time
server=socket(AF_INET,SOCK_STREAM)
server.bind(('127.0.0.1',8080))
server.listen(5)

server.setblocking(False)  #默认值是True (如果是False,套接字里面的一些阻塞操作都变成非阻塞的)
print('start....')

conn_l=[]    #这种读写没有分开检测,如果服务端操作系统缓存满了,就会报错,还是分成两个列表更加完善
del_l=[]
while True:
    try:
        # print(conn_l)
        conn,addr=server.accept()   #收不到数据的时候才出异常
        print(conn)
        conn_l.append(conn)
    except BlockingIOError:  #把收不到数据的那段时间利用起来(利用他收不到数据的时候,才干下面的for循环)
        for conn in conn_l:
            try:
                data=conn.recv(1024)
                conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:  #断开连接的错误(如果突然断开连接,会报错,就先添加到列表里面去,完了把连接清除)
                del_l.append(conn)
        for obj in del_l:
            obj.close()
            conn_l.remove(obj)
        del_l.clear()
服务端2
from socket import *
client = socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
    cmd = input('>>:').strip()
    if not cmd:continue
    client.send(cmd.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
客户端2
from concurrent.futures import ThreadPoolExecutor
import socket

server = socket.socket()
# 重用端口
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

server.bind(("192.168.11.210",9999))

server.listen(5)

# 设置是否为阻塞 默认阻塞
server.setblocking(False)

def data_handler(conn):
    print("一个新连接..")
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())
# 已连接的客户端
clients = []
# 需要发送的数据
send_datas = []
# 已经发送完的 需要删除的数据
del_datas = []
# 待关闭的客户端
closed_cs = []
import time
while True:
    try:
        conn,addr = server.accept()
        # 切到处理数据的任务去执行
        # 代码走到这里才算是连接成功
        # 把连接成功的客户端存起来
        clients.append(conn)
    except BlockingIOError:
        # print("没有可以处理的连接 就干别的活儿")
        #要处理的是已经连接成功的客户端
        # 接收数据
        for c in clients:
            try:
                data = c.recv(1024)
                if not data:
                    # 对方关闭了连接
                    c.close()
                    # 从客户端列表中删除它
                    closed_cs.append(c)
                    continue
                print("收到%s" % data.decode("utf-8"))
                # 现在非阻塞 send直接往缓存赛 如果缓存满了 肯定有错误  需要单独处理发送
                # c.send(data.upper())
                send_datas.append((c,data))
            except BlockingIOError:
                pass
            except ConnectionResetError:
                # 对方关闭了连接
                c.close()
                # 从客户端列表中删除它
                closed_cs.append(c)
        # 处理发送数据
        #for data in send_datas[:]:  复制了一份原有列表,但这种方法没有在迭代期间修改被迭代对象,也可以实现删除操作
        for data in send_datas:
            try:
                data[0].send(data[1].upper())
                # 发送成功需要删除 不能直接删除
                # send_datas.remove(data)
                del_datas.append(data)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                # 客户端连接需要删除
                data[0].close()
                closed_cs.append(data[0])
                # 等待发送的数据需要删除
                del_datas.append(data)
        # 删除无用的数据
        for d in del_datas:
            #从待发送的列表中删除
            send_datas.remove(d)
        del_datas.clear()
        for c in closed_cs:
            clients.remove(c)
        closed_cs.clear()
虎老师的服务端,里面有亮点操作

但是非阻塞IO模型绝不被推荐。

我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

但是也难掩其缺点:

1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。

多路复用IO(IO multiplexing)

Python中有一个select模块,其中提供了:select、poll、epoll三个方法,分别调用系统的 select,poll,epoll从而实现IO多路复用。
    Windows Python:提供: select
    Mac Python:提供: select
    Linux Python:提供: select、poll、epoll

select方法

select 的中文含义是”选择“,select机制也如其名,监听一些 server 关心的套接字、文件等对象,关注他们是否可读、可写、发生异常等事件。一旦出现某个 select 关注的事件,select 会对相应的套接字或文件进行特定的处理,这就是 select 机制最主要的功能。  select 机制可以只使用一个进程/线程来处理多个socket或其他对象,因此又被称为I/O复用。  关于select机制的进程阻塞形式,与普通的套接字略有不同。socket对象可能阻塞在accept(),recvfrom()等方法上,以recvfrom()方法为例,当执行到socket.recvfrom()这一句时,就会调用一个系统调用询问内核:client/server发来的数据包准备好了没?此时从进程空间切換到内核地址空间,内核可能需要等数据包完全到达,然后将数据复制到程序的地址空间后,recvfrom()才会返回,接下来进程继续执行,对读取到的数据进行必要的处理。  而使用select函数编程时,同样针对上面的recvfrom()方法,进程会阻塞在select()调用上,等待出现一个或多个套接字对象满足可读事件,当内核将数据准备好后,select()返回某个套接字对象可读这一条件,随后再调用recvfrom()将数据包从内核复制到进程地址空间。  所以可见,如果仅仅从单个套接字的处理来看,select()反倒性能更低,因为select机制使用两个系统调用。但select机制的优势就在于它可以同时等待多个fd就绪,而当某个fd发生满足我们关心的事件时,就对它执行特定的操作。

select语法

select(rlist, wlist, xlist, timeout=None)

Wait until one or more file descriptors are ready for some kind of I/O.
The first three arguments are sequences of file descriptors to be waited for

rlist -- wait until ready for reading
wlist -- wait until ready for writing
xlist -- wait for an  ''exceptional condition''

If only one kind of condition is required, pass [] for the other lists.
A file descriptor is either a socket or file object, or a small integer
gotten from a fileno() method call on one of those

The return value is a tuple of three lists corresponding to the first threearguments; each contains the subset of the corresponding file descriptorsthat are ready
select官方解释

select方法用来监视文件句柄,如果句柄发生变化,则获取该句柄。
1、当 参数1 序列中的句柄发生可读时(accetp和read),则获取发生变化的句柄并添加到 返回值1 序列中
2、当 参数2 序列中含有句柄时,则将该序列中所有的句柄添加到 返回值2 序列中
3、当 参数3 序列中的句柄发生错误时,则将该发生错误的句柄添加到 返回值3 序列中
4、当 超时时间未设置,则select会一直阻塞,直到监听的句柄发生变化
5、当 超时时间=1时,那么如果监听的句柄均无任何变化,则select会阻塞1秒,之后返回三个空列表,如果监听的句柄有变化,则直接执行。
由于select()接口可以同时对多个句柄进行读状态、写状态和错误状态的探测,所以可以很容易构建为多个客户端提供独立问答服务的服务器系统。这里需要指出的是,客户端的一个connect()操作,将在服务器端激发一个“可读事件”,所以 select() 也能探测来自客户端的connect()行为。
上述模型中,最关键的地方是如何动态维护select()的三个参数。程序员需要检查对应的返回值列表,以确定到底哪些句柄发生了事件。所以如果select()发现某句柄捕捉到了“可读事件”,服务器程序应及时做recv()操作,并根据接收到的数据准备好待发送数据,并将对应的句柄值加入句柄序列1,准备下一次的“可写事件”的select()探测。同样,如果select()发现某句柄捕捉到“可写事件”,则程序应及时做send()操作,并准备好下一次的“可读事件”探测准备。

IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
    这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。

    强调:

    1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

    2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

结论: select的优势在于可以处理多个连接,不适用于单个连接  

#服务端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist=[server,]
wlist=[]
wdata={}

while True:
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print(wl)
    for sock in rl:
        if sock == server:
            conn,addr=sock.accept()
            rlist.append(conn)
        else:
            try:
                data=sock.recv(1024)
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                wlist.append(sock)
                wdata[sock]=data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

#客户端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
View Code
from concurrent.futures import ThreadPoolExecutor
import socket
import select
# select 帮你从一堆连接中找出来需要被处理的连接

server = socket.socket()
# 重用端口
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)

server.bind(("192.168.11.210",9999))

server.listen(5)

# 设置是否为阻塞 默认阻塞
server.setblocking(False)

def data_handler(conn):
    print("一个新连接..")
    while True:
        data = conn.recv(1024)
        conn.send(data.upper())

# 需要检测的 是否可读取的列表  (recv就是一个读取操作)
rlist = [server,]
# 需要检测的 是否写入的列表  (send就是写入操作)
wlist = []

# 需要发送的数据 目前是因为 我们要把接收的数据在发回去 所以搞了这个东西 正常没有这种需求
# 目前客户端与服务器端 交互 是必须客户端发送数据 服务器端才能返回数据   正常没有这种需求
dic = {}


while True: # 用于检测需要处理的连接 需要不断检测 所以循环
    # rl目前可读的客户端列表  wl目前可写的客户端列表
    rl,wl,xl = select.select(rlist,wlist,[]) # select默认阻塞 阻塞到任意一个连接可以被处理
    print(len(rl))
    # 处理可读的socket
    for c in rl:
        # 无论是客户端还是服务器只要可读就会执行到这里
        if c == server:
            # 接收客户端的连接请求 (一个读操作)
            conn,addr = c.accept()
            # 将新连接也交给select来检测
            rlist.append(conn)
        else:# 不是服务器 就是客户端 客户端可读 可以执行recv
            try:
                data = c.recv(1024)
                if not data:
                    c.close()
                    rlist.remove(c)
                print("%s 发送 %s" % (c,data.decode("utf-8")))
                # 给客户端发送数据 前要保证目前可以发送 将客户端加入检测列表
                wlist.append(c)  # 正常开发中 不可能必须客户端发送数据过来后 才能 给客户端发送
                                 # 所以这个添加到检测列表的操作 应该建立连接后立即执行
                # 要发送的数据
                dic[c] = data
            except ConnectionResetError:
                # 客户端关闭连接
                c.close()
                rlist.remove(c)
    # 处理可写的socket
    for c in wl:
        print(c)
        try:
            c.send(dic[c].upper())
            # 删除数据
            dic.pop(c)
            # 从检测列表中删除已发送完成的客户端
            wlist.remove(c)
        except ConnectionResetError:
            c.close() # 关闭连接
            dic.pop(c) # 删除要发送的数据
            wlist.remove(c) # 从待检测的列表中删除
        except BlockingIOError:#可能缓存满了 发不了
            pass
View Code
import socket

c = socket.socket()

c.connect(("192.168.11.210",9999))

while True:
    msg = input(">>>:")
    if not msg:continue
    c.send(msg.encode("utf-8"))
    data = c.recv(1024)
    print(data.decode("utf-8"))
View Code

select监听fd变化的过程分析:

用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。

该模型的优点:

相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

该模型的缺点:

首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。

补充

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import select
import sys

while True:
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
    '''select.select([sys.stdin,],[],[],1)用到I/O多路复用,第一个参数是列表,sys.stdin是系统标准输入的文件描述符, 就是打开标准输入终端返回的文件描述符,一旦终端有输入操作,select就感知sys.stdin描述符的变化,那么会将变化的描述符sys.stdin添加到返回值readable中;如果终端一直没有输入,那么readable他就是一个空列表
    '''
    if sys.stdin in readable:
        print 'select get stdin',sys.stdin.readline()
'''
注:
1、[sys.stdin,]以后不管是列表还是元组,在最后一个元素的后面建议增加一个逗号,(1,) | (1) 这两个有区别吗?是不是第二个更像方法的调用或者函数的调用,加个,是不是更容易分清楚。还有就是在以后写django的配置文件的时候,他是必须要加的。
2、select的第一个参数就是要监听的文件句柄,只要监听的文件句柄有变化,那么就会将其加入到返回值readable列表中。
3、select最后一个参数1是超时时间,当执行select时,如果监听的文件句柄没有变化,则会阻塞1秒,然后向下继续执行;默认timeout=None,就是会一直阻塞,直到感知到变化
'''
'''
when runing the program get error :
Traceback (most recent call last):
  File "E:/study/GitHub/homework/tianshuai/share_3_select_socket.py", line 8, in <module>
    readable, writeable, error = select.select([sys.stdin,],[],[],1)
select.error: (10093, 'Either the application has not called WSAStartup, or WSAStartup failed')

when windows only use select socket !!!!!
'''
实例1:利用select监听终端输入
#/usr/bin/env python
#-*- coding:utf-8 -*-

import time
import socket
import select

#生成socket对象
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#绑定IP和端口
sk.bind(('127.0.0.1',6666))
#监听,并设置最大连接数为5
sk.listen(5)
#设置setblocking为False,即非阻塞模式,accept将不在阻塞,如果没有收到请求就会报错
sk.setblocking(False) 

while True:
    rlist, wlist, elist = select.select([sk,],[],[],2)  #监听第一个列表的文件描述符,如果其中有文件描述符发生改变,则捕获并放到rlist中
    for r in rlist:   #如果rlist非空将执行,否则不执行
        conn,addr = r.accept()   #建立连接,生成客户端的socket对象以及IP地址和端口号
        print addr
实例2:利用select监听浏览器访问
#/usr/bin/env python
#-*- coding:utf-8 -*-

import time
import socket
import select

#生成socket对象
sk1 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
#绑定IP和端口
sk1.bind(('127.0.0.1',6666))
#监听并设置最大连接数
sk1.listen(5)
sk1.setblocking(False) #设置setblocking为False,accept将不在阻塞,如果没有收到请求就会报错,即非阻塞模式

sk2 = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk2.bind(('127.0.0.1',7777))
sk2.listen(5)
sk2.setblocking(False) #设置非阻塞模式

while True:
    rlist, wlist, elist = select.select([sk1,sk2,],[],[],2)  #监听第一个列表的文件描述符,如果其中有文件描述符发生改变,则捕获并放到rlist中
    for r in readable_list:  
        conn,address = r.accept()
        print address
实例3:利用select监听多端口

执行程序,在浏览器中分别输入地址:127.0.0.1:6666  127.0.0.1:7777

执行结果如下:

('127.0.0.1', 54509)
('127.0.0.1', 53458)
#/usr/bin/env python
#-*- coding:utf-8 -*-

import time
import socket
import select

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sk.bind(('127.0.0.1',6666))
sk.listen(5)
sk.setblocking(False) #非阻塞
inputs = [sk,] #构造select第一个参数
#原因:看上例conn是客户端对象,客户是一直连接着呢,连接的时候状态变了,连接上之后,还是服务端的socket有关吗?是不是的把他改为动态的?

while True:
    rlist, wlist, elist = select.select(inputs,[],[],1)  #把第一个参数设为列表动态的添加
    time.sleep(2) #测试使用
    print "inputs list :",inputs     #打印inputs列表,查看执行变化
    print "file descriptor :",readable_list #打印rlist ,查看执行变化

    for r in rlist:
        #当客户端第1次连接服务端时
        if r == sk:  
            conn,address = r.accept()
            inputs.append(conn)
            print address
        else:
        # 当客户端连接上服务端之后,再次发送数据时
            client_data = r.recv(1024)
            r.sendall(client_data)
实例4:利用select实现伪同时处理多个Socket客户端请求—服务端
#!/usr/bin/env python
#-*- coding:utf-8 -*-

import socket

client = socket.socket()
client.connect(('127.0.0.1',6666))
client.settimeout(5)

while True:
    client_input = raw_input('please input message:').strip()
    client.sendall(client_input)
    server_data = client.recv(1024)
    print server_data
实例4:利用select实现伪同时处理多个Socket客户端请求—客户端
'''
第1点:
#定义1个包含服务端文件句柄sk的inputs列表,服务端利用select监听到客户端的请求时(目前对于服务端来说,就是accept和recv请求;对于客户端来说就是connect、send、sendall请求),

第2点:
#最开始服务端select会监听inputs=[sk,],当有新客户端connect请求时,select就会监听到服务端文件句柄sk发生了改变,此时select会把sk赋值给第1个返回值rlist,即rlist=[sk,],

#执行for循环,当r==sk时,即表示是服务端文件句柄sk发生了变化,此时服务端就会执行accpet,同时返回客户端文件句柄conn以及其IP地址和端口port,再把conn加入到inputs列表中,
#便于后面服务端对该客户端的其它请求的监听;当r!=sk时,说明是监听到某个客户端文件句柄发生了变化,即是之前监听到的客户端有新的请求(发送了数据),此时服务端就会recv该数据
#如果无任何请求,那么select就不会监听到任何变化,即rlist = []

#程序执行的打印结果:

#1 有1个客户端连接,此后无操作
inputs list : [<socket._socketobject object at 0x0000000002C66798>]      #初始化时inputs包含了sk对象
file descriptor : [<socket._socketobject object at 0x0000000002C66798>]  #当客户端有conncet请求时,sk就会发生变化,此时rlist=[sk,]
('127.0.0.1', 62495)   #客户端的IP地址和端口号
inputs list : [<socket._socketobject object at 0x0000000002C66798>, <socket._socketobject object at 0x0000000002C66800>]  #第2次循环时,inputs = [sk,conn1,]
file descriptor : [] #第2次循环时rlist = [],因为客户端无任何操作,此时select没有监听到inputs中的文件句柄有任何变化,所以rlist为空

#2 有1个新客户端连接,即有客户端向服务端发起了一个connect请求,此时sk发生了变化,那么现在inputs = [sk,conn1,conn2]  rlist = [sk]
#本次循环完成之后再循环的时候inputs = [sk,conn1,conn2,] rlist = [],因为我们没有继续做操作

#第1个连接请求
inputs list : [<socket._socketobject object at 0x0000000002C56798>]  #默认只有sk
file descriptor : []
inputs list : [<socket._socketobject object at 0x0000000002C56798>]  
file descriptor : [<socket._socketobject object at 0x0000000002C56798>] #监听到sk发生了变化,表示有新客户端请求
('127.0.0.1', 62539)
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>]  #inputs列表变更为了[sk,conn1]
file descriptor : []  #后续无操作,所以rlist=[]

#第二个链接
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>] 
file descriptor : [<socket._socketobject object at 0x0000000002C56798>] #监听到inputs列表中的文件句柄发生了改变
('127.0.0.1', 62548)
#加入到inputs列表中
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>, <socket._socketobject object at 0x0000000002C56868>] 
file descriptor : []
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>, <socket._socketobject object at 0x0000000002C56868>]
file descriptor : []
inputs list : [<socket._socketobject object at 0x0000000002C56798>, <socket._socketobject object at 0x0000000002C56800>, <socket._socketobject object at 0x0000000002C56868>]
file descriptor : []
'''
实例4执行过程分析
#优化点:当客户端断开连接时,从inputs列表中删除其文件描述符

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket
import select

sk1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sk1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sk1.bind(('127.0.0.1',8002))
sk1.listen(5)
sk1.setblocking(0)

inputs = [sk1,]

while True:
    readable_list, writeable_list, error_list = select.select(inputs, [], inputs, 1)
    for r in readable_list:
        # 当客户端第一次连接服务端时
        if sk1 == r:
            print 'accept'
            request, address = r.accept()
            request.setblocking(0)
            inputs.append(request)
        # 当客户端连接上服务端之后,再次发送数据时
        else:
            received = r.recv(1024)
            # 当正常接收客户端发送的数据时
            if received:
                print 'received data:', received
            # 当客户端关闭程序时
            else:
                inputs.remove(r)

sk1.close()
实例5:实例4服务端的优化

此处的Socket服务端相比与原生的Socket,他支持当某一个请求不再发送数据时,服务器端不会等待而是可以去处理其他请求的数据。但是,如果每个请求的耗时比较长时,select版本的服务器端也无法完成同时操作。

select参数注解

rlist, wlist, elist = select.select(inputs,[],[],1)
1、第一个参数,监听的句柄序列,当有任一句柄变化时,select就能捕获到并返回赋值给rlist;
2、如果第二参数有值,即只要不是空列表,select就能感知,wlist就能获取第二个参数的值;
3、对于第三个参数,在select内部会检测列表中的描述符在底层执行过程中是否发生异常,如果发生异常,则把发生异常的句柄赋值给elist,
   一般第三个参数和第一个参数相同
4、第四个参数是设置阻塞时间,如1秒(这个如果不写,select会阻塞住,直到监听的描述符发生变化才继续往下执行)
View Code

 对于I/O多路复用,咱们上面的例子就可以了,但是为了遵循select规范需要把读和写进行分离:

#rlist -- wait until ready for reading  #等待直到有读的操作
#wlist -- wait until ready for writing  #等待直到有写的操作
#xlist -- wait for an ``exceptional condition'' #等待一个错误的情况
View Code

为了实现读写分离,需要构造一个字典,字典里为每一客户端维护一个队列。收到的信息放到队列里,然后写的时候直接从队列取数据

异步IO(Asynchronous I/O)

 asyncio模块

async内部使用协程,只有协程能实现

Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:

用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

IO模型比较分析

到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
    先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

    再说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
    An asynchronous I/O operation does not cause the requesting process to be blocked; 
    两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型可以分为两大类,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。

    有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

    各个IO Model的比较如图所示:

    经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。

selectors模块

IO复用:为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解还是有些抽象,为此,咱们来理解下复用在通信领域的使用,在通信领域中为了充分利用网络连接的物理介质,往往在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,到这里我们就基本上理解了复用的含义,即公用某个“介质”来尽可能多的做同一类(性质)的事,那IO复用的“介质”是什么呢?为此我们首先来看看服务器编程的模型,客户端发来的请求服务端会产生一个进程来对其进行服务,每当来一个客户请求就产生一个进程来服务,然而进程不可能无限制的产生,因此为了解决大量客户端访问的问题,引入了IO复用技术,即:一个进程可以同时对多个客户请求进行服务。也就是说IO复用的“介质”是进程(准确的说复用的是select和poll,因为进程也是靠调用select和poll来实现的),复用一个进程(select和poll)来对多个IO进行服务,虽然客户端发来的IO是并发的但是IO所需的读写数据多数情况下是没有准备好的,因此就可以利用一个函数(select和poll)来监听IO所需的这些数据的状态,一旦IO有数据可以进行读写了,进程就来对这样的IO进行服务。

  

理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系

select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);



 1.select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),位数组的每一位代表其对应的描述符是否需要被检查。第二三四参数表示需要关注读、写、错误事件的文件描述符位数组,这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,所以每次调用select前都需要重新初始化fdset。timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。

 select的调用步骤如下:

(1)使用copy_from_user从用户空间拷贝fdset到内核空间

(2)注册回调函数__pollwait

(3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。

(5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。

(6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。

(7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。

(8)把fd_set从内核空间拷贝到用户空间。

总结下select的几大缺点:

(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

(3)select支持的文件描述符数量太小了,默认是1024

 

2.  poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。

 poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。

 

3.直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

 

epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll 和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注 册要监听的事件类型;epoll_wait则是等待事件的产生。

  对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。

  对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。

  对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

总结:

(1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间,这就是回调机制带来的性能提升。

(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内 部定义的等待队列),这也能节省不少的开销。
View Code

这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的

#服务端
from socket import *
import selectors

sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
    conn,addr=server_fileobj.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper()+b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()



server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept

while True:
    events=sel.select() #检测所有的fileobj,是否有完成wait data的
    for sel_obj,mask in events:
        callback=sel_obj.data #callback=accpet
        callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)

#客户端
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))

while True:
    msg=input('>>: ')
    if not msg:continue
    c.send(msg.encode('utf-8'))
    data=c.recv(1024)
    print(data.decode('utf-8'))
View Code

总结

IO多路复用(select)
select检测的是哪个套接字准备好了(检测的时候等待了,变成阻塞了)
select之所以比阻塞IO好,就是因为select可以检测多个套接字
多个链接下select才能发挥它的优势
但是你的套接字特别多,你怎么知道哪个好了呢,那么就得用循环去遍历一下
那么如果特别多的时候,效率也就不咋高了
eppol:只支持linux系统(就是为了解决select效率低的问题)
eppol比pool,select效率高
selectors 更好用,解决了上面select,eppol,pool的问题
socketserver用这个模块IO问题也解决了,实现并发也解决了

原文地址:https://www.cnblogs.com/596014054-yangdongsheng/p/9955960.html