Tornado 异步浅解

7.1 认识异步

1. 同步

我们用两个函数来模拟两个客户端请求,并依次进行处理:

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/3/9 11:15
#  @Author:zhangmingda
#  @File: asynchronization.py
#  @Software: PyCharm
#  Description:了解异步工作原理
def req_a():
    '''模拟请求A'''
    print('开始处理请求A')
    print('完成处理请求A')

def req_b():
    '''模拟请求A'''
    print('开始处理请求B')
    print('完成处理请求B')
def main():
    """模拟tornado框架,处理两个请求"""
    req_a()
    req_b()

if __name__ == "__main__":
    main()

执行结果:

D:Python3_study ornado1Scriptspython.exe D:/Python3_study/tornado1/asynchronization.py
开始处理请求A
完成处理请求A
开始处理请求B
完成处理请求B

同步是按部就班的依次执行,始终按照同一个步调执行,上一个步骤未执行完不会执行下一步。

想一想,如果在处理请求req_a时需要执行一个耗时的工作(如IO),其执行过程如何?

import time


def req_a():
    '''模拟请求A'''
    print('开始处理请求A')
    long_io()
    print('完成处理请求A')

def req_b():
    '''模拟请求A'''
    print('开始处理请求B')
    print('完成处理请求B')
def main():
    """模拟tornado框架,处理两个请求"""
    req_a()
    req_b()
def long_io():
    '''模拟耗时的IO操作'''
    print('开始IO操作')
    time.sleep(5)
    print("完成IO操作")
    return "IO Complate!!"

if __name__ == "__main__":
    main()

执行过程:

D:Python3_study ornado1Scriptspython.exe D:/Python3_study/tornado1/asynchronization.py
开始处理请求A
开始IO操作
完成IO操作
完成处理请求A
开始处理请求B
完成处理请求B

在上面的测试中,我们看到耗时的操作会将代码执行阻塞住,即req_a未处理完req_b是无法执行的。

我们怎么解决耗时操作阻塞代码执行?

2. 异步

对于耗时的过程,我们将其交给别人(如其另外一个线程)去执行,而我们继续往下处理,当别人执行完耗时操作后再将结果反馈给我们,这就是我们所说的异步。

我们用容易理解的线程机制来实现异步。

2.1 回调写法实现原理

#!/usr/bin/env python3
# -*- coding:utf-8 -*-
#  @Time: 2020/3/15 15:10
#  @Author:zhangmingda
#  @File: asynchronization.py
#  @Software: PyCharm
#  Description:了解异步工作原理
import time
import threading

def long_io():
    '''模拟耗时的IO顺序执行操作'''
    print('开始IO操作')
    time.sleep(5)
    print("完成IO操作")
    return "IO Complate!!"


def long_io_async(callback):
    '''函数方式模拟异步IO:将耗时操作交给另一个线程来处理'''
    def fun(cb):
        '''模拟耗时的IO操作睡眠5秒,'''
        print('开始IO操作')
        time.sleep(5)
        print("完成IO操作")
        cb("IO Complate!!")
    #这里新启动了一个线程
    t1 = threading.Thread(target=fun,args=(callback,))
    t1.start()
class MyLongIOThread(threading.Thread):
    '''
    用类的方式模拟异步IO,启动一个新的线程模拟IO操作
    '''
    def __init__(self,callback):
        super(MyLongIOThread,self).__init__()
        self.callback = callback
    def run(self):
        '''模拟耗时的IO操作睡眠5秒,'''
        print('开始IO操作')
        time.sleep(5)
        print("完成IO操作")
        #开始执行回调函数,传递文本()给回调函数
        self.callback("IO Complate!!")

def on_finish(result):
    '''回调函数'''
    print('开始执行回调函数on_finish')
    print('result: %s' % result)
    print('完成回调函数on_finish')

def req_a():
    '''模拟请求A'''
    print('开始处理请求A')
    # io_resu = long_io() #测试顺序执行的效果
    long_io_async(on_finish)
    logio = MyLongIOThread(on_finish)
    logio.start()
    print('离开处理请求A')

def req_b():
    '''模拟请求A'''
    print('开始处理请求B')
    print('完成处理请求B')
def main():
    """模拟tornado框架,处理两个请求"""
    req_a()
    req_b()

if __name__ == "__main__":
    main()

执行过程:

D:Python3_study ornado1Scriptspython.exe D:/Python3_study/tornado1/asynchronization.py
开始处理请求A
开始IO操作
开始IO操作
离开处理请求A
开始处理请求B
完成处理请求B
完成IO操作
完成IO操作
开始执行回调函数on_finish
result: IO Complate!!
完成回调函数on_finish
开始执行回调函数on_finish
result: IO Complate!!
完成回调函数on_finish


Process finished with exit code 0


异步的特点是程序存在多个步调,即本属于同一个过程的代码可能在不同的步调上同时执行。

2.2 协程写法实现原理

在使用回调函数写异步程序时,需将本属于一个执行逻辑(处理请求a)的代码拆分成两个函数req_a和on_finish,这与同步程序的写法相差很大。而同步程序更便于理解业务逻辑,所以我们能否用同步代码的写法来编写异步程序?

回想yield关键字的作用?

初始版本

