JobTracker作业调度分析

转自:http://blog.csdn.net/Androidlushangderen/article/details/41408517

JobTracker的作业调度给我感觉就是比较宏观意义上的操作。倘若你只了解了MapReduce的工作原理是远远不够的,这时去学习一下他在宏观层面的原理实现也是对我们非常有帮助的。首先我们又得从上次分析的任务提交之后的操作说起,Job作业通过RPC通信提交到JobTracker端之后,接下来会触发到下面的方法;

  1. /** 
  2.    * 初始化作业操作 
  3.    */  
  4.   public void initJob(JobInProgress job) {  
  5.     if (null == job) {  
  6.       LOG.info("Init on null job is not valid");  
  7.       return;  
  8.     }  
  9.               
  10.     try {  
  11.       JobStatus prevStatus = (JobStatus)job.getStatus().clone();  
  12.       LOG.info("Initializing " + job.getJobID());  
  13.       //初始化Task任务  
  14.       job.initTasks();  
  15.       ......  

接着会执行initTasks的方法,但不是JobTracker,而是JobInProgress类中的方法:

  1. /** 
  2.   * Construct the splits, etc.  This is invoked from an async 
  3.   * thread so that split-computation doesn't block anyone. 
  4.   */  
  5.  public synchronized void initTasks()   
  6.  throws IOException, KillInterruptedException, UnknownHostException {  
  7.    if (tasksInited || isComplete()) {  
  8.      return;  
  9.    }  
  10.    ......  
  11.      
  12.    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);  
  13.    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);  
  14.    this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);  
  15.    this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);  
  16.   
  17.    //根据numMapTasks任务数,创建MapTask的总数  
  18.    maps = new TaskInProgress[numMapTasks];  
  19.    for(int i=0; i < numMapTasks; ++i) {  
  20.      inputLength += splits[i].getInputDataLength();  
  21.      maps[i] = new TaskInProgress(jobId, jobFile,   
  22.                                   splits[i],   
  23.                                   jobtracker, conf, this, i, numSlotsPerMap);  
  24.    }  
  25.    ......  
  26.   
  27.    //  
  28.    // Create reduce tasks  
  29.    //根据numReduceTasks,创建Reduce的Task数量  
  30.    this.reduces = new TaskInProgress[numReduceTasks];  
  31.    for (int i = 0; i < numReduceTasks; i++) {  
  32.      reduces[i] = new TaskInProgress(jobId, jobFile,   
  33.                                      numMapTasks, i,   
  34.                                      jobtracker, conf, this, numSlotsPerReduce);  
  35.      nonRunningReduces.add(reduces[i]);  
  36.    }  
  37.   
  38.    ......  
  39.      
  40.    // create cleanup two cleanup tips, one map and one reduce.  
  41.    //创建2个clean up Task任务,1个是Map Clean-Up Task,一个是Reduce Clean-Up Task   
  42.    cleanup = new TaskInProgress[2];  
  43.   
  44.    // cleanup map tip. This map doesn't use any splits. Just assign an empty  
  45.    // split.  
  46.    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;  
  47.    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,   
  48.            jobtracker, conf, this, numMapTasks, 1);  
  49.    cleanup[0].setJobCleanupTask();  
  50.   
  51.    // cleanup reduce tip.  
  52.    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,  
  53.                       numReduceTasks, jobtracker, conf, this, 1);  
  54.    cleanup[1].setJobCleanupTask();  
  55.   
  56.    // create two setup tips, one map and one reduce.  
  57.    //原理同上  
  58.    setup = new TaskInProgress[2];  
  59.   
  60.    // setup map tip. This map doesn't use any split. Just assign an empty  
  61.    // split.  
  62.    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,   
  63.            jobtracker, conf, this, numMapTasks + 1, 1);  
  64.    setup[0].setJobSetupTask();  
  65.   
  66.    // setup reduce tip.  
  67.    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,  
  68.                       numReduceTasks + 1, jobtracker, conf, this, 1);  
  69.    setup[1].setJobSetupTask();  
  70.      
  71.    ......  

可以看见,在这里JobInProgress首次被划分为了很多的小的Task任务的形式存在,而这些小的任务是以TaskInProgress的类表示。在这里MapReduce把1个作业做出了如下的分解,numMapTasks个Map Task ,numReduceTasks个Reduce Task,2个CleanUp任务,2个SetUp任务,(Map Reduce,每个各占1个),好,可以大致勾画一下,1个JobInProgress的执行流程了。

     ok,initTask的任务已经完成,也就是说前面初始化的准备工作都已经完成了,后面就等着JobTacker分配作业给TaskTracker了。在这里MapReduce用的是HeartBeat的形式,就是心跳机制,心跳包在这里主要有3个作用:

1.判断TaskTracker是否活着

2.获取各个TaskTracker上的资源使用情况和任务的进度

3.给TaskTracker分配任务

而这里用到的就是第三作用。HeartBeat的调用形式同样是Hadoop自带的RPC实现方式。JobTracker不会直接分配作业给TaskTracker,中间会经过一个叫TaskScheduler掉调度器,这个可以用户自定义实现,满足不同的需求设计,在Hadoop中有默认的实现,所以你会看到大致这样的一个模型流程:

        所以接下来JobTracker首先会收到很多来自TaskTracker的心跳包,判断此TaskTracker是否是无任务状态的,无任务的话,马上让TaskSchedulera分配任务给他:

  1. public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,   
  2.                                                   boolean restarted,  
  3.                                                   boolean initialContact,  
  4.                                                   boolean acceptNewTasks,   
  5.                                                   short responseId)   
  6.     throws IOException {  
  7.    ....  
  8.         
  9.     //通过心跳机制发送命令回应  
  10.     // Initialize the response to be sent for the heartbeat  
  11.     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);  
  12.     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();  
  13.     boolean isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());  
  14.     // Check for new tasks to be executed on the tasktracker  
  15.     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {  
  16.       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName);  
  17.       if (taskTrackerStatus == null) {  
  18.         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);  
  19.       } else {  
  20.         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);  
  21.         //说明此TaskTtracker上无任务了  
  22.         if (tasks == null ) {  
  23.           //为此TaskTracker分配任务  
  24.           tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));  
  25.         }  

