Python学习第57天(异步IO)

  这个内容比较好懂,但是主要是运用,这部分只是完了之后就要暂别Python一段时间,学学web前端的知识了。

  linux下的asynchronous IO其实用得很少。先看一下它的流程:

 

  用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

      到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。
先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。

在说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的:
      A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;

    有阻塞就是同步
      An asynchronous I/O operation does not cause the requesting process to be blocked;
 

    没有阻塞就是异步

  两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO。有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。

       注意:由于咱们接下来要讲的select,poll,epoll都属于IO多路复用,而IO多路复用又属于同步的范畴,故,epoll只是一个伪异步而已。

各个IO Model的比较如图所示:

  

  看个实例吧:

import selectors
import socket

sel = selectors.DefaultSelector()

def accept(sock, mask):
    conn, addr = sock.accept()  # Should be ready
    print('accepted', conn, 'from', addr)
    conn.setblocking(False)
    sel.register(conn, selectors.EVENT_READ, read)

def read(conn, mask):
    data = conn.recv(1000)  # Should be ready
    if data:
        print('echoing', repr(data), 'to', conn)
        conn.send(data)  # Hope it won't block
    else:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sock = socket.socket()
sock.bind(('localhost', 1234))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept)

while True:
    events = sel.select()
    for key, mask in events:
        callback = key.data
        callback(key.fileobj, mask)

  其实会发现如果是用selector来写文件的上传下载应该会更简单一些

  近期有空还是实现一下吧

  存一个select的标准解释

# select 模拟一个socket server,注意socket必须在非阻塞情况下才能实现IO多路复用。
# 接下来通过例子了解select 是如何通过单进程实现同时处理多个非阻塞的socket连接的。
#server端


import select
import socket
import queue

server = socket.socket()
server.bind(('localhost',9000))
server.listen(1000)

server.setblocking(False)  # 设置成非阻塞模式,accept和recv都非阻塞
# 这里如果直接 server.accept() ,如果没有连接会报错,所以有数据才调他们
# BlockIOError:[WinError 10035] 无法立即完成一个非阻塞性套接字操作。
msg_dic = {}
inputs = [server,]  # 交给内核、select检测的列表。
# 必须有一个值,让select检测,否则报错提供无效参数。
# 没有其他连接之前,自己就是个socket,自己就是个连接,检测自己。活动了说明有链接
outputs = []  # 你往里面放什么,下一次就出来了

while True:
    readable, writeable, exceptional = select.select(inputs, outputs, inputs)  # 定义检测
    #新来连接                                        检测列表         异常(断开)
    # 异常的也是inputs是: 检测那些连接的存在异常
    print(readable,writeable,exceptional)
    for r in readable:
        if r is server:  # 有数据,代表来了一个新连接
            conn, addr = server.accept()
            print("来了个新连接",addr)
            inputs.append(conn)  # 把连接加到检测列表里,如果这个连接活动了,就说明数据来了
            # inputs = [server.conn] # 【conn】只返回活动的连接,但怎么确定是谁活动了
            # 如果server活动,则来了新连接,conn活动则来数据
            msg_dic[conn] = queue.Queue()  # 初始化一个队列,后面存要返回给这个客户端的数据
        else:
            try :
                data = r.recv(1024)  # 注意这里是r,而不是conn,多个连接的情况
                print("收到数据",data)
                # r.send(data) # 不能直接发,如果客户端不收,数据就没了
                msg_dic[r].put(data)  # 往里面放数据
                outputs.append(r)  # 放入返回的连接队列里
            except ConnectionResetError as e:
                print("客户端断开了",r)
                if r in outputs:
                    outputs.remove(r) #清理已断开的连接
                inputs.remove(r) #清理已断开的连接
                del msg_dic[r] ##清理已断开的连接

    for w in writeable:  # 要返回给客户端的连接列表
        data_to_client = msg_dic[w].get()  # 在字典里取数据
        w.send(data_to_client)  # 返回给客户端
        outputs.remove(w)  # 删除这个数据,确保下次循环的时候不返回这个已经处理完的连接了。

    for e in exceptional:  # 如果连接断开,删除连接相关数据
        if e in outputs:
            outputs.remove(e)
        inputs.remove(e)
        del msg_dic[e]


#*************************client
import socket
client = socket.socket()

client.connect(('localhost', 9000))

while True:
    cmd = input('>>> ').strip()
    if len(cmd) == 0 : continue
    client.send(cmd.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode())

client.close()

  我的正常工作最近可能要面临一个比较大的挑战,所以,近期的文章质量还是有所下降的,继续努力吧。

原文地址:https://www.cnblogs.com/xiaoyaotx/p/12732626.html