Python异步IO之select

1. select模块的基本使用(以socket为例)

 1 # -*- coding:utf-8 -*-
 2 # Author:Wong Du
 3 
 4 import select
 5 import socket
 6 import queue
 7 
 8 HOST_PORT = ("0.0.0.0", 9000)
 9 print("33[31;1mServer Addr:{0}33[0m".format(HOST_PORT))
10 
11 server = socket.socket()
12 server.bind(HOST_PORT)
13 server.listen(10)
14 
15 inputs = [server, ]     # 监测服务端socket的活跃状态和新socket连接活跃状态列表
16 outputs = []            # 发送数据给客户端的socket连接列表
17 msg_queue = {}          # 发送给客户端的信息数据字典
18 
19 # 断开socket连接函数
20 def disconnect():
21     print("33[41;1mClient [{0}] disconnected...33[0m".format(r.getpeername()))
22     if r in outputs:
23         outputs.remove(r)   
24     inputs.remove(r)  # 移除select监测连接
25     del msg_queue[r]    # 删除消息队列
26 
27 
28 
29 if __name__ == '__main__':
30     while True:
31         print("Waiting for socket to active...")
32 
33         # 如果在select监测的对象里没有活跃状态的socket,则卡在这里;
34         # 当有活跃的socket对象时,把socket对象赋给readable,writeable,excetional;
35         readable, writeable, exceptional = select.select(inputs, outputs, inputs)
36         print("active connect:", readable, writeable, exceptional)
37 
38         for r in readable:  # 循环readable里活跃的socket对象
39             if r is server: # 如果活跃的是server的socket对象,则代表有新客户端socket连接进来了
40                 conn, client_addr = server.accept()     # 接收client对象
41                 print("33[32;1mConnected client addr:{0}33[0m".format(client_addr))   # 打印连接进来的对象身份
42                 inputs.append(conn)                 # 加入到inputs列表让select监测处理
43                 msg_queue[conn] = queue.Queue()     # 接收到的客户端连接,不立即返回,暂存在队列里,以后发送
44             else:   # 如果活跃的不是server的socket对象,则代表客户端socket有数据发过来了
45                 try:
46                     data = r.recv(1024).decode()  # 接收客户端数据
47                     if data:
48                         # r.getpeername()获取客户端连接的身份信息
49                         print("33[33;1mReceived [{0}] from the {1}33[0m".format(data, r.getpeername()))
50                         if r not in outputs:
51                             outputs.append(r)   # 把客户端连接加到outputs列表里
52                         msg_queue[r].put(data.upper().encode()) # 把想要返回给客户端的数据放到相应的客户端连接队列里
53                     else:
54                         disconnect()    # 连接断开
55                 except ConnectionResetError as e:       # 抓取客户端断开错误
56                     disconnect()        # 连接断开
57 
58         for w in writeable:
59             # print("33[34;1mReading to send to client {0}33[0m".format(w.getpeername()))
60             try:
61                 data = msg_queue[w].get_nowait()   # 获取队列中的数据
62                 w.send(data)        # 发送给客户端
63                 print("33[34;1mSend [{0}] to client {1}33[0m".format(data, w.getpeername()))
64             except queue.Empty as e:
65                 print("No data to send to the client [{0}]".format(w.getpeername()))
66 
67             outputs.remove(w)   # 移除列表中需要发送数据的连接
68 
69         for e in exceptional:
70             print("33[41;1mClient [{0}] handles exception33[0m".format(e.getpeername())) #select处理异常连接
71             if e in outputs:
72                 outputs.remove(e)
73 
74             inputs.remove(e)
75             del msg_queue[e]
select_socket_server
 1 # -*- coding:utf-8 -*-
 2 # Author:Wong Du
 3 
 4 import socket
 5 
 6 HOST_PORT = ("localhost", 9000)
 7 client = socket.socket()
 8 client.connect(HOST_PORT)
 9 
10 while True:
11     cmd = input("%s#" % HOST)
12     if not cmd:
13         continue
14     client.send(cmd.encode())
15     data = client.recv(1024)
16     print(data.decode())
select_socket_client
 1 # -*- coding:utf-8 -*-
 2 # Author:Wong Du
 3 
 4 import socket
 5 import time
 6 
 7 server_addr = ("localhost", 9000)
 8 client_socket = [socket.socket() for i in range(100)]
 9 
