JobTracker对提交作业的初始化

  1. 通过EagerTaskInitializationListener类中的jobAdded(JobInProgress job)方法,将所提交的作业加入到待初始化的作业队列jobInitQueue中,代码如下:
  2. public void jobAdded(JobInProgress job) {
        // Adds the given job to jobInitQueue and wakes every thread that is
        // waiting on that queue's monitor.
        //
        // @param job the job to append to the initialization queue
        synchronized (jobInitQueue) {
          jobInitQueue.add(job);
          // Original author's note: re-sorts the queue in ascending time order.
          // resortInitQueue() is defined outside this excerpt -- TODO confirm the sort key.
          resortInitQueue();
          // Wake all threads blocked in jobInitQueue.wait().
          jobInitQueue.notifyAll();
        }
      }
  3. 初始化队列jobInitQueue中存放的是等待初始化的作业。EagerTaskInitializationListener的start()方法会启动一个守护线程来运行JobInitManager,由它从队列中取出作业,再交给线程池中的线程进行初始化。start()方法代码如下:
  4. public void start() throws IOException {
        // start() of EagerTaskInitializationListener: creates the thread named
        // "jobInitManager" that runs jobInitManager, stores it in
        // jobInitManagerThread, marks it as a daemon thread and starts it.
        Thread managerThread = new Thread(jobInitManager, "jobInitManager");
        this.jobInitManagerThread = managerThread;
        // Daemon thread: it does not by itself keep the JVM alive.
        managerThread.setDaemon(true);
        managerThread.start();
      }
  5. JobInitManager类的具体代码如下:
  6. class JobInitManager implements Runnable {
        public void run() {
          JobInProgress job = null;
          while (true) {
            try {
              synchronized (jobInitQueue) {
                while (jobInitQueue.isEmpty()) {
                  jobInitQueue.wait();
                }
                job = jobInitQueue.remove(0);//每次取出第一个
              }
              threadPool.execute(new InitJob(job));
            } catch (InterruptedException t) {
              LOG.info("JobInitManagerThread interrupted.");
              break;
            }
          }
          LOG.info("Shutting down thread pool");
          threadPool.shutdownNow();
        }
      }
  7. InitJob类的代码如下:
  8. class InitJob implements Runnable {
        private JobInProgress job;
        public InitJob(JobInProgress job) {
          this.job = job;
        }
        public void run() {
          ttm.initJob(job);//tasktrackermanager接口定义的initjob()方法,在jobtracker中有实现
        }
      }
  9. JobTracker中的initJob()方法如下:
  10. public void initJob(JobInProgress job) {
        if (null == job) {
          LOG.info("Init on null job is not valid");
          return;
        }
        try {
          JobStatus prevStatus = (JobStatus)job.getStatus().clone();
          LOG.info("Initializing " + job.getJobID());
          job.initTasks();//调用JobInProgress中的initTasks()方法初始化job中的所有task
          // Inform the listeners if the job state has changed
          // Note : that the job will be in PREP state.
          JobStatus newStatus = (JobStatus)job.getStatus().clone();
          if (prevStatus.getRunState() != newStatus.getRunState()) {
            JobStatusChangeEvent event =
              new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
                  newStatus);
            synchronized (JobTracker.this) {
              updateJobInProgressListeners(event);
            }
          }
        } catch (KillInterruptedException kie) {
          //   If job was killed during initialization, job state will be KILLED
          LOG.error("Job initialization interrupted: " +
              StringUtils.stringifyException(kie));
          killJob(job);
        } catch (Throwable t) {
          String failureInfo =
            "Job initialization failed: " + StringUtils.stringifyException(t);
          // If the job initialization is failed, job state will be FAILED
          LOG.error(failureInfo);
          job.getStatus().setFailureInfo(failureInfo);
          failJob(job);
        }
         }
  11. JobInProgress中的initTasks()方法中主要代码如下:
  12.  public synchronized void initTasks()
      // NOTE(review): this is an excerpt of the method. The opening brace and
      // whatever precedes this point (including where `splits` is obtained) are
      // not shown here.
      //
      // The visible part creates the job's TaskInProgress (TIP) objects: one map
      // TIP per split, numReduceTasks reduce TIPs, two cleanup TIPs and two setup
      // TIPs, and registers the waiting map/reduce counts with the metrics objects.

      // One map task per input split.
      numMapTasks = splits.length;

        // Sanity check the locations so we don't create/initialize unnecessary tasks
        for (TaskSplitMetaInfo split : splits) {
          NetUtils.verifyHostnames(split.getLocations());
        }
        
        // Report the number of waiting maps and reduces for this job to both the
        // jobtracker instrumentation and the queue metrics.
        jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
        jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
        this.queueMetrics.addWaitingMaps(getJobID(), numMapTasks);
        this.queueMetrics.addWaitingReduces(getJobID(), numReduceTasks);

        // Create the map TIPs (index i for split i) and accumulate the total
        // input length over all splits.
        maps = new TaskInProgress[numMapTasks];
        for(int i=0; i < numMapTasks; ++i) {
          inputLength += splits[i].getInputDataLength();
          maps[i] = new TaskInProgress(jobId, jobFile,
                                       splits[i],
                                       jobtracker, conf, this, i, numSlotsPerMap);
        }
        LOG.info("Input size for job " + jobId + " = " + inputLength
            + ". Number of splits = " + splits.length);

        // Set localityWaitFactor before creating cache
        localityWaitFactor =
          conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
        // The cache is only built when the job has at least one map task.
        if (numMapTasks > 0) {
          nonRunningMapCache = createCache(splits, maxLevel);
        }

        // set the launch time
        this.launchTime = jobtracker.getClock().getTime();

        //
        // Create reduce tasks
        //
        this.reduces = new TaskInProgress[numReduceTasks];
        for (int i = 0; i < numReduceTasks; i++) {
          reduces[i] = new TaskInProgress(jobId, jobFile,
                                          numMapTasks, i,
                                          jobtracker, conf, this, numSlotsPerReduce);
          // Every newly created reduce TIP starts out in the non-running set.
          nonRunningReduces.add(reduces[i]);
        }

        // Calculate the minimum number of maps to be complete before
        // we should start scheduling reduces:
        // ceil(configured fraction * numMapTasks), where the fraction is read from
        // "mapred.reduce.slowstart.completed.maps" with a built-in default.
        completedMapsForReduceSlowstart =
          (int)Math.ceil(
              (conf.getFloat("mapred.reduce.slowstart.completed.maps",
                             DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) *
               numMapTasks));
        
        // ... use the same for estimating the total output of all maps
        resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
        
        // create two cleanup tips, one map and one reduce.
        cleanup = new TaskInProgress[2];

        // cleanup map tip. This map doesn't use any splits. Just assign an empty
        // split.
        // It is given index numMapTasks, i.e. one past the regular map indices
        // 0..numMapTasks-1 used above. The trailing 1 sits in the position where
        // regular maps pass numSlotsPerMap.
        TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
        cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                jobtracker, conf, this, numMapTasks, 1);
        cleanup[0].setJobCleanupTask();

        // cleanup reduce tip.
        // It is given index numReduceTasks, one past the regular reduce indices.
        // The trailing 1 sits in the position where regular reduces pass
        // numSlotsPerReduce.
        cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                           numReduceTasks, jobtracker, conf, this, 1);
        cleanup[1].setJobCleanupTask();

        // create two setup tips, one map and one reduce.
        setup = new TaskInProgress[2];

        // setup map tip. This map doesn't use any split. Just assign an empty
        // split.
        // It is given index numMapTasks + 1, right after the cleanup map tip.
        setup[0] = new TaskInProgress(jobId, jobFile, emptySplit,
                jobtracker, conf, this, numMapTasks + 1, 1);
        setup[0].setJobSetupTask();

        // setup reduce tip.
        // It is given index numReduceTasks + 1, right after the cleanup reduce tip.
        setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                           numReduceTasks + 1, jobtracker, conf, this, 1);
        setup[1].setJobSetupTask();
      }
原文地址:https://www.cnblogs.com/cxtblogs/p/5038516.html