Python Tornado框架(TCP层)

Tornado在TCP层里的工作机制

上一节是关于应用层的协议 HTTP,它依赖于传输层协议 TCP,例如服务器是如何绑定端口的?HTTP 服务器的 handle_stream 是在什么时候被调用的呢?本节聚焦在 TCP 层次的实现,以便和上节的程序流程衔接起来。

首先是关于 TCP 协议。这是一个面向连接的可靠交付的协议。由于是面向连接,所以在服务器端需要分配内存来记忆客户端连接,同样客户端也需要记录服务器。由于保证可靠交付,所以引入了很多保证可靠性的机制,比如定时重传机制,SYN/ACK 机制等,相当复杂。所以,在每个系统里的 TCP 协议软件都是相当复杂的,本文不打算深入谈这些(我也谈不了多少,呵呵)。但我们还是得对 TCP 有个了解。先上一张图(UNIX 网络编程)-- 状态转换图。

 

除外,来一段TCP服务器端编程经典三段式代码(C实现):

// 创建监听socket
int sfd = socket(AF_INET, SOCK_STREAM, 0);  
// 绑定socket到地址-端口, 并在该socket上开始监听。listen的第二个参数叫backlog,和连接队列有关
bind(sfd,(struct sockaddr *)(&s_addr), sizeof(struct sockaddr)) && listen(sfd, 10); 
while(1) cfd = accept(sfd, (struct sockaddr *)(&cli_addr), &addr_size);

以上,忽略所有错误处理和变量声明,顾名思义吧…… 更多详细,可以搜 Linux TCP 服务器编程。所以,对于 TCP 编程的总结就是:创建一个监听 socket,然后把它绑定到端口和地址上并开始监听,然后不停 accept。这也是 tornado 的 TCPServer 要做的工作。

TCPServer 类的定义在 tcpserver.py。它有两种用法:bind+start 或者 listen。

第一种用法可用于多线程,但在 TCP 方面两者是一样的。就以 listen 为例吧。TCPServer 的__init__没什么注意的,就是记住了 ioloop 这个单例,这个下节再分析(它是tornado异步性能的关键)。listen 方法接收两个参数端口和地址,代码如下

def listen(self, port, address=""):
	"""Starts accepting connections on the given port.

	This method may be called more than once to listen on multiple ports.
	`listen` takes effect immediately; it is not necessary to call
	`TCPServer.start` afterwards.  It is, however, necessary to start
	the `.IOLoop`.
	"""
	sockets = bind_sockets(port, address=address)
	self.add_sockets(sockets)

以上。首先 bind_sockets 方法接收地址和端口创建 sockets 列表并绑定地址端口并监听(完成了TCP三部曲的前两部),add_sockets 在这些 sockets 上注册 read/timeout 事件。有关高性能并发服务器编程可以参照UNIX网络编程里给的几种编程模型,tornado 可以看作是单线程事件驱动模式的服务器,TCP 三部曲中的第三部就被分隔到了事件回调里,因此肯定要在所有的文件 fd(包括sockets)上监听事件。在做完这些事情后就可以安心的调用 ioloop 单例的 start 方法开始循环监听事件了。具体细节可以参照现代高性能 web 服务器(nginx/lightttpd等)的事件模型,后面也会涉及一点。

简言之,基于事件驱动的服务器(tornado)要干的事就是:创建 socket,绑定到端口并 listen,然后注册事件和对应的回调,在回调里accept 新请求。

bind_sockets 方法在 netutil 里被定义,没什么难的,创建监听 socket 后为了异步,设置 socket 为非阻塞(这样由它 accept 派生的socket 也是非阻塞的),然后绑定并监听之。add_sockets 方法接收 socket 列表,对于列表中的 socket,用 fd 作键记录下来,并调用add_accept_handler 方法。它也是在 netutil 里定义的,代码如下:

def add_accept_handler(sock, callback, io_loop=None):
    """Adds an `.IOLoop` event handler to accept new connections on ``sock``.

    When a connection is accepted, ``callback(connection, address)`` will
    be run (``connection`` is a socket object, and ``address`` is the
    address of the other end of the connection).  Note that this signature
    is different from the ``callback(fd, events)`` signature used for
    `.IOLoop` handlers.
    """
    if io_loop is None:
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        while True:
            try:
                connection, address = sock.accept()
            except socket.error as e:
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            callback(connection, address)
    io_loop.add_handler(sock.fileno(), accept_handler, IOLoop.READ)