接下来就是TaskScheduler的方法了,不过得找出他的实现类,TaskScheduler只是一个基类:

  1. public synchronized List<Task> assignTasks(TaskTracker taskTracker)  
  2.     throws IOException {  
  3.   TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();   
  4.   ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();  
  5.   final int numTaskTrackers = clusterStatus.getTaskTrackers();  
  6.   final int clusterMapCapacity = clusterStatus.getMaxMapTasks();  
  7.   final int clusterReduceCapacity = clusterStatus.getMaxReduceTasks();  
  8.   
  9.   //获取作业队列  
  10.   Collection<JobInProgress> jobQueue =  
  11.     jobQueueJobInProgressListener.getJobQueue();  
  12.    .....  
  13.       for (JobInProgress job : jobQueue) {  
  14.         if (job.getStatus().getRunState() != JobStatus.RUNNING ||  
  15.             job.numReduceTasks == 0) {  
  16.           continue;  
  17.         }  
  18.   
  19.           
  20.         //在这里分配了一个新的Reduce任务  
  21.         Task t =   
  22.           job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,   
  23.                                   taskTrackerManager.getNumberOfUniqueHosts()  
  24.                                   );  
  25.         .....  

首先获取一个作业列表,在里面挑出一个作业给,在比如从里面挑出1个Reduce的任务区给整个TaskTracker执行,因为我们刚刚已经知道,所有的Task都是以TaskInProgress形式被包含于JobInProgress中的,所以又来到了JobInProgress中了

  1. /** 
  2.    * Return a ReduceTask, if appropriate, to run on the given tasktracker. 
  3.    * We don't have cache-sensitivity for reduce tasks, as they 
  4.    *  work on temporary MapRed files.   
  5.    */  
  6.   public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,  
  7.                                                int clusterSize,  
  8.                                                int numUniqueHosts  
  9.                                               ) throws IOException {  
  10.     .....  
  11.   
  12.     int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts,   
  13.                                     status.reduceProgress());  
  14.     if (target == -1) {  
  15.       return null;  
  16.     }  
  17.       
  18.     //这里继续调用方法,获取目标任务  
  19.     Task result = reduces[target].getTaskToRun(tts.getTrackerName());  
  20.     if (result != null) {  
  21.       addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);  
  22.     }  
  23.   
  24.     return result;  
  25.   }  

此时就执行了一个TIP就是TaskInProgress里面去执行了,此时的转变就是JIP->TIP的转变。继续往里看,这时候来到的是TaskInProgress的类里面了:

  1. public Task getTaskToRun(String taskTracker) throws IOException {  
  2.     if (0 == execStartTime){  
  3.       // assume task starts running now  
  4.       execStartTime = jobtracker.getClock().getTime();  
  5.     }  
  6.   
  7.     // Create the 'taskid'; do not count the 'killed' tasks against the job!  
  8.     TaskAttemptID taskid = null;  
  9.     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {  
  10.       // Make sure that the attempts are unqiue across restarts  
  11.       int attemptId = job.getNumRestarts() * NUM_ATTEMPTS_PER_RESTART + nextTaskId;  
  12.       //启动一次TA尝试  
  13.       taskid = new TaskAttemptID( id, attemptId);  
  14.       ++nextTaskId;  
  15.     } else {  
  16.       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +  
  17.               " (plus " + numKilledTasks + " killed)"  +   
  18.               " attempts for the tip '" + getTIPId() + "'");  
  19.       return null;  
  20.     }  
  21.   
  22.     //加入到相应的数据结构中  
  23.     return addRunningTask(taskid, taskTracker);  
  24.   }  

在这里明显的执行了所谓的TA尝试,就是说这是一次Task的尝试执行,因为不能保证这次任务就一定能执行成功。把这次尝试的任务ID加入系统变量中,就来到了addRunningTask,也就是说来到了方法执行的最末尾:

  1. /** 
  2.    * Adds a previously running task to this tip. This is used in case of  
  3.    * jobtracker restarts. 
  4.    * 添加任务 
  5.    */  
  6.   public Task addRunningTask(TaskAttemptID taskid,   
  7.                              String taskTracker,  
  8.                              boolean taskCleanup) {  
  9.     .....  
  10.     //添加任务和taskTracker的映射关系  
  11.     activeTasks.put(taskid, taskTracker);  
  12.     tasks.add(taskid);  
  13.   
  14.     // Ask JobTracker to note that the task exists  
  15.     //在JobTracker中增加一对任务记录  
  16.     jobtracker.createTaskEntry(taskid, taskTracker, this);  
  17.   
  18.     // check and set the first attempt  
  19.     if (firstTaskId == null) {  
  20.       firstTaskId = taskid;  
  21.     }  
  22.     return t;  
  23.   }  

在这里,就增加了任务和TaskTracker的一些任务运行信息的变量关系。后面就等着TaskTracker自己去把任务挑出来,执行就OK了,上面这个步骤从TIP->TA的转变。我们把这种结构流程叫做“三层多叉树”的方式结构。

整个作业的调度的时序关系图如下:

原文地址:https://www.cnblogs.com/cxzdy/p/5044001.html