协程详解(三)

好了,准备知识差不多了,接下来就进入正题了,开始讲解协程方面的知识了
我们先看一段简单的伪代码,这个伪代码已经包含了协程的基本结构和原理
import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stopped = False
r0 = request(0, "/1")
r1 = request(1,  "/2")
 
t_list = [r0 ,r1]
 
r0.send(None)
r1.send(None)
 
def request(index, address):
    sock = socket()
    sock.connect(address)
    def on_read():
        print(sock.recv())
        t_list[index].send(None)

    selector.register(sock.fileno(), EVENT_READ, on_read)
    yield  # 遇到io,将运行的权利交出去

    selector.unregister(sock.fileno())
    print('完成了')

while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

  

代码分析:
1.程序一开始就注册里两个生成器,r0和r1,并且保存在t_list 这个数组里面
并且进行了第一次执行
2.request:
首先定义了一个socket, 并且连接到传递进来的address
并且注册了一个读的事件的回调,并且yield出来,跳出生成器,等待下一次执行

既然已经yield出来,那么生成器下一次执行怎么办,到底需要怎么执行
所以这里的回调就有些意思了,当数据就绪之后,会执行回调
t_list[index].send(None) ,
重新获取到本身这个生成器的引用,然后重新执行一次
等于是当自己的数据已经就绪好了,就会执行自己的回调,自己的回调就是重新调用一下自己,继续激活自己本身这个生成器

3. events = selector.select()当收到读的事件,然后遍历所有的事件,提取出它的回调,
callback = event_key.data
callback()  并且执行回调


所以协程的基本原理就是:在遇到 io、阻塞的时候,将运行的权利交出去(例如上面的yield),同时注册唤醒自己的回调;
当阻塞事件完成的时候,通过一个回调来唤醒程序继续往下走,并且返回io事件的值。这句话记下来,考试要考!!!!

接下来,我们根据上述的思路更进一步,以一个爬虫的代码来更进一步了解一些协程的设计原理:
class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

    def __iter__(self):
        """
        yield的出现使得__iter__函数变成一个生成器,生成器本身就有next方法,所以不需要额外实现。
        yield from x语句首先调用iter(x)获取一个迭代器(生成器也是迭代器)
        """
        yield self  # 外面使用yield from把f实例本身返回
        return self.result  # 在Task.step中send(result)的时候再次调用这个生成器,但是此时会抛出stopInteration异常,并且把self.result返回

我们先定义一个future类, 这个类是协程的一个基础了,future的一个作用是什么,是用来存储生成器的值(
result这个变量就是用来存返回值,当协程执行完毕,会调用set_result方法设置返回值,并且执行future设置的回调函数)
,并且作为一个协程回调的管理者,
这里的内容非常简单,只有一个set_result方法以及add_done_callback(添加回调)
def connect(sock, address):
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
    selector.unregister(sock.fileno())
这个是一个连接地址的方法,内容非常简单,我们重点关注一下future的使用
这里面首先定义了一个future,然后注册了socket的写的回调,回调的内容是给future 执行以下 set_result, 这个future没啥内容,所以基本上是不执行什么代码

yield from f 这句代码就执行结果是执行了future 的__iter__函数的yield self 这一句代码,也就是说,代码yield了一个futrue对象
def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    """
    此处的chunck接收的是f中return的f.result,同时会跑出一个stopIteration的异常,只不过被yield from处理了。
    这里也可直接写成chunck = yiled f
    """
    chunck = yield from f
    selector.unregister(sock.fileno())
    return chunck

接下来定了一个读取数据的方法,这个方法也比较简单,首先定义了一个future, 并且注册了socket的读方法的回调函数,这个回调函数是的作用是执行futrue的set_result 并且把数据当做参数传递进去,
然后yield from f 同样yield了这个future对象,这里因为传递了参数进去,所以当第二次执行read这个生成器的时候,就会执行future的return self.result 这句代码,
因此就等于chunck就获取到了future的result,也就是说,chunck就获取到了socket读取的数据,然后return chunck 返回给调用方。
  def fetch(url):
        sock = socket.socket()
        yield from connect(sock, ("xkcd.com", 80))
        get = "GET {0} HTTP/1.0
Host:xkcd.com

".format(url)
        sock.send(get.encode('ascii'))
        response = yield from read(sock)
        print(response)

接下来定义一个完整的根据url,获取请求数据的代码,首先yield from connect,然后sock.send(get.encode('ascii')),
当数据写完后,再执行response = yield from read(sock) 获取到response,并且打印出来

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)  # 激活Task包裹的生成器

    def step(self, future):
        try:
            # next_future = self.coro.send(future.result)
            next_future = self.coro.send(None)  # 驱动future
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

