[zz]Mesos的分析4 支持Hadoop任务级调度

转载自: http://blog.sina.com.cn/s/blog_4a1f59bf0100qotf.html

Hadoop的调度示意图如下所示:

Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度

在Mesos资源管理平台上,启动Hadoop计算框架的时候,只是启动了它的JobTracker,而并没有启动TaskTracker,这主要考虑到资源伸缩性的管理。下面将介绍,当在这种框架下提交一个Hadoop作业时,调度系统如何工作的。

在Mesos启动的时候,只启动了JobTracker,并没有启动TaskTracker,那么提交的作业如何执行呢?
在Mesos分析3的介绍中,ResourceOffer ---》ReplyToOffer ---》ResourceOffer ---》是一个不断执行的过程。ResourceOffer会给FrameworkScheduler提供SlaveOffer的信息,SlaveOffer消息包含了SlaveId和它能够提供的资源信息(CPU, Mem)。private TaskDescription findTask(String slaveId, String host, int cpus, int mem)方法会获取JobTracker维护的Collection<JobInProgress> jobs的信息,取出目前需要运行的MapTask 和 ReduceTask个数,然后使用一定的判断方法(涉及到DelayScheduling、mapToReduceRatio优先ReduceTask),使用MesosTask对Task进行封装,然后合并资源信息并入TaskDescription对象。由于Mesos-Master一个ResourceOffer会把所有的可用的SlaveOffer都发送过来,因此,要按照上面findTask的方法,在每一个提供资源的Slave上去查找资源和匹配Task。这些匹配好的TaskDescription组装成一个ArrayList作为ReplyToOffer的信息返回给Mesos-Master。
Mesos-Master收到F2M_SLOT_OFFER_REPLY消息之后,会下面的操作,来处理包含Task执行请求的OfferReply,具体的函数如下:

// Process a resource offer reply (for a non-cancelled offer) by launching
// the desired tasks (if the offer contains a valid set of tasks) and
// reporting any unused resources to the allocator
void Master::processOfferReply(SlotOffer *offer,
    const vector<TaskDescription>& tasks, const Params& params)
{
  LOG(INFO) << "Received reply for " << offer;

  Framework *framework = lookupFramework(offer->frameworkId);
  CHECK(framework != NULL);

  // Count resources in the offer
  unordered_map<Slave *, Resources> offerResources;
  foreach (SlaveResources &r, offer->resources) {
    offerResources[r.slave] = r.resources;
  }

  // Count resources in the response, and check that its tasks are valid
  unordered_map<Slave *, Resources> responseResources;
  foreach (const TaskDescription &t, tasks) {
    // Check whether this task size is valid
    Params params(t.params);
    Resources res(params.getInt32("cpus", -1),
                  params.getInt32("mem", -1));
    if (res.cpus < MIN_CPUS || res.mem < MIN_MEM ||
        res.cpus > MAX_CPUS || res.mem > MAX_MEM) {
      terminateFramework(framework, 0,
          "Invalid task size: " + lexical_cast<string>(res));
      return;
    }
    // Check whether the task is on a valid slave
    Slave *slave = lookupSlave(t.slaveId);
    if (!slave || offerResources.find(slave) == offerResources.end()) {
      terminateFramework(framework, 0, "Invalid slave in offer reply");
      return;
    }
    responseResources[slave] += res;
  }

  // Check that the total accepted on each slave isn't more than offered
  foreachpair (Slave *s, Resources& respRes, responseResources) {
    Resources &offRes = offerResources[s];
    if (respRes.cpus > offRes.cpus || respRes.mem > offRes.mem) {
      terminateFramework(framework, 0, "Too many resources accepted");
      return;
    }
  }

  // Check that there are no duplicate task IDs
  unordered_set<TaskID> idsInResponse;
  foreach (const TaskDescription &t, tasks) {
    if (framework->tasks.find(t.taskId) != framework->tasks.end() ||
        idsInResponse.find(t.taskId) != idsInResponse.end()) {
      terminateFramework(framework, 0,
          "Duplicate task ID: " + lexical_cast<string>(t.taskId));
      return;
    }
    idsInResponse.insert(t.taskId);
  }

  // Launch the tasks in the response
  foreach (const TaskDescription &t, tasks) {
    // Record the resources in event_history
    Params params(t.params);
    Resources res(params.getInt32("cpus", -1),
                  params.getInt64("mem", -1));

    Slave *slave = lookupSlave(t.slaveId);
    evLogger->logTaskCreated(t.taskId, framework->id, t.slaveId,
                             slave->webUIUrl, res);

    // Launch the tasks in the response
    launchTask(framework, t);
  }

  // If there are resources left on some slaves, add filters for them
  vector<SlaveResources> resourcesLeft;
  int timeout = params.getInt32("timeout", DEFAULT_REFUSAL_TIMEOUT);
  double expiry = (timeout == -1) ? 0 : elapsed() + timeout;
  foreachpair (Slave *s, Resources offRes, offerResources) {
    Resources respRes = responseResources[s];
    Resources left = offRes - respRes;
    if (left.cpus > 0 || left.mem > 0) {
      resourcesLeft.push_back(SlaveResources(s, left));
    }
    if (timeout != 0 && respRes.cpus == 0 && respRes.mem == 0) {
      LOG(INFO) << "Adding filter on " << s << " to " << framework
                << " for  " << timeout << " seconds";
      framework->slaveFilter[s] = expiry;
    }
  }
 
  // Return the resources left to the allocator
  removeSlotOffer(offer, ORR_FRAMEWORK_REPLIED, resourcesLeft);
}

