网络编程---异步同步

异步编程

重要概念

同步、异步

阻塞、非阻塞

区别*

 

联系

同步IO、异步IO、IO多路复用

IO两个阶段

 

IO模型

同步IO

阻塞IO

 

非阻塞IO

 

IO多路复用

 

异步IO

Python中的IO多路复用

selectors库

 

#在selects模块源码最下面有如下代码
# Choose the best implementation, roughly:
#    epoll|kqueue|devpoll > poll > select.
# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

 1 #使用示例
 2 import selectors
 3 import threading
 4 import socket
 5 import logging
 6 
 7 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
 8 logging.basicConfig(level=logging.INFO,format=FORMAT)
 9 
10 #回调参数自己定义形参
11 def accept(sock:socket.socket,mask):
12     """mask:事件掩码的或值"""
13     conn,addr = sock.accept()
14     conn.setblocking(False)#不阻塞
15     pass
16 
17 #回调函数
18 def read(conn:socket.socket,mask):
19     pass
20 
21 #构造缺省性能最优selector
22 selector = selectors.DefaultSelector()
23 
24 #创建Tcp Server
25 socket  = socket.socket()
26 socket.bind(('0.0.0.0',9999))
27 socket.listen()
28 logging.info(socket)
29 socket.setblocking(False)#非阻塞
30 
31 #注册文件对象sock关注读事件,返回SelectorKey
32 #将socket、关注事件、data都绑定到key实例属性上
33 key = selector.register(socket,selectors.EVENT_READ,accept)
34 logging.info(key)
35 
36 e = threading.Event()
37 
38 def select(e:threading.Event):
39     while not e.is_set():
40         #开始监视,等到有文件对象监控事件产生,返回(key,mask)元组
41         events = selector.select()#阻塞
42         print('-'*30)
43         for key,mask in events:
44             logging.info(key)
45             logging.info(mask)
46             callback = key.data #回调函数
47             callback(key.fileobj,mask)
48 
49 threading.Thread(target=select,args=(e,),name='select').start()
50 
51 def main():
52     while not e.is_set():
53         cmd = input('>>>')
54         if cmd.strip() == 'quit':
55             e.set()
56             fobjs = []
57             logging.info("{}".format(list(selector.get_map().items())))
58             
59             for fd,key in selector.get_map().items():
60                 print(fd,key)
61                 print(key.fileobj)
62                 fobjs.append(key.fileobj)
63             
64             for fobj in fobjs:
65                 selector.unregister(fobj)
66                 fobj.close()
67             selector.close()
68 
69 if __name__ == '__main__':
70     main()

练习

 

 1 import socket
 2 import threading
 3 import logging
 4 import datetime
 5 import selectors
 6 
 7 FORMAT="%(asctime)s %(threadName)s %(thread)d %(message)s"
 8 logging.basicConfig(level=logging.INFO,format=FORMAT)
 9 
10 #TCP Server
11 class ChatServer():
12     def __init__(self,ip='0.0.0.0',port=9999):
13         self.addr = (ip,port)
14         self.socket = socket.socket()
15         self.event = threading.Event()
16         self.clients = {}
17         #增加
18         self.selector = selectors.DefaultSelector()#创建selector
19 
20     def start(self):#启动监听
21         self.socket.bind(self.addr)
22         self.socket.listen()
23         # threading.Thread(target=self.accept,name='accept').start()
24         #增加
25         self.socket.setblocking(False)
26         self.selector.register(self.socket,selectors.EVENT_READ,self.accept)#注册
27         threading.Thread(target=self.select,name='select',daemon=True).start()
28 
29     #增加
30     def select(self):#阻塞
31         while not  self.event.is_set():
32             #开始监视,等到某文件对象被监控的事件产生,返回(key,mask)元组
33             events = self.selector.select()
34             print("*"*30)
35             for key,mask in events:
36                 logging.info(key)
37                 logging.info(mask)
38                 callback = key.data#回调函数
39                 callback(key.fileobj)
40 
41     #回调函数
42     def accept(self,sock:socket.socket):#接受客户端连接
43         conn,addr = sock.accept()
44         conn.setblocking(False)
45         #注册,监视每一个与客户端的连接的socket对象
46         self.selector.register(conn,selectors.EVENT_READ,self.recv)
47 
48     #回调函数
49     def recv(self,sock:socket.socket):#接受客户端数据
50         data = sock.recv(1024)
51         if not data or data == b'quit':#客户端主动断开或退出,注销并关闭socket
52             self.selector.unregister(sock)
53             sock.close()
54             return
55         msg = "{:%Y/%m/%d %H%:M:%S} {}:{}
{}
".format(datetime.datetime.now(),
56                                                         *sock.getpeername(),data.encode())
57         #群发
58         for key in self.selector.get_map().values():
59             if key.data == self.recv:#排除self.accept
60                 key.fileobj.send(msg)
61 
62     #停止服务
63     def stop(self):
64         self.event.set()
65         fobjs = []
66         for fd,key in self.selector.get_map().items():#fd:key对象
67             fobjs.append(key.fileobj)
68         for fobj in fobjs:
69             self.selector.unregister(fobj)
70             fobj.close()
71         self.selector.close()
72 
73 def main():
74     cs = ChatServer()
75     cs.start()
76 
77     while True:
78         cmd = input(">>>")
79         if cmd == 'quit':
80             cs.stop()
81             break
82         logging.info(threading.enumerate())
83 
84 if __name__ == '__main__':
85     main()

 

 

 1 #-*- codeing:utf-8 -*-
 2 import socket
 3 import threading
 4 import datetime
 5 import logging
 6 import selectors
 7 from queue import Queue
 8 
 9 FORMAT = "%(asctime)s %(threadName)s %(thread)s %(message)s"
