Apache Mesos 底层网络通信库 libprocess 分析

  1. 背景

LibProcess 是一套基于 Socket 实现的通信协议库,它支持 Protocal Buffer,通过两者结合,可实现一套很高效的基于消息传递的通信协议库,而 Mesos 底层通信协议正是采用了该库。

  1. 简单剖析

有两个服务 Master 和 Slave,Slave 周期性向 Master 汇报自己的进度,而 Master 则不定期地向 Slave 下达任务,采用 LibProcess 实现如下:

(1)Master 类设计

步骤 1 继承 ProtobufProcess 类。让 Master 类继承 LibProcess 中的 ProtobufProcess 类,该类实际上维护了一个 socket server,该 server 可以处理实现注册好的各种 Protocal Buffer 定义的 Message

class Master : public ProtobufProcess<Master> {

//……

}

从该类中可以看出 Socket 的影子,原生的 Socket 的一些基本的通讯函数模型都可以找到。

步骤 2 初始化资源分配模块 allocator->initialize 如下:

allocator->initialize(
flags.allocation_interval,
defer(self(), &Master::offer, lambda::_1, lambda::_2),
defer(self(), &Master::inverseOffer, lambda::_1, lambda::_2),
weights);

步骤 3 注册消息处理器。 在初始化函数 initialize() 中(使用 install 函数)会注册各种类型的 Message 类型(比如:SubmitSchedulerRequest, RegisterFrameworkMessage 等等)消息(需使用 Protocal buffer 定义)的消息处理器, 各种消息处理器对应的响应函数也不一样, 一般一一对应。 这样, Master 内部的 socket server 会监听来自外部的各种消息包, 一旦发现相应 Message 类型的消息包,则会调用相应的函数进行处理。

// Install handler functions for certain messages.
install<SubmitSchedulerRequest>(
    &Master::submitScheduler,
    &SubmitSchedulerRequest::name);

install<RegisterFrameworkMessage>(
    &Master::registerFramework,
    &RegisterFrameworkMessage::framework);

install<ReregisterFrameworkMessage>(
    &Master::reregisterFramework,
    &ReregisterFrameworkMessage::framework,
    &ReregisterFrameworkMessage::failover);

步骤 4 安装所有的 HTTP router,如下例举部分。

route("/create-volumes",
      DEFAULT_HTTP_AUTHENTICATION_REALM,
      Http::CREATE_VOLUMES_HELP(),
      [this](const process::http::Request& request,
             const Option<string>& principal) {
        Http::log(request);
        return http.createVolumes(request, principal);
      });
route("/destroy-volumes",
      DEFAULT_HTTP_AUTHENTICATION_REALM,
      Http::DESTROY_VOLUMES_HELP(),
      [this](const process::http::Request& request,
             const Option<string>& principal) {
        Http::log(request);
        return http.destroyVolumes(request, principal);
      });

步骤 5 编写 main 函数启动 Master。

int main(int argc, char** argv)

{

process::initialize(“master”); // 初始化一个名为 master 的进程

Master* master = new Master();

process::spawn(master); // 启动 master,实际上是一个 socket server

process::wait(master->self());

delete master;

return 0;

}

(2) Slave 类设计

Slave 设计与 Master 类似,具体如下:

class Slave : public ProtobufProcess<Slave>

{

Slave(): ProcessBase(“slave”) {}

void initialize() [

install<LaunchTasksMessage>(

&Master::launchTasks,

&LaunchTasksMessage::id,

&LaunchTasksMessage::tasks);

}

void launchTasks(const int id,

const vector<TaskInfo>& tasks) {

for (int i = 0; i < tasks.size(); i++) {

//launch tasks[i];

}

}

}
  1. LibProcess 事件驱动模型

LibProcess 采用了基于事件驱动的编程模型,每一个服务(进程)内部实际上运行了一个 socket server,而不同服务之间通过消息(事件)进行通信。在一个服务内部,注册了很多消息以及每个消息对应的处理器,一旦它收到某种类型的消息,则会调用相应的处理器进行处理,在处理过程中,可能会产生另外一种消息发送给另一个服务。整个过程如下图所示:

)

  1. Libprocess 用法

(1)Process 类

可通过继承该类,实现一个服务。该类主要包含以下几个方法:

1) Install 函数

void install(

const std::string& name,

const MessageHandler& handler)

注册名为 name 的消息处理器。

2) Send 函数

void send(

const UPID& to,

const std::string& name,

const char* data = NULL,

size_t length = 0)

向参数 to 标识的服务发送名为 name 的消息,该消息中包含数据 data,长度为 length。

(2)使用 ProtobufProcess 类

ProtobufProcess 类实现了 Process 类,与 Protocal buffer 紧密结合,该类主要有以下几个方法:

1) Install 函数

使用 ProtobufProcess 类,可以很容易创建一个处理 Protocal buffer 类型消息的消息处理器,可通过 install 函数注册各种消息处理器,比如如果一个新消息中有 2 个字段(x1,x2),则可这样注册该消息:

install<XXXMessage>(  // 使用 install 函数进行注册

&messageHandler,

&x1,&x2);

}

XXXMessage 的 protocal buffer 定义如下:

message XXXMessage {

required X x1 = 1;

required Y x2 = 2;

}

2) send 函数

定义如下:

void send(const process::UPID& to,

const google::protobuf::Message& message);

它的功能是向参数 to 标识的服务发送 message 消息,其中 to 是 UPID 类型,它包含了服务 ip 和端口号。

3) reply 函数

定义如下:

void reply(const google::protobuf::Message& message);

功能:向最近发送消息的服务返回 message。

(3)全局函数

1) dispatch

存在多种定义方式,一种方式如下:

template <typename R, typename T>

Future<R> dispatch(

const PID<T>& pid,

Future<R> (T::*method)(void))

将函数分配给 pid 进程(服务)执行。注意,函数分配给进程后,不一定会马上执行完后,需要需要等待执行完成,可以结合 wait 函数使用:

Future<bool> added = dispatch(slavesManager, add);

added.await();

if (!added.isReady() || !added.get()) {

…..

}

2) spawn

定义如下:

UPID spawn(ProcessBase* process, bool manage=false);

启动进程(服务)process,参数 manage 表示是否启用垃圾收集机制。

原文地址:https://www.cnblogs.com/qianggezhishen/p/7349323.html