然后定义一个task类,这个task类主要的作用有两个
1. 启动以及唤醒协程
2. 把对协程进行管理,把协程的启动函数send方法,添加到future的回调函数列表里面
那么当socket的回调函数执行set_result,set_result除了会设置result之外,也会执行future的回调函数列表里面的回调函数
当我们把协程的启动函数send放到future的回调的话,当set_result的时候就会重新唤醒协程,继续执行

def loop():
    while :
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()
这是一个事件循环的雏形,跟上一篇文章一样,就是不断地监控select所注册的socket,假如有socket就绪了,就把就绪的socket的回调取出来,然后执行
import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE

selector = DefaultSelector()
stopped = False
urls_todo = {"/", "/1", "/2", "/3"}
if __name__ == "__main__":
    import time
    start = time.time()
    for url in urls_todo:
        Task(fetch(url))
    loop()
    print(time.time() - start)
然后这是入口函数,我们来一句一句的分析一下这个整个代码执行了什么
1. 首先我们初始化了三个生成器fetch(url),并且传递到task,创建了三个task
并且三个task都执行了next_future = self.coro.send(None),开始了生成器的第一次执行
2. 生成器第一次执行的位置是yield from connect(sock, ("xkcd.com", 80)) --> connect函数里面的
    f = Future()
    sock.setblocking(False)
    try:
        sock.connect(address)
    except BlockingIOError:
        pass

    def on_connected():
        f.set_result(None)

    selector.register(sock.fileno(), EVENT_WRITE, on_connected)
    yield from f
进行了socket的连接,以及写事件的回调注册,这里的回调注册仅仅是把future给set_result
然后yield from f会执行future里面的_iter_函数并且yield自身,也就是执行到这里
函数yield出来了一个future对象
3.next_future = self.coro.send(None) 所以这一句获取的值就是一个future对象,
next_future.add_done_callback(self.step) 就给这个future对象分别设置了task.step添加到future的回调函数列表
也就说说 三个生成器分别产生了三个future对象,同时三个future都把对应的生成器的启动函数放到了future的回调函数列表

4. 然后代码执行到loop()这里,loop会进行select操作
当三个socket分别都连接完毕之后,就会返回对应的socket对象,然后分别把socket的回调函数取出来,进行执行
三个回调函数的内容都是给 f.set_result(None), future执行完set_result后,会执行
for fn in self._callbacks:
            fn(self)
回调列表里面的所有的回调函数
三个future的回调函数前面也说了,就是task的self.step 方法,也就说生成器的send方法,也是分别对生成器进行执行

5. 然后生成器进行执行,到下面这里,进行url请求,并且执行read函数,
get = "GET {0} HTTP/1.0
Host:xkcd.com

".format(url)
sock.send(get.encode('ascii'))
response = yield from read(sock)
进而执行到read函数里面,新建了future,并且注册socket的读取回调,回调函数是给future set_result 读取到的数据
f = Future()
def on_readable():
   f.set_result(sock.recv(4096))
selector.register(sock.fileno(), EVENT_READ, on_readable)
chunck = yield from f
跟前面步骤一样,然后yield出来一个future对象

6. 接下来继续执行step下面的代码,继续把step注册到future的回调函数列表里面
next_future.add_done_callback(self.step)
然后三个future都执行了相同的步骤
7. 执行到这里就继续进行loop函数的循环,继续select操作,当socket接收到数据之后,然后取出socket,执行socket的回调函数on_readable 里面的f.set_result(sock.recv(4096))
future保存了读取到的数到result变量里面,然后执行future回调列表里面的函数,回调函数也是继续激活生成器,执行的代码就是future的return self.result
就说read函数里面的
chunck = yield from f  把future结果赋值给了chunck
selector.unregister(sock.fileno())
return chunck   返回了chunck

然后继续返回到了fetch函数的
response = yield from read(sock)
print(response)

把读取出来的结果打印出来了,至此,整个程序也就执行完毕了,三个生成器分别完成了url的连接,数据读取的完整步骤

以上就是协程的基本流程了,python的asyncio的结构也是差不多,执行逻辑也是差不多,只是async在这样的逻辑基础上进行了更加复杂的功能,做了很多优化,上面的各个函数都能够对应得了asyncio的各个class
task对应着 asyncio的 Task 类
loop对应着 asyncio的事件循环
future对应着 asyncio的Future 类
fetch 对应着 asyncio的协程
至于asyncio后续找时间对它的源码分析一下,是完全可以跟上面的例子对应的上的




  



原文地址:https://www.cnblogs.com/wilken/p/14233000.html