Python Tornado框架(ioloop对象分析)

网上都说nginx和lighthttpd是高性能web服务器,而tornado也是著名的高抗负载应用,它们间有什么相似处呢?上节提到的ioloop对象是如何循环的呢?往下看。

首先关于TCP服务器的开发上节已经提过,很明显那个三段式的示例是个效率很低的(因为只有一个连接被端开新连接才能被接受)。要想开发高性能的服务器,就得在这accept上下功夫。

首先,新连接的到来一般是经典的三次握手,只有当服务器收到一个SYN时才说明有一个新连接(还没建立),这时监听fd是可读的可以调用accept,此前服务器可以干点别的,这就是SELECT/POLL/EPOLL的思路。而只有三次握手成功后,accept才会返回,此时监听fd是读完成状态,似乎服务器在此之前可以转身去干别的,等到读完成再调用accept就不会有延迟了,这就是AIO的思路,不过在*nix平台上好像支持不是很广。。。再有,accept得到的新fd,不一定是可读的(客户端请求还没到达),所以可以等新fd可读时在read()(可能会有一点延迟),也可以用AIO等读完后再read就不会延迟了。同样类似,对于write,close也有类似的事件。

总的思路就是,在我们关心的fd上注册关心的多个事件,事件发生了就启动回调,没发生就看点别的。这是单线程的,多线程的复杂一点,但差不多。nginx和lightttpd以及tornado都是类似的方式,只不过是多进程和多线程或单线程的区别而已。为简便,我们只分析tornado单线程的情况。

关于ioloop.py的代码,主要有两个要点。一个是configurable机制,一个就是epoll循环。先看epoll循环吧。IOLoop 类的start是循环所在,但它必须被子类覆盖实现,因此它的start在PollIOLoop里。略过循环外部的多线程上下文环境的保存与恢复,单看循环:

while True:
poll_timeout = 3600.0

# Prevent IO event starvation by delaying new callbacks
# to the next iteration of the event loop.
with self._callback_lock:
	callbacks = self._callbacks
	self._callbacks = []
for callback in callbacks:
	self._run_callback(callback)

if self._timeouts:
	now = self.time()
	while self._timeouts:
		if self._timeouts[0].callback is None:
			# the timeout was cancelled
			heapq.heappop(self._timeouts)
		elif self._timeouts[0].deadline <= now:
			timeout = heapq.heappop(self._timeouts)
			self._run_callback(timeout.callback)
		else:
			seconds = self._timeouts[0].deadline - now
			poll_timeout = min(seconds, poll_timeout)
			break

if self._callbacks:
	# If any callbacks or timeouts called add_callback,
	# we don't want to wait in poll() before we run them.
	poll_timeout = 0.0

if not self._running:
	break

if self._blocking_signal_threshold is not None:
	# clear alarm so it doesn't fire while poll is waiting for
	# events.
	signal.setitimer(signal.ITIMER_REAL, 0, 0)

try:
	event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
	# Depending on python version and IOLoop implementation,
	# different exception types may be thrown and there are
	# two ways EINTR might be signaled:
	# * e.errno == errno.EINTR
	# * e.args is like (errno.EINTR, 'Interrupted system call')
	if (getattr(e, 'errno', None) == errno.EINTR or
		(isinstance(getattr(e, 'args', None), tuple) and
		 len(e.args) == 2 and e.args[0] == errno.EINTR)):
		continue
	else:
		raise

if self._blocking_signal_threshold is not None:
	signal.setitimer(signal.ITIMER_REAL,
					 self._blocking_signal_threshold, 0)

# Pop one fd at a time from the set of pending fds and run
# its handler. Since that handler may perform actions on
# other file descriptors, there may be reentrant calls to
# this IOLoop that update self._events
self._events.update(event_pairs)
while self._events:
	fd, events = self._events.popitem()
	try:
		self._handlers[fd](fd, events)
	except (OSError, IOError) as e:
		if e.args[0] == errno.EPIPE:
			# Happens when the client closes the connection
			pass
		else:
			app_log.error("Exception in I/O handler for fd %s",
						  fd, exc_info=True)
	except Exception:
		app_log.error("Exception in I/O handler for fd %s",
					  fd, exc_info=True)

