异步非阻塞

一、必备知识

1.socket客户端:

阻塞式客户端:

import socket
#实例化socket对象
sock = socket.socket()
#向服务器发起连接
sock.connect(("43.226.160.17",80)) #连接会阻塞

# 仿HTTP发送请求
data = b"GET / HTTP/1.0
host: dig.chouti.com

"
sock.sendall(data)
#接收服务器响应信息
response = sock.recv(8096) #等待数据回收时也会阻塞
print(response)
#关闭socket连接
sock.close()

伪非阻塞式:

import socket
#实例化socket对象
sock = socket.socket()

#设置非阻塞,不等待连接成功直接执行后面的代码,由于连接的操作不可能立即完成所以
#会报一个BlockingIOError:无法立即完成一个非阻止性套接字操 的错误,因此需要处理异常
sock.setblocking(False)

#向服务器发起连接
try:
    sock.connect(("43.226.160.17",80)) #连接会阻塞
except BlockingIOError as e:
    print(e)
import time
#虽然会报错,但连接的请求已经发出去,所以连接请求依然生效,
#我们可以使用这段时间进行其它的工作,达到资源的最大利用,这里用sleep代指
time.sleep(5)

# 仿HTTP发送请求
data = b"GET / HTTP/1.0
host: dig.chouti.com

"

#由于没有建立连接,所以这里还发不出消息
try:
    sock.sendall(data)
    #接收服务器响应信息
    response = sock.recv(8096) #等待数据回收时也会阻塞
except Exception as e:
    print(e)

print(response)
#关闭socket连接
sock.close()

  

2.IO多路复用加socket实现非阻塞

伪代码:

import socket
import select
socket_list = []
for url in ["www.baidu.com","www.bing.com"]:
    client = socket.socket()
    client.setblocking(False)
    try:
        client.connect((url,80))
    except BlockingIOError as e:
        print(e)

    socket_list.append(client)

#事件循环
while True:
    r,w,e = select.select(socket_list,socket_list,[],0.05)
    #w代指每一个连接成功的client
    for obj in w:
        obj.send("GET / HTTP/1.0
host: baidu.com

") #这里host写死,下面的代码会解决
    #r代指client有改变,可以收数据了
        for obj in r:
            response = obj.recv(1024)
            print(response)

  

上边代码中,当对多个网站进行连接或爬取时,每一个单独的Url都不会阻塞,再通过select监测数据的变化,可以及时接收数据,又不会挡住后边Url的爬取工作,实现了简单的非阻塞的目标。

但这只是一段伪代码,接下来根据这个思路实现一个真正的异步非阻塞模块。

二、简单的异步非阻塞模块。

import socket
import select

class Request(object):
    """
    封装socket对象,使每次循环时创建的socket对象能对应它的req_info字典,
    方便其利用字典拿到对应的host信息等
    """
    def __init__(self, sock, info):
        self.sock = sock
        self.info = info

    def fileno(self):
        return self.sock.fileno()


class AsyncRequest(object):
    def __init__(self):
        self.sock_list = []
        self.conns = []

    def add_request(self, req_info):
        """
        创建请求
         req_info: {'host': 'www.baidu.com', 'port': 80, 'path': '/'},
        :return:
        """
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect((req_info['host'], req_info['port']))
        except BlockingIOError as e:
            pass

        obj = Request(sock, req_info)
        self.sock_list.append(obj)
        self.conns.append(obj)

    def run(self):
        """
        开始事件循环,检测:连接成功?数据是否返回?
        :return:
        """
        while True:
            # select.select([socket对象,])
            # 可是任何对象,对象一定要有fileno方法,实际上执行的是对象.fileno()
            # select.select([request对象,])
            r, w, e = select.select(self.sock_list, self.conns, [], 0.05)
            # w,是否连接成功
            for obj in w:
                # 检查obj:request对象
                # socket, {'host': 'www.baidu.com', 'port': 80, 'path': '/'},
                data = "GET %s http/1.1
host:%s

" % (obj.info['path'], obj.info['host'])
                obj.sock.send(data.encode('utf-8'))
                #连接成功后从列表中删除此obj对象,避免重复连接
                self.conns.remove(obj)
            # 数据返回,接收到数据
            for obj in r:
                response = obj.sock.recv(8096)
                #函数名加括号运行对应的回调函数
                obj.info['callback'](response)
                #相应的为避免重复接收移除已经接收成功的对象
                self.sock_list.remove(obj)

            # 所有请求已经返回
            if not self.sock_list:
                break


if __name__ == '__main__':
    #指定回调函数,可以在屏幕输出,也可以写入文件、数据库等
    def callback_fun1(response):
        print(response)

    def callback_fun2(response):
        pass
        # with open ......

    url_list = [
        {'host': 'www.baidu.com', 'port': 80, 'path': '/', 'callback': callback_fun1},
        {'host': 'www.cnblogs.com', 'port': 80, 'path': '/index.html', 'callback': callback_fun2},
        {'host': 'www.bing.com', 'port': 80, 'path': '/', 'callback': callback_fun1},
    ]

    obj = AsyncRequest()
    for item in url_list:
        obj.add_request(item)

    obj.run()

  

原文地址:https://www.cnblogs.com/mitsui/p/7459430.html