MapReduce job在JobTracker初始化源码级分析

  mapreduce job提交流程源码级分析(三)中已经说明用户最终调用JobTracker.submitJob方法来向JobTracker提交作业。而这个方法的核心提交方法是JobTracker.addJob(JobID jobId, JobInProgress job)方法,这个addJob方法会把Job提交到调度器(默认是JobQueueTaskScheduler)的监听器JobQueueJobInProgressListener和EagerTaskInitializationListener(本文只讨论默认调度器)中,使用方法jobAdded(JobInProgress job),JobQueueJobInProgressListener任务是监控各个JobInProcess生命周期中的变化;EagerTaskInitializationListener是发现有新Job后对其初始化的。

  一、JobQueueJobInProgressListener.jobAdded(JobInProgress job)方法。就一句代码jobQueue.put(new JobSchedulingInfo(job.getStatus()), job),先构建一个JobSchedulingInfo对象,然后和JobInProgress对应起来放入jobQueue中。JobSchedulingInfo类维护这调度这个job必备的一些信息,比如优先级(默认是NORMAL)、JobID以及开始时间startTime。 

  二、EagerTaskInitializationListener.jobAdded(JobInProgress job)方法。  

 1 /**
 2    * We add the JIP to the jobInitQueue, which is processed 
 3    * asynchronously to handle split-computation and build up
 4    * the right TaskTracker/Block mapping.
 5    */
 6   @Override
 7   public void jobAdded(JobInProgress job) {
 8     synchronized (jobInitQueue) {
 9       jobInitQueue.add(job);  //添加进List<JobInProgress> jobInitQueue
10       resortInitQueue();
11       jobInitQueue.notifyAll();  //唤醒阻塞的进程
12     }
13 
14   }

  上面方法中resortInitQueue()方法主要是对jobInitQueue中JobInProcess进行排序,先按照优先级排序,相同的再按开始时间。EagerTaskInitializationListener.start()在调度器初始化时JobQueueTaskScheduler.start()就调用了,所以先于jobAdded方法调用。EagerTaskInitializationListener.start()代码如下:

1 public void start() throws IOException {
2     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
3     jobInitManagerThread.setDaemon(true);
4     this.jobInitManagerThread.start();
5   }

  start()方法会启动一个线程:JobInitManager。

 1 /////////////////////////////////////////////////////////////////
 2   //  Used to init new jobs that have just been created
 3   /////////////////////////////////////////////////////////////////
 4   class JobInitManager implements Runnable {
 5    
 6     public void run() {
 7       JobInProgress job = null;
 8       while (true) {
 9         try {
10           synchronized (jobInitQueue) {
11             while (jobInitQueue.isEmpty()) {
12               jobInitQueue.wait();
13             }
14             job = jobInitQueue.remove(0);
15           }
16           threadPool.execute(new InitJob(job));
17         } catch (InterruptedException t) {
18           LOG.info("JobInitManagerThread interrupted.");
19           break;
20         } 
21       }
22       LOG.info("Shutting down thread pool");
23       threadPool.shutdownNow();
24     }
25   }
26   
27   class InitJob implements Runnable {
28   
29     private JobInProgress job;
30     
31     public InitJob(JobInProgress job) {
32       this.job = job;
33     }
34     
35     public void run() {
36       ttm.initJob(job);//对应JobTracker的对应方法
37     }
38   }

  JobInitManager线程的run方法是一个死循环始终监控jobInitQueue是否为空,不为空的话就取出0位置的JobInProgress,在InitJob线程中初始化:TaskTrackerManager.initJob(job)对应JobTracker的initJob方法。这里为什么会另起线程来初始化Job呢?原因很简单,就是可能jobInitQueue中同时会有很多JobInProgress,一个一个的初始化会比较慢,所以采用多线程的方式初始化。来看initJob方法的代码:

 1   public void initJob(JobInProgress job) {
 2     if (null == job) {
 3       LOG.info("Init on null job is not valid");
 4       return;
 5     }
 6             
 7     try {
 8       JobStatus prevStatus = (JobStatus)job.getStatus().clone();
 9       LOG.info("Initializing " + job.getJobID());
10       job.initTasks();    //调用该实例的initTasks方 法,对job进行初始化
11       // Inform the listeners if the job state has changed
12       // Note : that the job will be in PREP state.
13       JobStatus newStatus = (JobStatus)job.getStatus().clone();
14       if (prevStatus.getRunState() != newStatus.getRunState()) {
15         JobStatusChangeEvent event = 
16           new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
17               newStatus);
18         synchronized (JobTracker.this) {
19           updateJobInProgressListeners(event);
20         }
21       }
22     } catch (KillInterruptedException kie) {
23       //   If job was killed during initialization, job state will be KILLED
24       LOG.error("Job initialization interrupted:
" +
25           StringUtils.stringifyException(kie));
26       killJob(job);
27     } catch (Throwable t) {
28       String failureInfo = 
29         "Job initialization failed:
" + StringUtils.stringifyException(t);
30       // If the job initialization is failed, job state will be FAILED
31       LOG.error(failureInfo);
32       job.getStatus().setFailureInfo(failureInfo);
33       failJob(job);
34     }
35      }

  首先是获取初始化前的状态prevStatus;然后是job.initTasks()初始化;在获取初始化的后的状态newStatus;

  job.initTasks()方法代码比较多,主要的工作是检查之后获取输入数据的分片信息TaskSplitMetaInfo[] splits = createSplits(jobId)这是去读的上传到HDFS中的文件job.splitmetainfo和job.split,要确保numMapTasks == splits.length,然后构建numMapTasks个TaskInProgress作为MapTask,

原文地址:https://www.cnblogs.com/lxf20061900/p/3737263.html