需要注意的一个参数是 callback,现在指向的是 TCPServer 的 _handle_connection 方法。add_accept_handler 方法的流程:首先是确保ioloop对象。然后调用 add_handler 向 loloop 对象注册在fd上的read事件和回调函数accept_handler。该回调函数是现成定义的,属于IOLoop层次的回调,每当事件发生时就会调用。回调内容也就是accept得到新socket和客户端地址,然后调用callback向上层传递事件。从上面的分析可知,当read事件发生时,accept_handler被调用,进而callback=_handle_connection被调用。

_handle_connection就比较简单了,跳过那些ssl的处理,简化为两句stream = IOStream(connection, io_loop=self.io_loop)和self.handle_stream()。这里IOStream代表了IO层,以后再说,反正读写是不愁了。接着是调用handle_stream。我们可以看到,不论应用层是什么协议(或者自定义协议),当有新连接到来时走的流程是差不多的,都要经历一番上诉的回调,不同之处就在于这个handle_stream方法。这个方法是由子类自定义覆盖的,它的HTTP实现已经在上一节看过了。

到此,和上节的代码流程接上轨了。当事件发生时是如何回调的呢?app.py里的IOLoop.instance().start()又是怎样的流程呢?明天继续,看tornado异步高性能的根本所在

Tornado TCPServer类的设计解读

前文已经说过,HTTPServer是派生自TCPServer,从协议层次上讲,这再自然不过。

从TCPServer的实现上看,它是一个通用的server框架,基本是按照BSD socket的思想设计的。create-bind-listen三段式一个都不少。

从helloworld.py往下追,可以看到:

  1. helloworld.py中的main函数创建了HTTPServer.
  2. HTTPServer继承自TCPServer,在HTTPServer的构造函数中直接调用了TCPServer的构造函数。

接下来我们就去看看TCPServer这个类的实现,它的代码放在tornado/tcpserver.py中。tcpserver.py只有两百多行,不算多。所有代码都是在实现TCPServer这个类。

TCPServer

在TCPServer类的注释中,首先强调了它是一个non-blocking, single-threaded TCP Server。

怎么理解呢? 

non-blocking,就是说,这个服务器没有使用阻塞式API。

什么是阻塞式设计?举个例子,在BSD Socket里,recv函数默认是阻塞式的。使用recv读取客户端数据时,如果对方并未发送数据,则这个API就会一直阻塞那里不返回。这样服务器的设计不得不使用多线程或者多进程方式,避免因为一个API的阻塞导致服务器没法做其它事。阻塞式API是很常见的,我们可以简单认为,阻塞式设计就是“不管有没有数据,服务器都派API去读,读不到,API就不会回来交差”。

非阻塞,对recv来说,区别在于没有数据可读时,它不会在那死等,它直接就返回了。你可能会认为这办法比阻塞式还要矬,因为服务器无法预知有没有数据可读,不得不反复派recv函数去读。这不是浪费大量的CPU资源么?

当然不会这么傻。tornado这里说的非阻塞要高级得多,基本上是另一种思路:服务器并不主动读取数据,它和操作系统合作,实现了一种“监视器”,TCP连接就是它的监视对象。当某个连接上有数据到来时,操作系统会按事先的约定通知服务器:某某号连接上有数据到来,你去处理一下。服务器这时候才派API去取数据。服务器不用创建大量线程来阻塞式的处理每个连接,也不用不停派API去检查连接上有没有数据,它只需要坐那里等操作系统的通知,这保证了recv API出手就不会落空。

tornado另一个被强调的特征是single-threaded,这是因为我们的“监视器”非常高效,可以在一个线程里监视成千上万个连接的状态,基本上不需要再动用线程来分流。实测表明,它比阻塞式多线程或者多进程设计更加高效——当然,这依赖于操作系统的大力配合,现在主流操作系统都提供了非常高端大气上档次的“监视器”机制,比如epoll、kqueue。

作者提到这个类一般不直接被实例化,而是由它派生出子类,再用子类实例化。

