Tornado源码分析之事件循环

hello world

#!/usr/bin/env python
import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado.options import define, options
define("port", default=8888, help="run on the given port", type=int)
class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")
def main():
    tornado.options.parse_command_line()
    application = tornado.web.Application([
        (r"/", MainHandler),
    ])
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.instance().start()
if __name__ == "__main__":
    main()

tornado提供了高效的异步机制,我们先不管Application实例化过程,以及http_server创建socket、bind、listen的过程,直接调IOLoop.instance().start进行源码分析。

IOLoop.instance()

    @classmethod
    def instance(cls):
        """Returns a global IOLoop instance.
        Most single-threaded applications have a single, global IOLoop.
        Use this method instead of passing around IOLoop instances
        throughout your code.
        A common pattern for classes that depend on IOLoops is to use
        a default argument to enable programs with multiple IOLoops
        but not require the argument for simpler applications:
            class MyClass(object):
                def __init__(self, io_loop=None):
                    self.io_loop = io_loop or IOLoop.instance()
        """
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

显然是一个单例模式,注意tornado中的注释,大多数单线程只能有一个ioloop。

start()

这个函数将开始事件循环。

def start():
     """
     Starts the I/O loop.
     The loop will run until one of the I/O handlers calls stop(), which
      will make the loop stop after the current event iteration completes.
      """
        # 判断是否设置了,如果是,将直接退出。
        if self._stopped:
            self._stopped = False
            return
        self._running = True
        while True:
            # Never use an infinite timeout here - it can stall epoll
            # 设置轮询时间
            poll_timeout = 0.2
            # Prevent IO event starvation by delaying new callbacks
            # to the next iteration of the event loop.
            callbacks = self._callbacks
            self._callbacks = []
            # 及时调用回调函数
            for callback in callbacks:
                self._run_callback(callback)
            if self._callbacks:
                poll_timeout = 0.0
            # 如果设置了超时时间
            if self._timeouts:
                # 获取当前时间
                now = time.time()
                while self._timeouts and self._timeouts[0].deadline <= now:
                    timeout = self._timeouts.pop(0)
                    self._run_callback(timeout.callback)
                if self._timeouts:
                    milliseconds = self._timeouts[0].deadline - now
                    poll_timeout = min(milliseconds, poll_timeout)
            # 再一次检查事件循环是否在运行
            if not self._running:
                break
            # 目前不清楚作用
            if self._blocking_signal_threshold is not None:
                # clear alarm so it doesn't fire while poll is waiting for
                # events.
                signal.setitimer(signal.ITIMER_REAL, 0, 0)

            try:
                # 开始等待事件发生
                # _impl初始化和poll源代码见下面
                event_pairs = self._impl.poll(poll_timeout)
            except Exception, e:
                # Depending on python version and IOLoop implementation,
                # different exception types may be thrown and there are
                # two ways EINTR might be signaled:
                # * e.errno == errno.EINTR
                # * e.args is like (errno.EINTR, 'Interrupted system call')
                if (getattr(e, 'errno', None) == errno.EINTR or
                    (isinstance(getattr(e, 'args', None), tuple) and
                     len(e.args) == 2 and e.args[0] == errno.EINTR)):
                    continue
                else:
                    raise
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL,
                                 self._blocking_signal_threshold, 0)
            # Pop one fd at a time from the set of pending fds and run
            # its handler. Since that handler may perform actions on
            # other file descriptors, there may be reentrant calls to
            # this IOLoop that update self._events
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    # 见下面的分析
                    self._handlers[fd](fd, events)
                except (KeyboardInterrupt, SystemExit):
                    raise
                except (OSError, IOError), e:
                    if e.args[0] == errno.EPIPE:
                        # Happens when the client closes the connection
                        pass
                    else:
                        logging.error("Exception in I/O handler for fd %d",
                                      fd, exc_info=True)
                except:
                    logging.error("Exception in I/O handler for fd %d",
                                  fd, exc_info=True)
        # reset the stopped flag so another start/stop pair can be issued
        self._stopped = False
        if self._blocking_signal_threshold is not None:
            signal.setitimer(signal.ITIMER_REAL, 0, 0)