10 logging.basicConfig(level=logging.INFO,format=FORMAT)
11 
12 class ChatServer:
13     def __init__(self,ip='127.0.0.1',port=9999):
14         self.sock = socket.socket()
15         self.addr = (ip,port)
16         self.clients = {}
17         self.event = threading.Event()
18         self.selector = selectors.DefaultSelector()#创建selector
19 
20     def start(self):#启动监听
21         self.sock.bind(self.addr)
22         self.sock.listen()
23         self.sock.setblocking(False)
24         #注册
25         self.selector.register(self.sock,selectors.EVENT_READ,self.accept)
26         threading.Thread(target=self.select,name='selector').start()
27 
28     def select(self):
29         while not self.event.is_set():
30             #开始监视,等到某文件对象被监控的事件发生,返回(key,mask)元组
31             events = self.selector.select()#阻塞,直到events
32             for key,mask in events:
33                 if callable(key.data):
34                     callback = key.data
35                     callback = (key.fileobj,mask)
36                 else:
37                     callback = key.data[0]
38                     callback(key,mask)
39 
40     def accept(self,sock:socket.socket,mask):#接收客户端连接
41         conn,raddr = self.sock.accept()
42         conn.setblocking(False)
43         self.clients[raddr] = (self.handler,Queue())
44         #注册,监视每一个与客户端连接的socket对象
45         self.selector.register(conn,selectors.EVENT_READ | selectors.EVENT_WRITE,self.clients[raddr])
46 
47     def handler(self,key:selectors.SelectorKey,mask):#接收客户端数据
48         if mask & selectors.EVENT_READ:
49             sock = key.fileobj
50             raddr = sock.getpeername()
51             data = sock.recv(1024)
52             if not data or data == b'quit':  # 客户端主动断开或退出,注销并关闭socket
53                 self.selector.unregister(sock)
54                 sock.close()
55                 self.clients.pop(raddr)
56                 return
57             for k in self.selector.get_map().values():
58                 logging.info(k)
59                 if isinstance(k.data,tuple):
60                     k.data[1].put(data)
61         if mask & selectors.EVENT_WRITE:
62             #因为写一直就绪,mask为2,所以一直可以写,从而导致select()不断循环,如同不阻塞一样
63             if not key.data[1].empty():
64                 key.fileobj.send(key.data[1].get())
65 
66         # 停止服务
67 
68     def stop(self):
69         self.event.set()
70         fobjs = []
71         for fd, key in self.selector.get_map().items():
72             fobjs.append(key.fileobj)
73         for fobj in fobjs:
74             self.selector.unregister(fobj)
75             fobj.close()
76         self.selector.close()
77 
78 def main():
79     cs = ChatServer()
80     cs.start()
81 
82     while True:
83         cmd = input(">>>")
84         if cmd == 'quit':
85             cs.stop()
86             break
87         logging.info(threading.enumerate())
88 
89 if __name__ == '__main__':
90     main()