为了强化这个设计思想,作者定义了一个未直接实现的接口,叫handle_stream()。

def handle_stream(self, stream, address):
    """Override to handle a new `.IOStream` from an incoming connection."""
    raise NotImplementedError()

这倒是个不错的技巧,强制让子类覆盖本方法,不然就报错给你看!

TCPServer是支持SSL的。由于Python的强大,支持SSL一点都不费事。要启动一个支持SSL的TCPServer,只需要告诉它你的certifile和keyfile就行。

TCPServer(ssl_options={"certfile": os.path.join(data_dir, "mydomain.crt"),
	"keyfile": os.path.join(data_dir, "mydomain.key"),})

关于这两个文件的来龙去脉,可以去Google“数字证书原理”这篇文章。

TCPServer的三种形式

TCPServer的初始化有三种形式。

1. 单进程形式

server = TCPServer()
server.listen(8888)
IOLoop.instance().start()

 我们在helloworld.py中看到的就是这种用法,不再赘述。

2. 多进程形式。

server = TCPServer()
server.bind(8888)
server.start(0)  # Forks multiple sub-processes
IOLoop.instance().start(

区别主要在server.start(0)这里。后面分析listen()与start()两个成员函数时,就会看到它们是怎么跟进程结合的。

注意:这种模式启动时,不能把IOLoop对象传递给TCPServer的构造函数,这样会导致TCPServer直接按单进程启动。

3. 高级多进程形式。

sockets = bind_sockets(8888)
tornado.process.fork_processes(0)
server = TCPServer()
server.add_sockets(sockets)
IOLoop.instance().start()

高级意味着复杂。从上面代码看,虽然只多了一两行,实际里面的流程有比较大的差别。

这种方式的主要优点就是 tornado.process.fork_processes(0)这句,它为进程的创建提供了更多的灵活性。当然现在说了也是糊涂,后面钻进这些代码后,我们再来验证这里的说法。

以上内容都是TCPServer类的doc string中提到的。后面小节开始看code。

从代码分析TCPServer类的机制

TCPServer的__init__函数很简单,仅保存了参数而已。

唯一要注意的是,它可以接受一个io_loop为参数。实际上io_loop对TCPServer来说并不是可有可无,它是必须的。不过TCPServer提供了多种渠道来与一个io_loop绑定,初始化参数只是其中一种绑定方式而已。

listen

接下来我们看一下listen函数,在helloworld.py中,httpserver实例创建之后,它被第一个调用。

TCPServer类的listen函数是开始接受指定端口上的连接。注意,这个listen与BSD Socket中的listen并不等价,它做的事比BSD socket()+bind()+listen()还要多。

注意在函数注释中提到的一句话:你可以在一个server的实例中多次调用listen,以实现一个server侦听多个端口。

怎么理解?在BSD Socket架构里,我们不可能在一个socket上同时侦听多个端口。反推之,不难想到,TCPServer的listen函数内部一定是执行了全套的BSD Socket三段式(create socket->bind->listen),使得每调用一次listen实际上是创建了一个新的socket。

代码很好的符合了我们的猜想:

def listen(self, port, address=""):
	sockets = bind_sockets(port, address=address)
	self.add_sockets(sockets)

两步走,先创建了一个socket,然后把它加到自己的侦听队列里。

bind_socket

bind_socket函数并不是TCPServer的成员,它定义在netutil.py中,原型:

def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128, flags=None):

它也有大段的注释。

bind_socket完成的工作包括:创建socket,绑定socket到指定的地址和端口,开启侦听。

解释一下参数:

  • port不用说,端口号嘛。
  • address可以是IP地址,如“192.168.1.100”,也可以是hostname,比如“localhost”。如果是hostname,则可以监听该hostname对应的所有IP。如果address是空字符串(“”)或者None,则会监听主机上的所有接口。
  • family是指网络层协议类型。可以选AF_INET和AF_INET6,默认情况下则两者都会被启用。这个参数就是在BSD Socket创建时的那个sockaddr_in.sin_family参数哈。
  • backlog就是指侦听队列的长度,即BSD listen(n)中的那个n。
  • flags参数是一些位标志,它是用来传递给socket.getaddrinfo()函数的。比如socket.AI_PASSIVE等。