首先是设定超时时间。然后在互斥锁下取出上次循环遗留下的回调列表(在add_callback添加对象),把这次列表置空,然后依次执行列表里的回调。这里的_run_callback就没什么好分析的了。紧接着是检查上次循环遗留的超时列表,如果列表里的项目有回调而且过了截止时间,那肯定超时了,就执行对应的超时回调。然后检查是否又有了事件回调(因为很多回调函数里可能会再添加回调),如果是,则不在poll循环里等待,如注释所述。接下来最关键的一句是event_pairs = self._impl.poll(poll_timeout),这句里的_impl是epoll,在platform/epoll.py里定义,总之就是一个等待函数,当有事件(超时也算)发生就返回。然后把事件集保存下来,对于每个事件,self._handlers[fd](fd, events)根据fd找到回调,并把fd和事件做参数回传。如果fd是监听的fd,那么这个回调handler就是accept_handler函数,详见上节代码。如果是新fd可读,一般就是_on_headers 或者 _on_requet_body了,详见前几节。我好像没看到可写时的回调?以上,就是循环的流程了。可能还是看的糊里糊涂的,因为很多对象怎么来的都不清楚,configurable也还没有看。看完下面的分析,应该就可以了。

Configurable类在util.py里被定义。类里有一段注释,已经很明确的说明了它的设计意图和用法。它是可配置接口的父类,可配置接口对外提供一致的接口标识,但它的子类实现可以在运行时进行configure。一般在跨平台时由于子类实现有多种选择,这时候就可以使用可配置接口,例如select和epoll。首先注意 Configurable 的两个函数: configurable_base 和 configurable_default, 两函数都需要被子类(即可配置接口类)覆盖重写。其中,base函数一般返回接口类自身,default返回接口的默认子类实现,除非接口指定了__impl_class。IOLoop及其子类实现都没有初始化函数也没有构造函数,其构造函数继承于Configurable,如下:

def __new__(cls, **kwargs):
	base = cls.configurable_base()
	args = {}
	if cls is base:
		impl = cls.configured_class()
		if base.__impl_kwargs:
			args.update(base.__impl_kwargs)
	else:
		impl = cls
	args.update(kwargs)
	instance = super(Configurable, cls).__new__(impl)
	# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
	# singleton magic.  If we get rid of that we can switch to __init__
	# here too.
	instance.initialize(**args)
	return instance

当子类对象被构造时,子类__new__被调用,因此参数里的cls指的是Configurabel的子类(可配置接口类,如IOLoop)。先是得到base,查看IOLoop的代码发现它返回的是自身类。由于base和cls是一样的,所以调用configured_class()得到接口的子类实现,其实就是调用base(现在是IOLoop)的configurable_default,总之就是返回了一个子类实现(epoll/kqueue/select之一),顺便把__impl_kwargs合并到args里。接着把kwargs并到args里。然后调用Configurable的父类(Object)的__new__方法,生成了一个impl的对象,紧接着把args当参数调用该对象的initialize(继承自PollIOloop,其initialize下段进行分析),返回该对象。

所以,当构造IOLoop对象时,实际得到的是EPollIOLoop或其它相似子类。另外,Configurable 还提供configure方法来给接口指定实现子类和参数。可以看的出来,Configurable类主要提供构造方法,相当于对象工厂根据配置来生产对象,同时开放configure接口以供配置。而子类按照约定调整配置即可得到不同对象,代码得到了复用。

解决了构造,来看看IOLoop的instance方法。先检查类是否有成员_instance,一开始肯定没有,于是就构造了一个IOLoop对象(即EPollIOLoop对象)。以后如果再调用instance,得到的则是已有的对象,这样就确保了ioloop在全局是单例。再看epoll循环时注意到self._impl,Configurable 和 IOLoop 里都没有, 这是在哪儿定义的呢? 为什么IOLoop的start跑到PollIOLoop里,应该是EPollIOLoop才对啊。 对,应该看出来了,EPollIOLoop 就是PollIOLoop的子类,所以方法被继承了是很常见的哈。

从上一段的构造流程里可以看到,EPollIOLoop对象的initialize方法被调用了,看其代码发现它调用了其父类(PollIOLoop)的它方法, 并指定了impl=select.epoll(), 然后在父类的方法里就把它保存了下来,所以self._impl.poll就等效于select.epoll().poll().PollIOLoop里还有一些注册,修改,删除监听事件的方法,其实就是对self._impl的封装调用。就如上节的 add_accept_handler 就是调用ioloop的add_handler方法把监听fd和accept_handler方法进行关联。