在对提交的Tasks进行了安全检查之后,向framework提交任务。


void Master::launchTask(Framework *framework, const TaskDescription& t)
{
  Params params(t.params);
  Resources res(params.getInt32("cpus", -1),
                params.getInt32("mem", -1));

  // The invariant right now is that launchTask is called only for
  // TaskDescriptions where the slave is still valid (see the code
  // above in processOfferReply).
  Slave *slave = lookupSlave(t.slaveId);
  CHECK(slave != NULL);

  Task *task = new Task(t.taskId, framework->id, res, TASK_STARTING,
                        t.name, "", slave->id);

  framework->addTask(task);
  slave->addTask(task);

  allocator->taskAdded(task);

  LOG(INFO) << "Launching " << task << " on " << slave;
  send(slave->pid, pack<M2S_RUN_TASK>(
        framework->id, t.taskId, framework->name, framework->user,
        framework->executorInfo, t.name, t.arg, t.params, framework->pid));//讲作业提交给Slave去执行
}

Slave接到M2S_RUN_TASK消息之后,
如果这是第一次收到该消息的话,SlaveProcess会检查当前节点是否启动了Executor,如果没有启动则会执行下图1所示的流程:
Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度

图1 该Slave上注册Framework和TaskTracker的启动过程

如果Slave上已经注册了Framework,则相当于ExecutorProcess在MesosExecutorDriver的干预下已经生成。可以用来与SlaveProcess进行通信。那么由ReplyToOffer传递给MasterProcess的Task就要被Launch到对应框架上。注意这里的Launch只是象征意义的,只是在FrameworkScheduler上记录选中Task的信息,选择的过程是使用findTask的方法找到可以提供SlaveOffer的Slave节点的Task。这个过程和JobTracker的assignTask的过程互斥,这里对于JobTracker实例对象进行了synchronized的说明,保证了Hadoop框架内调用assignTask在findTask之后进行,并且分配的task是之前对应Slave节点选中的task。Task的Launch过程如图2所示。
Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度
图2 launch task的过程

在launch task之后,然后就要使得Mesos框架内task状态的更新与Hadoop task的更新的步调一致。在TaskTracker增加了MesosTaskTrackerInstrumentation来添加task状态改变所进行的处理。当状态改变之后,MesosTaskTrackerInstrumentation就会将task状态的改变通过FrameworkExecutor作为与Mesos框架沟通的桥梁,提交给Mesos框架。具体流程请看图3 task status update
Mesos的分析4 <wbr>--- <wbr>支持Hadoop任务级调度
图3 task status update


经过上面流程的分析,Hadoop计算平台已经完全和Mesos兼容在一起。这种做法的好处有以下几点:
1)Hadoop集群的资源变得有弹性,可以根据作业的需求动态的启动和关闭TaskTracker,这可以大大提高集群的使用效率。
2)资源管理与调度策略的分离。Mesos可以同时支持不同的计算框架,通过Mesos框架的MesosSchedulerDriver和MesosExecutorDriver将框架内的自身的调度优势和执行方式与Mesos自身资源的管理隔离。这样就只需管理资源,而作业的执行的管理则交给计算框架去管理。
3)对于Hadoop源码没有伤害。Mesos在支持Hadoop的过程中,支持提供了一个Hadoop-contrib-mesos来进行,没有修改任何关于hadoop MapReduce、Common、HDFS三个基本组件的代码,这样的策略更容易让Hadoop使用者接受。
4)Hadoop提交作业方式没有变化。原有的MapReduce程序仍然可以执行。

Mesos的不足之处,
1)框架过于复杂,要想支持其它计算框架,不仅需要对于Mesos源码十分熟悉,而且还要对另外的计算框架非常精通。这明显增加了很大的人力成本。
2)Mesos对于资源的管理还不够成熟,Slave节点资源信息只有Mem大小和CPU的个数,管理粒度过于粗糙。
3)Mesos缺少资源隔离技术。
 

请网友珍惜Klose的工作成果,如果需要转载请注明出处。谢谢
Klose
原文地址:https://www.cnblogs.com/zhangzhang/p/2850601.html