Tornado框架简析

Tornado是一款轻量级的Web服务器,同时又是一个开发框架。采用单线程非阻塞I/O模型(epoll),主要是为了应对高并发 访问量而被开发出来,尤其适用于comet应用。

Tornado服务器3大核心模块:

(1) IOLoop

Tornado为了实现高并发和高性能,使用了一个IOLoop来处理socket的读写事件,IOLoop基于epoll,可以高效的响应网络事件。这是Tornado高效的保证。

tornado.ioloop.IOLoop.instance().start()

IOLoop使用了单例模式,处理所有IO事件,

实现为EPollIOLoop->PollIOLoop->IOLoop->Configurable

IOLoop中有四个重要的数据集: _events 和 _handlers 保存I/O事件和对应的处理器, _callbacks 和 _timeouts 保存(超时)回调。

关键函数:

def initialize(self, impl, time_func=None):
    super(PollIOLoop, self).initialize()
    self._impl = impl
    if hasattr(self._impl, 'fileno'):
        set_close_exec(self._impl.fileno())
    self.time_func = time_func or time.time
    #handlers 是一个函数集字典
    self._handlers = {}
    self._events = {}
    #回调函数集合
    self._callbacks = []
    self._callback_lock = threading.Lock()
    self._timeouts = []
    self._cancellations = 0
    self._running = False
    self._stopped = False
    self._closing = False
    self._thread_ident = None
    self._blocking_signal_threshold = None
    self._timeout_counter = itertools.count()

    # Create a pipe that we send bogus data to when we want to wake
    # the I/O loop when it is idle
    self._waker = Waker()
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)

其中,waker是一个发伪数据用的类,在需要时,我们可以用它唤醒空闲的I/O Loop。当我们调用add_callback时,为了让回调函数运行,可能会需要使用它发送一个伪数据。

#将文件描述符发生相应的事件时的回调函数对应
def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = stack_context.wrap(handler)
    #在 epoll 中注册对应事件
    #epoll_ctl
    self._impl.register(fd, events | self.ERROR)

其中stack_context.wrap()对handler进行封装,封装后记录了上下文信息。而_impl是对epoll的封装。

所以,只要把所有事件在IOLoop中进行注册,运行start函数后,就会进入进程的监听循环,循环监听所有的fd,并调用fd对应的handler。循环过程参考start()函数。

def start(self):
    while True:
        with self._callback_lock:
            callbacks = self._callbacks
            self._callbacks = []
        #运行所有callback
        for callback in callbacks:
            self._run_callback(callback)
        #取事件
        event_pairs = self._impl.poll(poll_timeout)
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            try:
                #调用事件handler
                fd_obj, handler_func = self._handlers[fd]
                handler_func(fd_obj, events)
            except (OSError, IOError) as e:
                if errno_from_exception(e) == errno.EPIPE:
                    # Happens when the client closes the connection
                    pass
                else:
                    self.handle_callback_exception(self._handlers.get(fd))
            except Exception:
                self.handle_callback_exception(self._handlers.get(fd))

当poll中发现fp有read事件时,会调用对应的callback方法。如果fd是监听的fd,那么这个回调handler就是accept_handler函数(见下面HttpConnection的bind和add_scokets函数)。该方法会Accept连接并且紧跟着创建IOStream对象,read_until方法读完数据后,则调用_run_callback把处理函数(self._header_callback)加到IOLoop中,等到下次轮询时在最前面处理。

(2) IOStream

为了在处理请求的时候,实现对socket的异步读写, Tornado实现了IOStream类,用来处理socket的异步读写,负责异步通讯。

主要包括3个函数,

1.read_bytes(bytes,callback)在有固定的字节的数据到来的时候调用回调函数

2.read_until(delimiter,callback)在读取到固定的字符序列结尾后调用回调函数

3.write(data):异步写

(3) HTTPConnection

这个类用来处理http的请求, 包括读取http请求头, 读取post过来的数据,调用用户自定义的处理方法。以及把响应数据写给客户端socket。

def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128): 
    sockets = bind_sockets(port, address=address, family=family,backlog=backlog)
        if self._started:
            self.add_sockets(sockets)
        else:
            self._pending_sockets.extend(sockets)
def add_sockets(self, sockets):
    if self.io_loop is None:
        self.io_loop = IOLoop.current()
    for sock in sockets:
        self._sockets[sock.fileno()] = sock
        add_accept_handler(sock, self._handle_connection,io_loop=self.io_loop)

socket启动后,监听各个sockets,事件到来时,调用_handle_connection。

def _handle_connection(self, connection, address):
    if self.ssl_options is not None:
        connection = ssl_wrap_socket(connection,self.ssl_options,
                                     server_side=True,
                                     do_handshake_on_connect=False)
        if self.ssl_options is not None:
            stream = SSLIOStream(connection, io_loop=self.io_loop,
                                 max_buffer_size=self.max_buffer_size,
                                 read_chunk_size=self.read_chunk_size)
        else:
            stream = IOStream(connection, io_loop=self.io_loop,
                              max_buffer_size=self.max_buffer_size,
                              read_chunk_size=self.read_chunk_size)
        self.handle_stream(stream, address)
def handle_stream(self, stream, address):
    context = _HTTPRequestContext(stream, address,
                                  self.protocol)
    conn = HTTP1ServerConnection(
        stream, self.conn_params, context)
    self._connections.add(conn)
    conn.start_serving(self)
def start_serving(self, delegate):
    assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
    self._serving_future = self._server_request_loop(delegate)
    # Register the future on the IOLoop so its errors get logged.
    self.stream.io_loop.add_future(self._serving_future,
                                   lambda f: f.result())

如前面所述,这里Accept连接并且紧跟着创建IOStream对象(不考虑https),调用handle_stream->start_serving->_server_request_loop处理请求。最后会调用_read_message读取数据,并注册回调函数。

最后抄一张图过来:

参考:

http://www.cnblogs.com/Bozh/archive/2012/07/22/2603976.html

http://kenby.iteye.com/blog/1159621

http://www.nowamagic.net/academy/detail/13321030

http://www.yeolar.com/note/2013/02/09/tornado-async-networking/

源码:

https://github.com/tornadoweb/tornado

原文地址:https://www.cnblogs.com/luosha/p/6185366.html