我们看看输入localhost:8888,event_pairs的值:

可以看出,event_pairs是一个元组列表,其中第一个成员4为accept套接字值,1表示为事件类型。我们看看事件类型为:

 _EPOLLIN = 0x001
 _EPOLLPRI = 0x002
 _EPOLLOUT = 0x004  
 _EPOLLERR = 0x008
 _EPOLLHUP = 0x010
 _EPOLLRDHUP = 0x2000
 _EPOLLONESHOT = (1 << 30)
 _EPOLLET = (1 << 31)
 NONE = 0
 READ = _EPOLLIN
 WRITE = _EPOLLOUT
 ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

可见上述,是文件描述符4,可读事件发生。

self._impl

我们跟踪self._impl初始化过程,可以看到事件循环核心epoll是如何被使用的。在IOLoop实例化开始:

class IOLoop(object):
    def __init__(self, impl = None):
        self._impl = impl or _poll

# Choose a poll implementation. Use epoll if it is available, fall back to
# select() for non-Linux platforms
# hasattr(object, attrname)表示某个对象中是否包含属性
if hasattr(select, "epoll"):
    # Python 2.6+ on Linux
    # 在linux上使用的是select.epoll
    _poll = select.epoll
elif hasattr(select, "kqueue"):
    # Python 2.6+ on BSD or Mac
    _poll = _KQueue
else:
    try:
        # Linux systems with our C module installed
        import epoll
        _poll = _EPoll
    except:
        # All other systems
        import sys
        if "linux" in sys.platform:
            logging.warning("epoll module not found; using select()")
        _poll = _Select

从上面的代码中,可以看到,_poll是对于多个平台下epoll、_kQueue的抽象。看一下select.epoll下的返回结果:其返回对象是一个边沿触发的polling对象,当然也可以用作水平触发。

返回的select.epoll对象的方法:

  • epoll.close() 关闭epoll fd文件描述符
  • epoll.fileno() 返回epoll fd文件描述符只
  • epoll.register(fd, eventmask) 注册fd某个事件
  • epoll.poll([timeout = -1, maxevents = -1]) wait for events. timeout in seconds

self._handlers[fd](fd, events)

显然,self._handlers[fd]是返回一个回调函数,用来处理fd上的事件events,这里测试的fd为4,事件EPOLLIN。我们来跟踪一下self._handlers变化过程。看看在IOLoop初始化的过程。

def __init__(self):
    self._handles = {}
    ....
    if os.name != 'nt':
        r, w = os.pipe()
        self._set_nonblocking(r)
        self._set_nonblocking(w)
        .....
        self._waker_reader = os.fdopen(r, "rb", 0)
        self._waker_writer = os.fdopen(w, "wb", 0)    
    # 显然这是对读管道文件描述符事件处理函数
    self.add_handler(r, self._read_waker, self.READ)

add_handler(self, fd, handler, events)

def add_handler(self, fd, handler, events):
    self._handlers[fd] = stack_context.wrap(handler)
    self._impl.register(fd, events| self.ERROR)

可见add_handler干了两件事情:

  • 回调函数设置,当然不仅仅是简单的将handler赋值,而是使用了stack_context.wrap包裹了该函数,具体实现,见下面。
  • epoll对象添加该事件,就是在代码的第二行。所以self._handlers[fd](fd, args)实际上就是设置的回调函数。那么用stack_context.wrap()来包裹究竟是为了什么了?

update_handler(self, fd, events)

 def update_handler(self, fd, events):
        """Changes the events we listen for fd."""
        self._impl.modify(fd, events | self.ERROR)

该函数用来修改fd感兴趣的事件

原文地址:https://www.cnblogs.com/bofengqiye/p/7353056.html