Tornado框架进阶

关于Tornado框架的异步非阻塞功能的个人学习心得:Tornado 和现在的主流 Web 服务器框架(包括大多数 Python 的框架)有着明显的区别:它是非阻塞式服务器,而且速度相当快。得利于其非阻塞的方式和对epoll的运用,在本文通过学习了解Tornado异步非阻塞功能的使用进而一步步分析异步非阻塞web框架的实现原理....

异步非阻塞框架介绍

大部分传统意义上的web框架(如django和flask)都是只能同步处理请求的,就是说一个请求来了只能先处理完这个请求才能接着去处理下一个请求,我们说这样的处理方式性能是比较低的!

而Tornado框架与传统Web框架不同,它最具特色地方就是它可以实现异步非阻塞去处理请求!!!(同样具有异步非阻塞功能的web框架还有node.js)

本文会首先介绍Tornado异步非阻塞功能的使用,然后通过自定制支持异步非阻塞Web框架来加深我们对异步非阻塞Web框架内部实现原理理解,让境界升华一波!

在开始学习之前首先要了解一些知识点:

异步IO模块和异步非阻塞框架:

  • 异步IO:就是遇到IO请求的时候,不会阻塞线程等待请求返回结果。
    请求完成后自动执行回调函数。之前我有写过关于异步IO模块的使用和实现的一篇个人心得,它开发的目的是为了让一个线程可以发送多个请求出去,并且请求返回响应后还是一个线程去处理以此伪造出“并发”发请求的效果(说是伪造是因为如果同时返回多个响应一个线程也只能串行的一个个处理)。
    异步IO模块的使用者是客户端!是客户端“并发”的向服务端发请求的模块!它的底层实现主要是通过select监听socket是否发生变化!

  • 作为服务端的开发者我们当然是希望一个线程能同一时段处理用户的请求越多越好。
    如果一个请求在处理过程中有IO操作(例如连接数据库),那么线程就会阻塞住直到结果返回才能往下处理。这段阻塞的时间什么都不能做其实是白白浪费掉了,如果说能把这段阻塞的空闲时间利用起来,让线程碰到IO操作就不傻等,而是接着去处理别的请求,这种看似可以"并发"处理多个请求的方式不就可以大大的提高服务端的性能了吗?提供这种"并发"处理请求的web框架我们就叫做异步非阻塞Web框架。所以异步非阻塞框架的开发目的就是为了提高处理请求的并发!
    异步非阻塞Web框架使用者是服务端!是服务端“并发”的接收处理请求的功能模块
    PS:异步非阻塞框架只是针对有IO操作(会阻塞进程的操作)的业务才能实现伪造"并发"的效果,如果是计算密集型的业务那它还是只能串行的一个个处理请求,体现不了异步非阻塞功能!
    在实际运用场景中我们的业务中会采用许多别人提供的API来帮助我们实现功能,在调用别人API(一般是通过Http请求调用)的时候就涉及了IO请求操作这个时候异步非阻塞的功能就表现得牛的一批了

Tornado异步非阻塞功能

"""同步形式处理请求的形式:"""
import tornado.ioloop
import tornado.web
   
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        import time
        time.sleep(8) #模拟IO操作
        self.write("Main")
        
        
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Index")
   
application = tornado.web.Application([
    (r"/main", MainHandler),
    (r"/index", IndexHandler),
])
   
   
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()
    
"""
浏览器打开两个窗口:
第一个窗口访问127.0.0.1:8888/main
第二个窗口访问127.0.0.1:8888/index
会发现:两个窗口的请求都被Hold住了!这是因为访问index的url只有在main被处理玩后才会被处理!
所以同步处理请求就是前一个没处理完,后一个请求就得等着它处理完才能被处理
"""
"""
异步非阻塞形式处理请求
装饰器 + Future 从而实现Tornado的异步非阻塞
"""
import tornado.ioloop
import tornado.web
from tornado import gen #装饰器
from tornado.concurrent import Future
import time

future = None #全局变量,用于手动的给future设置值
class MainHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        """
        当发送GET请求时,由于方法被@gen.coroutine装饰且yield 一个Future对象,那么Tornado会等待,等待我们向future对象中放置数据(通过future.set_result()放置)或者发送信号,如果获取到数据或信号之后,就开始执行done方法。
        注意:在等待我们向future对象中放置数据或信号时,此客户端和服务端连接是不断开的。所以在使用异步非阻塞功能处理完请求后要手动的把连接关掉!self.finish()
        """
        global future
        future = Future()
        #需要手动给future放置数据,才会触发回调函数的done的执行。
        #没有放置数据的这段时间内用户请求会被一直挂起
        future.add_done_callback(self.done)
        
        #特殊的形式模拟IO操作,跟time.sleep的区别是等待时间结束后自动执行某个方法
        #设置5s的等待时间,5s过后future放置数据了,web框架返回来继续处理该请求执行done方法
        #tornado.ioloop.IOLoop.current().add_timeout(time.time() + 5, self.done)
        yield future
        #web框架会监测future对象内的result属性有没有值,如果没有的话说明请求没处理完!
        #如果有值了就返回来继续处理该请求
        
    def done(self,*args,**kwargs):
        self.write('Main')
        self.finish() #关闭连接
        