IOLoop基本是个事件循环,因此它总是被其它模块所调用。而且为了足够通用,基本上对回调没多大限制,一个可执行对象即可。事件分发就到此结束了,和IO事件密切相关的另一个部分是IOStream,看看它是如何读写的。

IOLoop instance()方法的讲解

Tornado 的源码写得有点难懂,需要你理解好 socket、epoll 这样的东西才能充分理解。需要深入到 Tornado 的源码,ioloop.py 这个文件很关键。

接下来,我们继续读 ioloop.py 这个文件。

IOLoop 是基于 epoll 实现的底层网络I/O的核心调度模块,用于处理 socket 相关的连接、响应、异步读写等网络事件。每个 Tornado 进程都会初始化一个全局唯一的 IOLoop 实例,在 IOLoop 中通过静态方法 instance() 进行封装,获取 IOLoop 实例直接调用此方法即可。

@staticmethod
def instance():
	"""Returns a global `IOLoop` instance.

	Most applications have a single, global `IOLoop` running on the
	main thread.  Use this method to get this instance from
	another thread.  To get the current thread's `IOLoop`, use `current()`.
	"""
	if not hasattr(IOLoop, "_instance"):
		with IOLoop._instance_lock:
			if not hasattr(IOLoop, "_instance"):
				# New instance after double check
				IOLoop._instance = IOLoop()
	return IOLoop._instance

Tornado 服务器启动时会创建监听 socket,并将 socket 的 file descriptor 注册到 IOLoop 实例中,IOLoop 添加对 socket 的IOLoop.READ 事件监听并传入回调处理函数。当某个 socket 通过 accept 接受连接请求后调用注册的回调函数进行读写。接下来主要分析IOLoop 对 epoll 的封装和 I/O 调度具体实现。

epoll是Linux内核中实现的一种可扩展的I/O事件通知机制,是对POISX系统中 select 和 poll 的替代,具有更高的性能和扩展性,FreeBSD中类似的实现是kqueue。Tornado中基于Python C扩展实现的的epoll模块(或kqueue)对epoll(kqueue)的使用进行了封装,使得IOLoop对象可以通过相应的事件处理机制对I/O进行调度。具体可以参考前面小节的 预备知识:我读过的对epoll最好的讲解 。

IOLoop模块对网络事件类型的封装与epoll一致,分为READ / WRITE / ERROR三类,具体在源码里呈现为:

# Our events map exactly to the epoll events
NONE = 0
READ = _EPOLLIN
WRITE = _EPOLLOUT
ERROR = _EPOLLERR | _EPOLLHUP

 回到前面章节的 开始用Tornado:从Hello World开始 里面的示例,

http_server = tornado.httpserver.HTTPServer(application)
http_server.listen(options.port)
tornado.ioloop.IOLoop.instance().start()

前两句是启动服务器,启动服务器之后,还需要启动 IOLoop 的实例,这样可以启动事件循环机制,配合非阻塞的 HTTP Server 工作。更多关于 IOLoop的与Http服务器的细节,在 Tornado对Web请求与响应的处理机制 这里有介绍到。

这就是 IOLoop 的 instance() 方法的一些细节,接下来我们再看看 start() 的细节。

 IOLoop start()里的核心调度

IOLoop的初始化

初始化过程中选择 epoll 的实现方式,Linux 平台为 epoll,BSD 平台为 kqueue,其他平台如果安装有C模块扩展的 epoll 则使用 tornado对 epoll 的封装,否则退化为 select。

def __init__(self, impl=None):
    self._impl = impl or _poll()
    #省略部分代码
    self._waker = Waker()
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)

def add_handler(self, fd, handler, events):
    """Registers the given handler to receive the given events for fd."""
    self._handlers[fd] = stack_context.wrap(handler)
    self._impl.register(fd, events | self.ERROR)

在 IOLoop 初始化的过程中创建了一个 Waker 对象,将 Waker 对象 fd 的读端注册到事件循环中并设定相应的回调函数(这样做的好处是当事件循环阻塞而没有响应描述符出现,需要在最大 timeout 时间之前返回,就可以向这个管道发送一个字符)。

Waker 的使用:一种是在其他线程向 IOLoop 添加 callback 时使用,唤醒 IOLoop 同时会将控制权转移给 IOLoop 线程并完成特定请求。唤醒的方法向管道中写入一个字符'x'。另外,在 IOLoop的stop 函数中会调用self._waker.wake(),通过向管道写入'x'停止事件循环。