asyncio

 问题的引出

def a():
    for x in range(3):
        print(x)

def b():
    for x in "abc":
        print(x)

a()
b()

#运行结果一定是
0
1
2
a
b
c

import threading
import time

def a():
    for x in range(3):
        time.sleep(0.001)
        print(x)

def b():
    for x in "abc":
        time.sleep(0.001)
        print(x)

threading.Thread(target=a,name='a').start()
threading.Thread(target=b,name='b').start()
#运行结果
a
0
b
1
c
2
#多进程版本
import multiprocessing
import time
def a():
    for x in range(3):
        time.sleep(0.001)
        print(x)

def b():
    for x in "abc":
        time.sleep(0.001)
        print(x)

if __name__ == '__main__':
    multiprocessing.Process(target=a, name='a').start()
    multiprocessing.Process(target=b, name='b').start()

#运行结果
0
1
a
2
b
c
#生成器版本
def a():
    for x in range(3):
        print(x)
        yield

def b():
    for x in "abc":
        print(x)
        yield

x = a()
y = b()
for i in range(3):
    next(x)
    next(y)

#运行结果
0
a
1
b
2
c

事件循环

 

 协程

协程的使用

import asyncio

@asyncio.coroutine
def sleep(x):#协程函数
    for i in range(3):
        print('sleep {}'.format(i))
        yield from asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()

import asyncio
async def sleep(x):
    for i in range(3):
        print('sleep {}'.format(i))
        await asyncio.sleep(x)
loop = asyncio.get_event_loop()
loop.run_until_complete(sleep(3))
loop.close()

import asyncio
import threading

async def sleep(x):
    for i in range(x):
        print('sleep {}'.format(i))
        await asyncio.sleep(x)


async def showthread(x):
    for i in range(x):
        print(threading.enumerate())
        await asyncio.sleep(2)

loop = asyncio.get_event_loop()
tasks = [sleep(3),showthread(3)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
#运算结果
[<_MainThread(MainThread, started 21012)>]
sleep 0
[<_MainThread(MainThread, started 21012)>]
sleep 1
[<_MainThread(MainThread, started 21012)>]
sleep 2
#协程版本
import asyncio
@asyncio.coroutine
def a():
    for x in range(3):
        print('a.x',x)
        yield

@asyncio.coroutine
def b():
    for x in 'abc':
        print('b.x',x)
        yield

print(asyncio.iscoroutinefunction(a))
print(asyncio.iscoroutinefunction(b))

#大循环
loop = asyncio.get_event_loop()
tasks = [a(),b()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

#运行结果
True
True
a.x 0
b.x a
a.x 1
b.x b
a.x 2
b.x c

TCP Echo Server举例

import asyncio
#TCP Echo Server举例
async  def handle(reader,writer):
    while True:
        data = await reader.read(1024)
        print(dir(reader))
        print(dir(writer))
        client = writer.get_extra_info('peername')
        message = "{} Your msg {}".format(client,data.decode()).encode()
        writer.writer(message)
        await writer.drain()

loop = asyncio.get_event_loop()
ip = '127.0.0.1'
port = 9999
crt = asyncio.start_server(handle,ip,port,loop=loop)
server = loop.run_until_complete(crt)
print(server)#server是监听的socket对象
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    server.close()
    loop.close()

aiohttp库

HTTP Server

from aiohttp import web

async def indexhandle(request:web.Request):
    return web.Request(text=request.path,status=201)

async def handle(request:web.Request):
    print(request.match_info)
    print(request.query_string)#http://127.0.0.1:8080/1?name=12301
    return web.Request(text=request.match_info.get('id','0000'),status=200)

app = web.Application()
app.router.add_get("/",indexhandle)#http://127.0.0.1:8080/
app.router.add_get("/{id}",handle)#http://127.0.0.1:8080/12301

web.run_app(app,host='0.0.0.0',port=9977)

HTTP Client

import asyncio
from aiohttp import ClientSession

async def get_html(url:str):
    async with ClientSession() as session:
        async with session.get(url) as res:
            print(res.status)
            print(await res.text())


url = 'http://127.0.0.1/ziroom-web'
loop = asyncio.get_event_loop()
loop.run_until_complete(get_html(url))
loop.close()
做一枚奔跑的老少年!
原文地址:https://www.cnblogs.com/xiaoshayu520ly/p/11007803.html