twisted internet.reactor部分 源码分析

twisted.internet.reactor 是进行所有twisted事件循环的地方。

reactor在1个python进程中只能有一个。

在windows下用的是select。linux下epool。mac下是pool,这点和socketserver,tornado的都一样哈()。

源码位于twisted.internet.default._getInstallFunction

    try:
        if platform.isLinux():
            try:
                from twisted.internet.epollreactor import install
            except ImportError:
                from twisted.internet.pollreactor import install
        elif platform.getType() == 'posix' and not platform.isMacOSX():
            from twisted.internet.pollreactor import install
        else:
            from twisted.internet.selectreactor import install
    except ImportError:
        from twisted.internet.selectreactor import install
    return install

看SelectReactor

 

try:
from twisted.internet.win32eventreactor import _ThreadedWin32EventsMixin
except ImportError:
_extraBase = object
else:
_extraBase = _ThreadedWin32EventsMixin

  

@implementer(IReactorFDSet)
class SelectReactor(posixbase.PosixReactorBase, _extraBase):
    """
    A select() based reactor - runs on all POSIX platforms and on Win32.

    @ivar _reads: A set containing L{FileDescriptor} instances which will be
        checked for read events.

    @ivar _writes: A set containing L{FileDescriptor} instances which will be
        checked for writability.
    """
    balabala........

主要功能实现在posixbase.PosixReactorBase这个类。然后根据os的不同,混入不同的多线程mixin。

每种Reactor要实现IReactorFDSet的接口。接口定义的是管理 端口状态的方法。比如addReader,addWriter。。还是放代码吧。

class IReactorFDSet(Interface):
    """
    Implement me to be able to use L{IFileDescriptor} type resources.

    This assumes that your main-loop uses UNIX-style numeric file descriptors
    (or at least similarly opaque IDs returned from a .fileno() method)
    """

    def addReader(reader):
        """
        I add reader to the set of file descriptors to get read events for.

        @param reader: An L{IReadDescriptor} provider that will be checked for
                       read events until it is removed from the reactor with
                       L{removeReader}.

        @return: C{None}.
        """

    def addWriter(writer):
        """
        I add writer to the set of file descriptors to get write events for.

        @param writer: An L{IWriteDescriptor} provider that will be checked for
                       write events until it is removed from the reactor with
                       L{removeWriter}.

        @return: C{None}.
        """

    def removeReader(reader):
        """
        Removes an object previously added with L{addReader}.

        @return: C{None}.
        """

    def removeWriter(writer):
        """
        Removes an object previously added with L{addWriter}.

        @return: C{None}.
        """

    def removeAll():
        """
        Remove all readers and writers.

        Should not remove reactor internal reactor connections (like a waker).

        @return: A list of L{IReadDescriptor} and L{IWriteDescriptor} providers
                 which were removed.
        """

    def getReaders():
        """
        Return the list of file descriptors currently monitored for input
        events by the reactor.

        @return: the list of file descriptors monitored for input events.
        @rtype: C{list} of C{IReadDescriptor}
        """

    def getWriters():
        """
        Return the list file descriptors currently monitored for output events
        by the reactor.

        @return: the list of file descriptors monitored for output events.
        @rtype: C{list} of C{IWriteDescriptor}
        """

  没什么奇怪的地方。。

  所以就不看这边了。这地方太熟了。除去了多线程的Mixin和端口reader,writer的接口,剩下的就是reactor主要部分啦

  接下来看下posixbase.PosixReactorBase这个基类。为什么是PosixReactorBase而不是ReactorBase呢,因为后者是他的父类。。

  

@implementer(IReactorTCP, IReactorUDP, IReactorMulticast)
class PosixReactorBase(_SignalReactorMixin, _DisconnectSelectableMixin,
                       ReactorBase):
    """    A basis for reactors that use file descriptors.

    @ivar _childWaker: C{None} or a reference to the L{_SIGCHLDWaker}
        which is used to properly notice child process termination.
    """

    # Callable that creates a waker, overrideable so that subclasses can
    # substitute their own implementation:

 

IReactorTCP, IReactorUDP, IReactorMulticast是定义了PosixReactorBase的套接字方面的一些接口。接口定义的方法不多,源码中实现的还有ListenSSL,listenUNIX,listenUNIXDatagram,甚至adoptStreamPort去兼容IPV6。。he,真不少。

再继续分析PosixReactorBase的混入。
1.  _SignalReactorMixin,前面说的单进程只有一个Reactor就是在这里实现的啦。。

而且reactor也不能在stop后再run起来。Reactor在一个进程中只能run一次。。
2、_DisconnectSelectableMixin。这个MixIn只实现了一个方法。来处理haf_close,原理还是在于TCP的4次挥手再见。根据是否完成挥手操作,区分ConnectionLost还是ConnectionDone.

 然后终于看这个ReactorBase类了,抽丝剥茧,最后看到的就是就是这个东西。。

  

  

@implementer(IReactorCore, IReactorTime, IReactorPluggableResolver)
class ReactorBase(object):
    """
    Default base class for Reactors.

    @type _stopped: C{bool}
    @ivar _stopped: A flag which is true between paired calls to C{reactor.run}
        and C{reactor.stop}.  This should be replaced with an explicit state
        machine.

    @type _justStopped: C{bool}
    @ivar _justStopped: A flag which is true between the time C{reactor.stop}
        is called and the time the shutdown system event is fired.  This is
        used to determine whether that event should be fired after each
        iteration through the mainloop.  This should be replaced with an
        explicit state machine.

    @type _started: C{bool}
    @ivar _started: A flag which is true from the time C{reactor.run} is called
        until the time C{reactor.run} returns.  This is used to prevent calls
        to C{reactor.run} on a running reactor.  This should be replaced with
        an explicit state machine.

    @ivar running: See L{IReactorCore.running}

    @ivar _registerAsIOThread: A flag controlling whether the reactor will
        register the thread it is running in as the I/O thread when it starts.
        If C{True}, registration will be done, otherwise it will not be.
    """

  在这里,实现的是IReactorCore,IReactorTime, IReactorPluggableResolver三个接口。。(核心两个字写在脸上了。“打脸”就朝这里)

  IReactor实现了什么呢。就是Reacor的主要特点。Event操作。Reactor的开关,和 DNS查询。。。BTW,这点比BaseHTTPServer想的好多了。

  后者就直接去查DNS了。而这里放入了事件循环当中,没有去阻塞进程。。

  IReactorTime是有关defer对象处理,seconds和callLater什么的。

  IReactorPluggableResolver是提供了关于查询DNS的接口,方便自己去实现缓存或者预设DNS,本地DNS 什么的。。

  #歇一会儿。。

  








  

原文地址:https://www.cnblogs.com/Yeah-come-on/p/3878074.html