另外要注意,在IPV6和IPV4混用的情况下,这个函数的返回值可以是一个socket列表,因为这时候一个address参数可能对应一个IPv4地址和一个IPv6地址,它们的socket是不通用的,会各自独立创建。

现在来一行一行看下bind_socket的代码

sockets = []
if address == "":
	address = None
if not socket.has_ipv6 and family == socket.AF_UNSPEC:
	# Python can be compiled with --disable-ipv6, which causes
	# operations on AF_INET6 sockets to fail, but does not
	# automatically exclude those results from getaddrinfo
	# results.
	# http://bugs.python.org/issue16208
	family = socket.AF_INET
if flags is None:
	flags = socket.AI_PASSIVE

这一段平淡无奇,基本上都是前面讲到的参数赋值。

接下来就是一个大的循环:

for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,0, flags)):

闹半天,前面解释的参数全都被socket.getaddrinfo()这个函数吃下去了。

socket.getaddrinfo()是python标准库中的函数,它的作用是将所接收的参数重组为一个结构res,res的类型将可以直接作为socket.socket()的参数。跟BSD Socket中的getaddrinfo差不多嘛。

之所以用了一个循环,正如前面讲到的,因为IPv6和IPv4混用的情况下,getaddrinfo会返回多个地址的信息。参见python文档中的说明和示例:

The function returns a list of 5-tuples with the following structure: (family, type, proto, canonname, sockaddr)

>>> socket.getaddrinfo("www.python.org", 80, proto=socket.SOL_TCP)
[(2, 1, 6, '', ('82.94.164.162', 80)),
 (10, 1, 6, '', ('2001:888:2000:d::a2', 80, 0, 0))]

 接下来的代码在循环体中,是针对单个地址的。循环体内一开始就如我们猜想,直接拿getaddrinfo的返回值来创建socket。

af, socktype, proto, canonname, sockaddr = res
try:
	sock = socket.socket(af, socktype, proto)
except socket.error as e:
	if e.args[0] == errno.EAFNOSUPPORT:
		continue
raise

 先从tuple中拆出5个参数,然后拣需要的来创建socket。

set_close_exec(sock.fileno())

这行是设置进程退出时对sock的操作。lose_on_exec 是一个进程所有文件描述符(文件句柄)的位图标志,每个比特位代表一个打开的文件描述符,用于确定在调用系统调用execve()时需要关闭的文件句柄(参见include/fcntl.h)。当一个程序使用fork()函数创建了一个子进程时,通常会在该子进程中调用execve()函数加载执行另一个新程序。此时子进程将完全被新程序替换掉,并在子进程中开始执行新程序。若一个文件描述符在close_on_exec中的对应比特位被设置,那么在执行execve()时该描述符将被关闭,否则该描述符将始终处于打开状态。

当打开一个文件时,默认情况下文件句柄在子进程中也处于打开状态。因此sys_open()中要复位对应比特位

if os.name != 'nt':
	sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

 对非NT的内核,需要额外设置一个SO_REUSEADDR参数。有些系统的设计里,服务器进程结束后端口也会被内核保持一段时间,若我们迅速的重启服务器,可能会遇到“端口已经被占用”的情况。这个标志就是通知内核不要保持了,进程一关,立马放手,便于后来者重用。

if af == socket.AF_INET6:
	 # On linux, ipv6 sockets accept ipv4 too by default,
	 # but this makes it impossible to bind to both
	 # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
	 # separate sockets *must* be used to listen for both ipv4
	 # and ipv6.  For consistency, always disable ipv4 on our
	 # ipv6 sockets and use a separate ipv4 socket when needed.
	 #
	 # Python 2.x on windows doesn't have IPPROTO_IPV6.
	 if hasattr(socket, "IPPROTO_IPV6"):
		 sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

 这段代码的说明已经很清楚了

sock.setblocking(0)
sock.bind(sockaddr)
sock.listen(backlog)
sockets.append(sock)

前面经常提BSD Socket的这几个家伙,现在它们终于出现了。“非阻塞”性质也是在这里决定的。

每创建一个socket都将它加入到前面定义的列表里,最后函数结束时,将列表返回。其实这个函数蛮简单的。为什么它不是TCPServer的成员函数?

原文地址:https://www.cnblogs.com/jasonwang-2016/p/5950106.html