add_handler 函数使用了stack_context 提供的 wrap 方法。wrap 返回了一个可以直接调用的对象并且保存了传入之前的堆栈信息,在执行时可以恢复,这样就保证了函数的异步调用时具有正确的运行环境。

IOLoop的start方法

IOLoop 的核心调度集中在 start() 方法中,IOLoop 实例对象调用 start 后开始 epoll 事件循环机制,该方法会一直运行直到 IOLoop 对象调用 stop 函数、当前所有事件循环完成。start 方法中主要分三个部分:一个部分是对超时的相关处理;一部分是 epoll 事件通知阻塞、接收;一部分是对 epoll 返回I/O事件的处理。

  • 为防止 IO event starvation,将回调函数延迟到下一轮事件循环中执行。
  • 超时的处理 heapq 维护一个最小堆,记录每个回调函数的超时时间(deadline)。每次取出 deadline 最早的回调函数,如果callback标志位为 True 并且已经超时,通过 _run_callback 调用函数;如果没有超时需要重新设定 poll_timeout 的值。
  • 通过 self._impl.poll(poll_timeout) 进行事件阻塞,当有事件通知或超时时 poll 返回特定的 event_pairs。
  • epoll 返回通知事件后将新事件加入待处理队列,将就绪事件逐个弹出,通过stack_context.wrap(handler)保存的可执行对象调用事件处理。
while True:
    poll_timeout = 3600.0

    with self._callback_lock:
        callbacks = self._callbacks
        self._callbacks = []
    for callback in callbacks:
        self._run_callback(callback)

    # 超时处理
    if self._timeouts:
        now = time.time()
        while self._timeouts:
            if self._timeouts[0].callback is None:
                # the timeout was cancelled
                heapq.heappop(self._timeouts)
            elif self._timeouts[0].deadline <= now:
                timeout = heapq.heappop(self._timeouts)
                self._run_callback(timeout.callback)
            else:
                seconds = self._timeouts[0].deadline - now
                poll_timeout = min(seconds, poll_timeout)
                break

    if self._callbacks:
        # If any callbacks or timeouts called add_callback,
        # we don't want to wait in poll() before we run them.
        poll_timeout = 0.0

    if not self._running:
        break

    if self._blocking_signal_threshold is not None:
        # clear alarm so it doesn't fire while poll is waiting for events.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    # epoll阻塞,当有事件通知或超时返回event_pairs
    try:
        event_pairs = self._impl.poll(poll_timeout)
    except Exception, e:
        # 异常处理,省略

    # 对epoll返回event_pairs事件的处理
    self._events.update(event_pairs)
    while self._events:
        fd, events = self._events.popitem()
        try:
            self._handlers[fd](fd, events)
        except Exception e:
            # 异常处理,省略

3.0后的一些改动

Tornado3.0以后 IOLoop 模块的一些改动。

IOLoop 成为 util.Configurable 的子类,IOLoop 中绝大多数成员方法都作为抽象接口,具体实现由派生类 PollIOLoop 完成。IOLoop 实现了 Configurable 中的 configurable_base 和 configurable_default 这两个抽象接口,用于初始化过程中获取类类型和类的实现方法(即 IOLoop 中 poller 的实现方式)。

在 Tornado3.0+ 中针对不同平台,单独出 poller 相应的实现,EPollIOLoop、KQueueIOLoop、SelectIOLoop 均继承于 PollIOLoop。下边的代码是 configurable_default 方法根据平台选择相应的 epoll 实现。初始化 IOLoop 的过程中会自动根据平台选择合适的 poller 的实现方法。

@classmethod
def configurable_default(cls):
	if hasattr(select, "epoll"):
		from tornado.platform.epoll import EPollIOLoop
		return EPollIOLoop
	if hasattr(select, "kqueue"):
		# Python 2.6+ on BSD or Mac
		from tornado.platform.kqueue import KQueueIOLoop
		return KQueueIOLoop
	from tornado.platform.select import SelectIOLoop
	return SelectIOLoop

IOLoop与Configurable类

IOLoop 是 tornado 的核心。程序中主函数通常调用 tornado.ioloop.IOLoop.instance().start() 来启动IOLoop,但是看了一下 IOLoop 的实现,start 方法是这样的:

def start(self):
	"""Starts the I/O loop.

	The loop will run until one of the callbacks calls `stop()`, which
	will make the loop stop after the current event iteration completes.
	"""
	raise NotImplementedError()

也就是说 IOLoop 是个抽象的基类,具体工作是由它的子类负责的。由于是 Linux 平台,所以应该用 Epoll,对应的类是 PollIOLoop。PollIOLoop 的 start 方法开始了事件循环。

问题来了,tornado.ioloop.IOLoop.instance() 是怎么返回 PollIOLoop 实例的呢?刚开始有点想不明白,后来看了一下 IOLoop 的代码就豁然开朗了。

IOLoop 继承自 Configurable,后者位于 tornado/util.py。

A configurable interface is an (abstract) class whose constructor acts as a factory function for one of its implementation subclasses. The implementation subclass as well as optional keyword arguments to its initializer can be set globally at runtime with configure.

Configurable 类实现了一个工厂方法,也就是设计模式中的“工厂模式”,看一下__new__函数的实现:

def __new__(cls, **kwargs):
	base = cls.configurable_base()
	args = {}
	if cls is base:
		impl = cls.configured_class()
		if base.__impl_kwargs:
			args.update(base.__impl_kwargs)
	else:
		impl = cls
	args.update(kwargs)
	instance = super(Configurable, cls).__new__(impl)
	# initialize vs __init__ chosen for compatiblity with AsyncHTTPClient
	# singleton magic.  If we get rid of that we can switch to __init__
	# here too.
	instance.initialize(**args)
	return instance

 当创建一个Configurable类的实例的时候,其实创建的是configurable_class()返回的类的实例。

@classmethod
def configured_class(cls):
	"""Returns the currently configured class."""
	base = cls.configurable_base()
	if cls.__impl_class is None:
		base.__impl_class = cls.configurable_default()
	return base.__impl_class

 最后,就是返回的configurable_default()。此函数在IOLoop中的实现如下:

@classmethod
def configurable_default(cls):
	if hasattr(select, "epoll"):
		from tornado.platform.epoll import EPollIOLoop
		return EPollIOLoop
	if hasattr(select, "kqueue"):
		# Python 2.6+ on BSD or Mac
		from tornado.platform.kqueue import KQueueIOLoop
		return KQueueIOLoop
	from tornado.platform.select import SelectIOLoop
	return SelectIOLoop

 EPollIOLoop 是 PollIOLoop 的子类。至此,这个流程就理清楚了。

对socket封装的IOStream机制概览

IOStream对socket读写进行了封装,分别提供读、写缓冲区实现对socket的异步读写。当socket被accept之后HTTPServer的_handle_connection会被回调并初始化IOStream对象,进一步通过IOStream提供的功能接口完成socket的读写。文章接下来将关注IOStream实现读写的细节。

IOStream的初始化

IOStream初始化过程中主要完成以下操作:

  1. 绑定对应的socket
  2. 绑定ioloop
  3. 创建读缓冲区_read_buffer,一个python deque容器
  4. 创建写缓冲区_write_buffer,同样也是一个python deque容器

IOStream提供的主要功能接口

主要的读写接口包括以下四个:

class IOStream(object):
	def read_until(self, delimiter, callback): 
	def read_bytes(self, num_bytes, callback, streaming_callback=None): 
	def read_until_regex(self, regex, callback): 
	def read_until_close(self, callback, streaming_callback=None): 
	def write(self, data, callback=None):
  • read_until和read_bytes是最常用的读接口,它们工作的过程都是先注册读事件结束时调用的回调函数,然后调用_try_inline_read方法。_try_inline_read首先尝试_read_from_buffer,即从上一次的读缓冲区中取数据,如果有数据直接调用 self._run_callback(callback, self._consume(data_length)) 执行回调函数,_consume消耗掉了_read_buffer中的数据;否则即_read_buffer之前没有未读数据,先通过_read_to_buffer将数据从socket读入_read_buffer,然后再执行_read_from_buffer操作。read_until和read_bytes的区别在于_read_from_buffer过程中截取数据的方法不同,read_until读取到delimiter终止,而read_bytes则读取num_bytes个字节终止。执行过程如下图所示
  • read_until_regex相当于delimiter为某一正则表达式的read_until。
  • read_until_close主要用于IOStream流关闭前后的读取:如果调用read_until_close时stream已经关闭,那么将会_consume掉_read_buffer中的所有数据;否则_read_until_close标志位设为True,注册_streaming_callback回调函数,调用_add_io_state添加io_loop.READ状态。
  • write首先将data按照数据块大小WRITE_BUFFER_CHUNK_SIZE分块写入write_buffer,然后调用handle_write向socket发送数据。