class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        global future
        future.set_result(None) #手动给future放置数据
        self.write('Index')
        
application = tornado.web.Application([
    (r"/main",MainHandler),
    (r"/index",IndexHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

Future对象的两个功能:
1.当我们处理请求使用yield future后,会挂起当前请求,线程可以处理其他请求(异步非阻塞的体现)
2.如果有人给future.result设置值(通过future.set_result()设置),当前挂起的请求返回

异步非阻塞体现在当在Tornado等待用户向future对象中放置数据(给result赋值)的这段时间,还可以处理其他请求。

实际上在yield Future对象后,web框架就会挂起当前请求,并去监测future对象中的result属性!直到有人给人给result设置值(可以通过future.set_result()设置)才会返回挂起的请求
在使用异步非阻塞功能的时候,当用户请求的业务中使用tornado给我提供的类库去执行io操作时(如上例的tornado.ioloop.IOLoop.current()),程序线程不会阻塞,而是会挂起当前请求,然后线程先去处理别的请求。待io操作有返回值后io模块内部会自动给future.result设置值。此时web框架就会监测到future.result有值了,就会返回接着去处理该请求!

要使用Tornado的异步非阻塞功能一般要搭配使用它的非阻塞IO操作模块(内部自动会在处理完结果后给future对象发送信号)!而在实际运用中我们的io操作大多是发起http请求去调用某个API,为此Tornado中为我们提供个一个异步IO模块供我们去异步的发送HTTP请求。

Tornado提供了httpclient类库用于发送Http请求(异步IO模块,发IO请求不会阻塞进程),其配合Tornado的异步非阻塞使用。

"""异步发请求+异步非阻塞处理请求"""
import tornado.web
from tornado import gen
from tornado import httpclient
 
# 方式一:
class AsyncHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self, *args, **kwargs):
        print('进入')
        http = httpclient.AsyncHTTPClient()
        #内部创建了future对象,请求返回响应后会改变future.result的值
        data = yield http.fetch("http://www.google.com") 
        print('完事',data)
        self.finish('6666')
 
# 方式二:
# class AsyncHandler(tornado.web.RequestHandler):
#     @gen.coroutine
#     def get(self):
#         print('进入')
#         http = httpclient.AsyncHTTPClient()
#         yield http.fetch("http://www.google.com", self.done)
#
#     def done(self, response):
#         print('完事')
#         self.finish('666')
 
application = tornado.web.Application([
    (r"/async", AsyncHandler),
])
 
if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start() 

自定制web框架

通过对Tornado异步非阻塞功能的学习,让我对web框架的内部实现有了更大的好奇心!web框架内部的异步非阻塞功能是怎么让一个线程去实现把当前请求挂起,去处理别的请求。又是怎么监听被挂起的请求,实现返回?所以接下来是我对web框架的底层实现的一些学习心得,我会通过自定制一个迷你版的web框架去一步步的分析探索web框架的奥秘~

web框架分为两类,一个是同步一个是异步。要自定制web框架必须要从socket入手,因为web框架的本质就是socket服务端!
异步功能实现其实都离不开我们的io多路复用的使用,所以异步非阻塞的web框架是基于非阻塞的socket+IO多路复用去实现的。之前有写过一篇异步IO模块的底层实现学习心得,所以在此处就不过多的对io多路复用和非阻塞socket做过多的叙述@

自定制mini版同步的web框架:

"""简单web框架(同步)"""
import socket
import select

class HttpRequest(object):
    """
    用于封装用户请求信息
    提供了请求数据的解析功能(把请求头请求体分割)
    """
    def __init__(self, content):
        #content:用户发送的请求数据:请求头和请求体
        self.content = content

        self.header_bytes = bytes() #请求头
        self.body_bytes = bytes() #请求体
        self.header_dict = {}
 

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize() #解析请求数据(处理请求头和请求体)
        self.initialize_headers() #解析请求头(解析成一个个键值对的格式)

    def initialize(self):
        """解析请求数据:分割请求头和请求体"""
		temp = self.content.split(b'\r\n\r\n',1) #将请求头和请求体分割(以双换行符分割)
        if len(temp) == 1:
            #没有请求体
            self.header_bytes += temp
        else:
            h, b = temp
            self.header_bytes = h #请求头
            self.body_bytes = b  #请求体
            header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """
        解析请求头:弄成字典的形式存储,
        并把一些常用的值提取出来放到对象的属性中,如url,method
        """
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v


def index(request):
    #视图函数
    reuten 'deehuang真帅!'

routers = [
    ('/index',index)
] 	#路由:url与视图函数的对应关系

def run():
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    sock.bind(("127.0.0.1",8888,))
    sock.setblocking(False) #设置非阻塞socket
    sock.listen(128)

    #要接收多个请求,所以要使用select去帮助监听服务端sock的变化去跟多个客户端建立连接拿到客户端的socket,然后把客户端的socket再放到select里面去监听它的变化,如果有变化说明客户端发来请求了
    inputs = []
    inputs.append(sock)
    while True:
        rlist,wlist,elist = select.select(inputs,[],[],0.05)
        for r in rlist:
            if r == sock:
                """新请求到来,建立连接并把客户端的sock监听起来"""
                conn,addr = sock.accept()
                conn.setblocking(False) #把客户端的socket设置为非阻塞的
                inputs.append(conn) #把客户端的socket放到监听列表中
            else:
                """客户端发来请求数据"""
                data = b""  #http请求里面有请求头和请求体
                while True:
                    #数据一次可能接收不完所有要循环去收
                    #没有数据接收的时候表示已经接收完了因为设置了非阻塞socket所以会抛出异常
                    #所以要异常处理,且在接收完数据后应该终止循环
                    try:
                    chunk = r.recv(1024)
                    data += chunk  
                    except as Exception:
                        chunk = None
                    if not chunk:
                        break #接收完数据,终止循环

                    #对data进行处理:
                    #data中有请求头和请求体,需要把它们分开。用HttpRequest类帮我们解析请求数据
                    request = HttpRequest(data)
                    #1.请求头中获取URL
                    request_url = request.url
                    #2.去路由系统中匹配(通过正则匹配),获取指定的函数
                    import re
                    flag = False #匹配标志位,匹配成功了就为True
                    func = None
                    for route in routers:
                        if re.match(route[0],request_url): 
                            #匹配url,匹配成功就退出循环
                            flag = True
                            func = route[1] #url对应的视图函数
                            break
                    #3.匹配成功执行函数,获取返回值
                    #之所以是同步处理请求就是在这里执行函数的时候是同步操作!执行完才能继续往下走
                    if flag:
                        result = func(request) #ps:视图函数要传关于请求的所有信息 
                        #4.将返回值r.sendalll('')
                    	r.sendall(result,encoding='utf-8') #返回响应数据
                    else:
                        #没有匹配成功,返回404
                        r.sendall(b"404")
                  
                    
                    #http是短链接,请求响应后就要移除并关闭掉连接
                    inputs.remove(r) #从监听列表中移除连接
                    r.close() #关闭连接
                
  

通过上面的自定制mini版同步的web框架示例,不难理解同步的原因就在执行视图函数的这步操作!所以对于异步非阻塞框架就是在此这个地方上去做出改变,接下来我们来看一下怎么去模仿tornado使用future对象实现异步非阻塞的模式去自定制一个mini版本的异步非阻塞web框架

自定制mini版支持同步和异步非阻塞的web框架:

"""简单web框架(支持同步和异步非阻塞)"""
import socket
import select
import time

class HttpRequest(object):
    """
    用于封装用户请求信息
    提供了请求数据的解析功能(把请求头请求体分割)
    """
    def __init__(self, content):
        #content:用户发送的请求数据:请求头和请求体
        self.content = content

        self.header_bytes = bytes() #请求头
        self.body_bytes = bytes() #请求体
        self.header_dict = {}
 

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize() #解析请求数据(处理请求头和请求体)
        self.initialize_headers() #解析请求头(解析成一个个键值对的格式)

    def initialize(self):
        """解析请求数据:分割请求头和请求体"""
		temp = self.content.split(b'\r\n\r\n',1) #将请求头和请求体分割(以双换行符分割)
        if len(temp) == 1:
            #没有请求体
            self.header_bytes += temp
        else:
            h, b = temp
            self.header_bytes = h #请求头
            self.body_bytes = b  #请求体
            header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding='utf-8')

    def initialize_headers(self):
        """
        解析请求头:弄成字典的形式存储,
        并把一些常用的值提取出来放到对象的属性中,如url,method
        """
        headers = self.header_str.split('\r\n')
        first_line = headers[0].split(' ')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(' ')
            for line in headers:
                kv = line.split(':')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v

class Future(object):
    def __init__(self,timeout=0):
        self.result = None #结果返回或超时后后给result赋值
        self.timeout = timeout #超时时间,超时后自动给result赋值
        self.start = time.time()
        
f = None                    
def main(request):
    return '我是同步视图函数'
    
def index(request):
    #异步非阻塞视图函数
	global f
    f = Future(5)
    return f #返回future对象则该视图就是异步非阻塞的

def stop(request):
    """模拟给future.result放置数据,让框架监测到返回处理挂起的请求"""
    global f
    f.result = b'xxxxxxx'
    return 'stop'

routers = [
    ('/main',main),
    ('/index',index),
    ('/stop',stop),
] 	#路由:url与视图函数的对应关系

def run():
    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    sock.bind(("127.0.0.1",8888,))
    sock.setblocking(False) #设置非阻塞socket
    sock.listen(128)

    #要接收多个请求,所以要使用select去帮助监听服务端sock的变化去跟多个客户端建立连接拿到客户端的socket,然后把客户端的socket再放到select里面去监听它的变化,如果有变化说明客户端发来请求了
    inputs = []
    inputs.append(sock)
    
    async_request_dict = {} #存储异步请求和future对象 {用户socket:future}
    
    while True:
        rlist,wlist,elist = select.select(inputs,[],[],0.05)
        for r in rlist:
            if r == sock:
                """新请求到来,建立连接并把客户端的sock监听起来"""
                conn,addr = sock.accept()
                conn.setblocking(False) #把客户端的socket设置为非阻塞的
                inputs.append(conn) #把客户端的socket放到监听列表中
            else:
                """客户端发来请求数据"""
                data = b""  #http请求里面有请求头和请求体
                while True:
                    #数据一次可能接收不完所有要循环去收
                    #没有数据接收的时候表示已经接收完了因为设置了非阻塞socket所以会抛出异常
                    #所以要异常处理,且在接收完数据后应该终止循环
                    try:
                    chunk = r.recv(1024)
                    data += chunk  
                    except as Exception:
                        chunk = None
                    if not chunk:
                        break #接收完数据,终止循环

                    #对data进行处理:
                    #data中有请求头和请求体,需要把它们分开。用HttpRequest类帮我们解析请求数据
                    request = HttpRequest(data)
                    #1.请求头中获取URL
                    request_url = request.url
                    #2.去路由系统中匹配(通过正则匹配),获取指定的函数
                    import re
                    flag = False #匹配标志位,匹配成功了就为True
                    func = None
                    for route in routers:
                        if re.match(route[0],request_url): 
                            #匹配url,匹配成功就退出循环
                            flag = True
                            func = route[1] #url对应的视图函数
                            break
                    #3.匹配成功,执行函数,获取返回值
                    if flag:
                        result = func(request) #ps:视图函数要传关于请求的所有信息 
                        # 判断是异步还是同步处理就是判断函数返回的是字符串还是future对象
                        if isinstance(result,Future):
                            #如果视图返回的是future对象,挂起请求
                            #把用户的socket和future以键值对形式放在字典中存储
                            async_request_dict[r] =result
                        else:
                            #正常的同步视图,处理完后就返回响应,移除监听,断开连接
                            #将返回值r.sendalll('')
                    		r.sendall(result,encoding='utf-8') #返回响应数据
                            #http是短链接,请求响应后就要移除并关闭掉连接
                    		inputs.remove(r) #从监听列表中移除连接
                    		r.close() #关闭连接
                    else:
                        #没有匹配成功,返回404
                        r.sendall(b"404")
                        
        #监测异步请求是否返回数据,通过监测future对象中的result是否放置数据
		for conn in async_request_dict.keys():
       	  #async_request_dict的key是每个异步请求用户的socket连接,value是每个用户的future对象
            #拿到所有的异步请求的future对象
            future = asyc_request_dict.get(conn)
            
            #挂起请求如果设置了超时时间,则到时间后应该自动给result放数据,使挂起的请求继续执行
            start = future.start #请求开始挂起时间
            timeout = future.timeout #超时时间
            ctime = time.time() #当前时间
            if (start + timeout) <= ctime:
                #超时时间到,给future对象放置数据
                future.result = b'timeout' 
                
            if future.result:
                #future.result放置了数据,表示要请求数据准备好了,要返回去处理这个挂起的请求了
                #返回响应给客户端
                conn.sendall(future,result)
                conn.close() #响应完成,关闭连接
                del async_request_dict[conn] #将用户请求从异步列表中移除
                inputs.remove(conn) #将用户请求从监听列表中移除
             else:
                #请求的数据没准备好,什么都不干,继续等
                pass
                        
 
if __name__ == "__main__":
    run()         

总结:

从自定制非阻塞的web框架可以得出画龙点睛的结论:Tornado的非阻塞功能就是通过Future对象实现的!

future对象中的result属性标识IO操作是否已经完成。如果完成,则往result放置数据web框架监测到future对象这个标识就返回挂起的请求继续执行、给客户端返回响应、断开连接等操作;如果没有完成,则挂起请求连接一直不断开!

结语

以上是个人学习之路,如有误,欢迎指正!

参考文献1
参考文献2

原文地址:https://www.cnblogs.com/deehuang/p/14394810.html