python并发编程之IO模型

1.IO模型介绍

2.阻塞IO(blockingIO)

3.非阻塞IO(non-blocking IO)

4.多路复用IO(IOmultiplexing)

1.IO模型介绍

 Stevens在文章中一共比较了五种IO Model:
    * blocking IO
    * nonblocking IO
    * IO multiplexing
    * signal driven IO
    * asynchronous IO
    由signal driven IO(信号驱动IO)在实际中并不常用,所以主要介绍其余四种IO Model。

  再说一下IO发生时涉及的对象和步骤。对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,一个是调用这个IO的process (or thread),另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

#1)等待数据准备 (Waiting for the data to be ready)
#2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)

    记住这两点很重要,因为这些IO模型的区别就是在两个阶段上各有不同的情况。

2.阻塞IO(blockingIO)

实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

#服务端
# from socket import *
# s=socket(AF_INET,SOCK_STREAM)
# s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
# s.bind(('127.0.0.1',8080))
# s.listen(5)
# while True:#链接循环
#     conn,addr=s.accept()#sever阻塞
#     print(addr)
#     while True:
#         try:
#             data=conn.recv(1024)#recv阻塞
#             if not data:break
#             conn.send(data.upper())
#         except Exception:
#           break
#客户端
# from socket import *
# c=socket(AF_INET,SOCK_STREAM)
# c.connect(('127.0.0.1',8080))
# while True:
#     msg=input('>>>').strip()#阻塞
#     if  not msg:continue
#     c.send(msg.encode('utf-8'))
#     data=c.recv(1024)#阻塞
#     print(data.decode('utf-8'))
# c.close()

 简单解决方案:

#在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。

这种方案的不足:

#开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

   改进方案:   

#很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进后方案其实也存在着问题:

#“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。

     对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。

3.非阻塞IO(non-blocking IO)

Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:

从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。

    也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。

    所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

代码:

#服务端
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.bind(('127.0.0.1',8080))
s.setblocking(False)#设置为False是说为非阻塞
s.listen(5)
clint_l=[]#链接 循环的conn都放在这个列表里
del_l=[]
while True:
    try:
        print(clint_l)
        conn,addr=s.accept()
        print(addr)
        clint_l.append(conn)
    except  BlockingIOError:
        for conn in clint_l:
            try:
               data= conn.recv(1024)
               conn.send(data.upper())
            except BlockingIOError:
                pass
            except ConnectionResetError:
                del_l.append(conn)#把关闭链接的conn放在单独的一个列表里
        for conn in del_l:
            conn.close()#先关闭
            clint_l.remove(conn)#在清除
        del_l=[]#设置为空
#客户端
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.connect(('127.0.0.1',8080))
while True:
    msg=input('>>>').strip()
    if not msg:continue
    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data.decode('utf-8'))
s.close()

   系统会一直在循环中,占用大量的cpu资源, 但是非阻塞IO模型绝不被推荐。

 我们不能否定其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。

    但是也难掩其缺点:

#1. 循环调用recv()将大幅度推高CPU占用率;在低配主机下极容易出现卡机情况
#2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
 此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
4.多路复用IO(IOmultiplexing)
通过select检测哪个链接处于活跃(也就是说处于copydata阶段)
通信服务端
#服务端
from socket import *
import select
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('127.0.0.1',8080))
s.setblocking(False)#设置为False是说为非阻塞
s.listen(5)
sever_l=[s,]#检测表格,首先放一个s在里面
while True:
   res_l,_,_=select.select(sever_l,[],[])#检测是否活跃
   print(res_l)
   for obj  in res_l:
       if obj==s:
           conn,addr=obj.accept()
           sever_l.append(conn)
       else:
               data=obj.recv(1024)
               obj.send(data.upper())
#客户端
from socket import *
s=socket(AF_INET,SOCK_STREAM)
s.connect(('127.0.0.1',8080))
while True:
    msg=input('>>>').strip()
    if not msg:continue
    s.send(msg.encode('utf-8'))
    data=s.recv(1024)
    print(data.decode('utf-8'))
s.close()

强调:

    1. 如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。

    2. 在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。

    结论: select的优势在于可以处理多个连接,不适用于单个连接 

优点

#相比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。

缺点

#首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
#其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
原文地址:https://www.cnblogs.com/1a2a/p/7463960.html