10 msg_data = "如果我是dj,你会爱我吗?"
11 count = 0
12 
13 
14 for client in client_socket:
15     try:
16         client.connect(server_addr)
17         print("Success to connected...")
18         client.send(msg_data.encode())
19         data = client.recv(1024)
20         print(data.decode())
21         count += 1
22     except ConnectionResetError:
23         print(count)
24 time.sleep(10)
select_socket_client高并发测试

2. selector模块的基本使用(以socket为例)

 1 # -*- coding:utf-8 -*-
 2 # Author:Wong Du
 3 
 4 import selectors
 5 import socket, queue
 6 
 7 sel = selectors.DefaultSelector()   # 生成一个sel对象
 8 send_msg = {}                       # 返回信息列表
 9 
10 # 接收客户端连接函数
11 def accept(sock, mask):
12     conn, client_addr = sock.accept()
13     print("33[33;1mClient {0} is connected...33[0m".format(client_addr))
14     # sock.setbloking(False)
15     # 将客户端socket对象和read方法注册到sel里面
16     sel.register(conn, selectors.EVENT_READ, read)
17     send_msg[conn] = queue.Queue()
18 
19 # 接收客户端发来数据函数
20 def read(r, mask):
21     try:
22         data = r.recv(1024).decode()
23         if data:
24             print("Received [{0}] from the client {1}".format(data, r.getpeername()))
25             # conn.send(data.upper())
26             send_msg[r].put(data.upper().encode())
27             sel.modify(r, selectors.EVENT_WRITE, write)
28         else:
29             disconnect(r)
30     except ConnectionResetError:
31         disconnect(r)
32 
33 # 发送数据给客户端函数
34 def write(w, mask):
35     # print("33[34;1mReading to send to client {0}33[0m".format(w.getpeername()))
36     try:
37         data = send_msg[w].get_nowait()  # 获取队列中的数据
38         w.send(data)  # 发送给客户端
39         print("33[34;1mSend [{0}] to client {1}33[0m".format(data.decode(), w.getpeername()))
40     except queue.Empty as e:
41         print("No data to send to the client [{0}]".format(w.getpeername()))
42 
43     sel.modify(w, selectors.EVENT_READ, read)
44 
45 # 断开socket连接函数
46 def disconnect(conn):
47     print("33[41;1mClient {0} disconnected...33[0m".format(conn.getpeername()))
48     sel.unregister(conn)
49     conn.close()
50 
51 # 初始化服务端连接
52 HOST_PORT = ("localhost", 20000)
53 server = socket.socket()
54 server.bind(HOST_PORT)
55 server.listen(10)
56 print("33[31;1mServer Address: {0}33[0m".format(HOST_PORT))
57 # 将服务端socket对象和accept方法注册到sel里面
58 sel.register(server, selectors.EVENT_READ, accept)
59 
60 while True:
61     # 开启事件监测(阻塞),类似select的select.select(inputs, outputs, inputs),当有活跃连接时,
62     # 返回一个元组,元组内有SelectorKey对象和events事件编号
63     # SelectorKey对象内包含fileobj嵌套字对象,fd柄, events事件编号,data回调函数内存空间指针
64     print("33[32;1mWaiting to connections...33[0m")
65     events = sel.select()
66     print("Active events: %s" % events)
67 
68     for key, mask in events:
69         # 获取socket回调函数内存指针,即accept函数
70         callback = key.data
71         # 调用函数并把socket和events传进去
72         callback(key.fileobj, mask)
selector_socket_server
 1 # -*- coding:utf-8 -*-
 2 # Author:Wong Du
 3 
 4 import socket
 5 HOST = "localhost"
 6 Port = 20000
 7 client = socket.socket()
 8 client.connect((HOST, Port))
 9 
10 while True:
11     cmd = input("%s#" % HOST)
12     if not cmd:
13         continue
14     client.send(cmd.encode())
15     data = client.recv(1024)
16     print(data.decode())
selector_socket_client
原文地址:https://www.cnblogs.com/Caiyundo/p/9548975.html