Yarn源码分析1(Hadoop2.7.2)

在Hadoop中,调度框架YARN(Yet Another Resource Negotiater)是基于事件的,调度的是MapReduce的Application。Application有一系列的状态变化(NEW/NEW_SAVING/SUBMITTED/ACCEPTED/RUNNING/FINISHED/FAILED/KILLED ),即基于状态机的变换(设计模式State Pattern),状态之间的变换通过事件触发。

①对AsyncDispatcher的介绍

核心类AsyncDispatcher:异步事件分发器(Yarn中最底层的总管道)

AsyncDispatcher extends AbstractService implements Dispatcher {

  主要的属性

  (1)事件队列:    BlockingQueue<Event> eventQueue;

  (2)事件分发器:   Map<Class<? extends Enum>, EventHandler> eventDispatchers

  (3)处理事件的线程  Thread eventHandlingThread

  主要的方法

    1、从eventQueue中取出事件以及处理事件

  (1)createThread():返回一个Runnable对象,该线程类对象有一个while循环,不断eventQueue中取出事件(RM启动之后),event = eventQueue.take();然后将事件分发出去dispatch(event)。

  (2)dispatch(event):首先得到事件的类型,然后从eventDispatchers中根据事件类型得到相应的事件处理器EventHandler,然后EventHandler.handle(event)对事件进行处理。

    2、向eventQueue中添加事件

    AsyncDispatcher 的内部类GenericEventHandler implements EventHanler的handle(event)方法向eventQueue中添加事件eventQueue.put(event);

}

②Yarn对事件的二次分发

事件分发,分两次完成。第一次是eventHandlingThread轮询出事件之后,由AsyncDispatcher的dispatch方法进行分发,第二次分发会调用相应的分发器,比如 ApplicationEventDispatcher,ApplicationEventDispatcher自己没有处理这个事件,而是将事件交给了RMApp,RMApp的实现类RMAppImpl.handle(event)最终处理了事件。

RMAppImpl的handler(event)方法,这个方法是不断重复执行的,:

  this.writeLock.lock();

  /* keep the master in sync with the state machine 进行状态机的转换*/

  this.stateMachine.doTransition(event.getType(), event);

  this.writeLock.unlock();

RMAppImpl的handler(event)不断变换状态机的状态,即handler被调用多次,从NEW状态开始不断变换。RM应用的状态如下所示。

public enum RMAppState {

  NEW,

  NEW_SAVING,

  SUBMITTED,

  ACCEPTED,

  RUNNING,

  FINAL_SAVING,

  FINISHING,

  FINISHED,

  FAILED,

  KILLING,

  KILLED

}

③ResourceManager类中的Dispatcher

存在不同的事件,每种事件具有不同的类型,同一类型的事件交给一个XXXEventDispatcher(ResourceManager中定义了许多Dispatcher内部类),XXXEventDispatcher将事件交给真正的事件处理实体进行处理。

原文地址:https://www.cnblogs.com/sodawoods-blogs/p/8717963.html