Theron, a lightweight C++ concurrency library, 源码分析(二)

Framework class

Framework类在Theron应该可以看作是一个中枢神经系统。他管理着它内部各个actor之间的交互。

我们知道,Windows是基于消息的系统。因此,它定义了消息队列,以及处理消息的基本流程:

   1: while (GetMessage(&msg, NULL, 0, 0))
   2: {
   3:     if (!TranslateAccelerator(msg.hwnd, hAccelTable, &msg))
   4:     {
   5:         TranslateMessage(&msg);
   6:         DispatchMessage(&msg);
   7:     }
   8: }

Actor Model同样以消息为基础,那么也应该存在一个消息队列和消息派发的过程。

这个过程就是在Framework中定义的。我们来看Framework::Initialize()(略去了部分非关键代码):

   1: void Framework::Initialize()
   2: {
   3:     mRunning = true;
   4:     mManagerThread.Start(ManagerThreadEntryPoint, this);
   5:  
   6:     uint32_t backoff(0);
   7:     while (mThreadCount.Load() < mTargetThreadCount.Load())
   8:     {
   9:         Detail::Utils::Backoff(backoff);
  10:     }
  11:  
  12:     mIndex = Detail::StaticDirectory<Framework>::Register(this);
  13:  
  14:     if (mName.IsNull())
  15:     {
  16:         mName = Detail::NameGenerator::Generate(mIndex);
  17:     }
  18: }

有3个亮点:

  1. mRunning = true和ManagerThreadEntryPoint
  2. Utils::Backoff
  3. mIndex和mName

mManagerThread是一个Thread类型,那么我们只有关注ManagerThreadEntryPoint就可以了。其实ManagerThreadEntryPoint很简单,不过它引入了另外一个函数,ManagerThreadProc

Backoff是一个和Sleep(0)类似的东西,不过和Sleep有一点细微的差别。Backoff调用了YieldProcessor。大家可以自己google下YieldProcessor vs Sleep(0)。

mIndex是给Framework分配了一个全局的索引值。

Framework::ManagerThreadProc

这个函数里,我们首先可以看到一个while循环:

while (mRunning) { … }

可以很明确地肯定,在Framework还没有销毁时,它就是个死循环。这基本上就对等了Windows里的那个while。

while里面的循环稍微有点复杂了。我们需要分块一点一点来剥离。

   1: while (mThreadCount.Load() < mTargetThreadCount.Load())
   2: {
   3:     WorkerThreadStore* store = new WorkerThreadStore(…);
   4:     ThreadContext* threadContext = new ThreadContext(store);
   5:  
   6:     ThreadPool::CreateThread(threadContext);
   7:     ThreadPool::StartThread(threadContext, &mWorkQueue, mNodeMask, mProcessorMask);
   8:  
   9:     mThreadContexts.Insert(threadContext);
  10: }

首先,这个函数是在Initialize里被调用的,因此必定要做一些初始化的操作。初始化操作的一个重头戏就是创建出足够多的线程。

在前面已经说了,线程被启动后,就会一直运行。因此初始化操作会创建16个(默认mTargetThreadCount是16)不停运行的线程。这些线程都是从mWorkQueue(SafeThreadQueue<Mailbox>类型)中去取相应的Mailbox,然后把关联的消息地送给actor执行。

所以,消息队列就是mWorkQueue。

当然这个函数虽然是在Initialize里被调用起来的,但是作为消息泵,它还有其他的职责。从函数的实现细节来看,整个过程可以用下面几条来总结:

  • mManagerThread还在运行时:
    • 如果有空闲线程,那么启用空闲线程来执行任务
    • 若可用线程数还没有达到上限,创建出足够多的线程
    • 若线程数超过上限(线程数的上限可以动态调整),停止一些线程
  • mManagerThread停止运行时:
    • 如果mThreadContext非空,则销毁ThreadContext管理的线程对象和掌握的资源

到这里,基本上把整个消息机制讲解完毕了。接下来我们要说一说Framework和Actor之间的关系。

 

Framework and Actor

Theron里的Framework有点像设计模式中的中介者模式。它管理着Actor的相关信息。怎么说呢,事实上和Actor对象关联的Mailbox和Address信息都是从Framework这里生产的。Framework::RegisterActor完成了这个工作。

   1: void Framework::RegisterActor(Actor* actor, const char* name)
   2: {
   3:     const uint32_t mailboxIndex(mMailboxes.Allocate());
   4:     Mailbox& mailbox(mMailboxes.GetEntry(mailboxIndex));
   5:  
   6:     String mailboxName(name);
   7:     mailbox.RegisterActor(actor);
   8:  
   9:     const Index index(mIndex, mailboxIndex);
  10:     const Address mailboxAddress(mailboxName, index);
  11:  
  12:     actor->mAddress = mailboxAddress;
  13: }

所以,Framework像邮局,他知道他管辖的区域中有那些邮箱地址,而且也只有它才知道。不同的邮局管辖不同的区域,因此跨域通信必然是两个Framework之间协同完成的。