#!/usr/bin/env python2
# -*- coding:utf-8 -*-
#  @Time: 2020/3/15 15:21
#  @Author:zhangmingda
#  @File: yieldsaynchronization.py
#  @Software: PyCharm
#  Description:yield 模仿协程第一个版本

# coding:utf-8

import time
import threading

gen = None # 全局生成器,给MyLongIO使用

class MyLongIO(threading.Thread):
    def __init__(self):
        super(MyLongIO,self).__init__()
    def run(self):
        print('开始执行IO操作,需要5秒')
        global gen
        time.sleep(5)
        try:
            print('完成IO操作,并send结果唤醒挂起程序继续执行')
            gen.send("io result")  # 使用send返回结果并唤醒程序继续执行
        except StopIteration: #捕获生成器完成迭代,防止程序退出
            pass

def req_a():
    print("开始处理请求req_a")
    longio = MyLongIO()
    ret = yield longio.start()
    
    print("ret: %s" % ret)
    print("完成处理请求req_a")

def req_b():
    print("开始处理请求req_b")
    time.sleep(2)
    print("完成处理请求req_b")

def main():
    global gen
    gen = req_a()
    gen.next() # 开启生成器req_a的执行
    req_b()
    while 1:
        pass

if __name__ == '__main__':
    main()

执行过程:

(base) [root@zmdsdkhost ~]# python2 yieldtest.py
开始处理请求req_a
开始执行IO操作,需要5秒
开始处理请求req_b
完成处理请求req_b
完成IO操作,并send结果唤醒挂起程序继续执行
ret: io result
完成处理请求req_a

升级版本

我们在上面编写出的版本虽然req_a的编写方式很类似与同步代码,但是在main中调用req_a的时候却不能将其简单的视为普通函数,而是需要作为生成器对待。

现在,我们试图尝试修改,让req_a与main的编写都类似与同步代码。

#!/usr/bin/env python2
# coding:utf-8

import time
import thread

gen = None # 全局生成器,供long_io使用

def gen_coroutine(f):
    def wrapper(*args, **kwargs):
        global gen
        gen = f()
        gen.next()
    return wrapper

def long_io():
    def fun():
        print "开始执行IO操作"
        global gen
        time.sleep(5)
        try:
            print "完成IO操作,并send结果唤醒挂起程序继续执行"
            gen.send("io result")  # 使用send返回结果并唤醒程序继续执行
        except StopIteration: # 捕获生成器完成迭代,防止程序退出
            pass
    thread.start_new_thread(fun, ())

@gen_coroutine
def req_a():
    print "开始处理请求req_a"
    ret = yield long_io()
    print "ret: %s" % ret
    print "完成处理请求req_a"

def req_b():
    print "开始处理请求req_b"
    time.sleep(2)
    print "完成处理请求req_b"

def main():
    req_a()
    req_b()
    while 1:
        pass

if __name__ == '__main__':
    main()

执行过程:

开始处理请求req_a
开始处理请求req_b
开始执行IO操作
完成处理请求req_b
完成IO操作,并send结果唤醒挂起程序继续执行
ret: io result
完成处理请求req_a

最终版本

刚刚完成的版本依然不理想,因为存在一个全局变量gen来供long_io使用。我们现在再次改写程序,消除全局变量gen。

# coding:utf-8

import time
import thread

def gen_coroutine(f):
    def wrapper(*args, **kwargs):
        gen_f = f()  # gen_f为生成器req_a
        r = gen_f.next()  # r为生成器long_io
        def fun(g):
            ret = g.next() # 执行生成器long_io
            try:
                gen_f.send(ret) # 将结果返回给req_a并使其继续执行
            except StopIteration:
                pass
        thread.start_new_thread(fun, (r,))
    return wrapper

def long_io():
    print "开始执行IO操作"
    time.sleep(5)
    print "完成IO操作,yield回操作结果"
    yield "io result"

@gen_coroutine
def req_a():
    print "开始处理请求req_a"
    ret = yield long_io()
    print "ret: %s" % ret
    print "完成处理请求req_a"

def req_b():
    print "开始处理请求req_b"
    time.sleep(2)
    print "完成处理请求req_b"

def main():
    req_a()
    req_b()
    while 1:
        pass

if __name__ == '__main__':
    main()

执行过程:

开始处理请求req_a
开始处理请求req_b
开始执行IO操作
完成处理请求req_b
完成IO操作,yield回操作结果
ret: io result
完成处理请求req_a

这个最终版本就是理解Tornado异步编程原理的最简易模型,但是,Tornado实现异步的机制不是线程,而是epoll,即将异步过程交给epoll执行并进行监视回调。

需要注意的一点是,我们实现的版本严格意义上来说不能算是协程,因为两个程序的挂起与唤醒是在两个线程上实现的,而Tornado利用epoll来实现异步,程序的挂起与唤醒始终在一个线程上,由Tornado自己来调度,属于真正意义上的协程。虽如此,并不妨碍我们理解Tornado异步编程的原理。

思考

  1. Tornado里的异步就是协程,这句话对吗?
  2. Tornado中出现yield就是异步,这句话对吗?
  3. 怎么理解yield将程序挂起?在Tornado中又如何理解yield挂起程序实现异步?
原文地址:https://www.cnblogs.com/zhangmingda/p/12448109.html