其他内部功能接口

  • def _handle_events(self, fd, events): 通常为IOLoop对象add_handler方法传入的回调函数,由IOLoop的事件机制来进行调度。
  • def _add_io_state(self, state): 为IOLoop对象的handler注册IOLoop.READ或IOLoop.WRITE状态,handler为IOStream对象的_handle_events方法。
  • def _consume(self, loc): 合并读缓冲区loc个字节,从读缓冲区删除并返回这些数据。

Tornado的多进程管理分析

Tornado的多进程管理我们可以参看process.py这个文件。

在编写多进程的时候我们一般都用python自带的multiprocessing,使用方法和threading基本一致,只需要继承里面的Process类以后就可以编写多进程程序了,这次我们看看tornado是如何实现他的multiprocessing,可以说实现的功能不多,但是更加简单高效。

我们只看fork_process里面的代码:

global _task_id
    assert _task_id is None
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    if ioloop.IOLoop.initialized():
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling start_processes()")
    logging.info("Starting %d processes", num_processes)
    children = {}

 这一段很简单,就是在没有传入进程数的时候使用默认的cpu个数作为将要生成的进程个数。

def start_child(i):
	pid = os.fork()
	if pid == 0:
		# child process
		_reseed_random()
		global _task_id
		_task_id = i
		return i
	else:
		children[pid] = i
		return None

这是一个内函数,作用就是生成子进程。fork是个很有意思的方法,他会同时返回两种状态,为什么呢?其实fork相当于在原有的一条路(父进程)旁边又修了一条路(子进程)。如果这条路修成功了,那么在原有的路上(父进程)你就看到旁边来了另外一条路(子进程),所以也就是返回新生成的那条路的名字(子进程的pid),但是在另外一条路上(子进程),你看到的是自己本身修建成功了,也就返回自己的状态码(返回结果是0)。

所以if pid==0表示这时候cpu已经切换到子进程了,相当于我们在新生成的这条路上面做事(返回任务id);else表示又跑到原来的路上做事了,在这里我们记录下新生成的子进程,这时候children[pid]=i里面的pid就是新生成的子进程的pid,而 i 就是刚才在子进程里面我们返回的任务id(其实就是用来代码子进程的id号)。

for i in range(num_processes):
	id = start_child(i)
	if id is not None:
		return id

 if id is not None表示如果我们在刚刚生成的那个子进程的上下文里面,那么就什么都不干,直接返回子进程的任务id就好了,啥都别想了,也别再折腾。如果还在父进程的上下文的话那么就继续生成子进程。

num_restarts = 0
    while children:
        try:
            pid, status = os.wait()
        except OSError, e:
            if e.errno == errno.EINTR:
                continue
            raise
        if pid not in children:
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status):
            logging.warning("child %d (pid %d) killed by signal %d, restarting",
                            id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0:
            logging.warning("child %d (pid %d) exited with status %d, restarting",
                            id, pid, os.WEXITSTATUS(status))
        else:
            logging.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        new_id = start_child(id)
        if new_id is not None:
            return new_id

剩下的这段代码都是在父进程里面做的事情(因为之前在子进程的上下文的时候已经返回了,当然子进程并没有结束)。

pid, status = os.wait()的意思是等待任意子进程退出或者结束,这时候我们就把它从我们的children表里面去除掉,然后通过status判断子进程退出的原因。

如果子进程是因为接收到kill信号或者抛出exception了,那么我们就重新启动一个子进程,用的当然还是刚刚退出的那个子进程的任务号。如果子进程是自己把事情做完了才退出的,那么就算了,等待别的子进程退出吧。

我们看到在重新启动子进程的时候又使用了

if new_id is not None:
    return new_id

 主要就是退出子进程的空间,只在父进程上面做剩下的事情,不然刚才父进程的那些代码在子进程里面也会同样的运行,就会形成无限循环了,我没试过,不如你试试?

原文地址:https://www.cnblogs.com/jasonwang-2016/p/5950548.html