在上面这段代码里有一个mIndex变量,这个是Framework的成员变量,用来在全局表征一个Framework的索引值。这个指会在后面在提到。

到这里,我们基本可以把Framework和Actor之间的关系理个清楚了。接下来要说的就是actor和actor之间是如何发送消息的。

为了能够把问题简单化,我们这里只讨论在同一个Framework中的不同actor之间通信的情况。这个过程要涉及2个要点:

  • Framework::Send函数
  • ProcessorContext结构

 

Framework::Send

Send函数的原型是:Framework::Send(const ValueType &value, const Address &from, const Address &address),是一个模板函数。这个函数的作用就是把消息投递到对应地址的mailbox中。

Send函数串联了不同actor之间的交互。整个消息的发送过程其实是由MessageSender::Send(EndPoint* endPoint, ProcessorContext* processorContext, const uint32_t localFrameworkIndex, IMessage* message, const Address &address)来负责完成的。

这个函数有很多参数,我们已经假定了只考虑同一个Framework中不同actors之间的通信。在这种情况下,我们可以不用考虑endPoint。另外,我们看到只有一个Address类型的参数了,要记得,发送者的address在message里。所以,这里的address是接收消息的actor的address。因此,Framework::Send中的value还不能算消息,只能算消息内容。因为前面说了,消息是要包含发送者的地址的。所以,在Send函数里,我们会看到它调用了MessageCreator::Create来创建一个消息结构,将value和发送消息的actor地址绑定在了一起。

接下来要的是processorContext和localFrameworkIndex两个变量。

ProcessorContext and Index

ProcessorContext事实上就是保存了某个Framework的相关信息。在发送消息的函数里(MessageSender::Send)附带上这个参数,可以明确当前的消息是从哪个Framework中发出来的。

localFrameworkIndex是Index类型的。在MessageSender::Send中就是指接收消息的Framework的Index。

从RegisterActor函数里我们也可以看到,每一个Address对象都会关联一个Index,而这个Index又是和Framework::mIndex有一定的关联关系的。因此在MessageSender::Send中,我们可以通过address和localFrameworkIndex来判断当前的消息是在同一Framework下的actor之间相互沟通还是在不同Framework之间来回。

   1: bool MessageSender::DeliverByIndex(ProcessorContext* processorContext,
   2:                                    const uint32_t localFrameworkIndex,
   3:                                    IMessage* message,
   4:                                    const Address &address)
   5: {    
   6:     const Index index(address.mIndex);
   7:  
   8:     // Which framework is the addressed entity in?
   9:     const uint32_t targetFrameworkIndex(index.mComponents.mFramework);
  10:     if (targetFrameworkIndex == localFrameworkIndex)
  11:     {
  12:         // Is the message addressed to an actor in this framework?
  13:         delivered = DeliverToActorInThisFramework(processorContext,
  14:                                                   message,
  15:                                                   address);
  16:     }
  17:     else
  18:     {
  19:         delivered = DeliverToLocalMailbox(message, address);
  20:     }
  21: }

DeliverToActorInThisFramework就不细说了,可以自己看。大致的逻辑就是从address中找出关联的mailbox对象,然后将message push到该mailbox中。如果当前的mailbox在填充消息前是空的,那么这个mailbox肯定不在该ProcessorContext的workQueue中。因此在放入消息后,要将它加入的workQueue中,让线程池在处理workQueue时,能处理到这条消息。

Actor class

Actor类其实是很简单的。在分析代码前,我们先把之前提到的和actor相关的概念再深化下。

一个actor在响应消息的同时可以:

  • 发送有限量的消息给其他actor
  • 创建有线量的其他actor
  • 指定下一次收到消息时具体响应动作/行为

也就是说在actor的消息处理函数里,actor可以给其他actor发送消息,可以创建新的actor,可以设置当前actor的一些属性。

所以,Actor类必定要有一个发送消息的函数:Actor::Send。在开始分析Send函数之前,我们先看一眼Actor类的成员函数。

  • Address mAddress;
  • Framework* mFramework;
  • HandlerCollection mMessageHandlers;
  • DefaultHandlerCollection mDefaultHandlers;
  • ProcessorContext* mProcessorContext;

Address和Framework暂时没啥好说的。mMessageHandlers就是当前Actor注册了的消息处理函数集合。

那继续我们的成员函数Send。

Actor::Send(const ValueType& value, const Address& address) const

Send函数就是把消息(value)发送到指定的address那里去。和Windows里的PostMessage一样,是异步的。

事实上,这个函数的内部实现也是调用MessageSender::Send来完成的。这种情况下,再回头来看ProcessorContext和localFrameworkIndex你应该会有更多的体会。对这个函数的细致分析,就不多补充了。

基本到这里,该说的都说完了。

原文地址:https://www.cnblogs.com/wpcockroach/p/2800507.html