WordCount作业提交到FileInputFormat类中split切分算法和host选择算法过程源码分析

参考 FileInputFormat类中split切分算法和host选择算法介绍  以及 Hadoop2.6.0的FileInputFormat的任务切分原理分析(即如何控制FileInputFormat的map任务数量)  以及 Hadoop中FileInputFormat计算InputSplit的getSplits方法的流程  以及 hadoop作业分片处理以及任务本地性分析(源码分析第一篇)    

分析前先介绍一下:

( 这里要注意下, Block 的 hosts 和 Split 的 hosts 不一样, Split 的 hosts 是通过 Split 的 hosts 按一定方法生成的, 如果一个 Block 对应一个 Split (一般情况下是这样的), 这时它们两个 hosts 是一样的. 如果不是一对一( Split > block), 则 Split 需要按一定方法选择 hosts . 

Split 和 MapTask 是一一对应的, 一个 Split 对应一个 MapTask. 所以本地性是跟 Split 的 hosts 相关的.

BlocksMap存储 Block 与 BlockInfo 的映射关系, Block 中主要包含3项: long blockId;  // 数据块的唯一标识,即数据块的ID号. long numBytes;  // 数据块包含的文件数据大小. long generationStamp;  // 数据块的版本号,或数据块的时间戳.   BlockInfo( 在 Hadoop-2.7.3 中是 BlockInfoContiguous) 包含所以副本所在主机名.  )

开始分析: ( 这里是 hadoop-2.7.3-src )

以WordCount开始: org.apache.hadoop.examples.WordCount.main() 内部调用 org.apache.hadoop.mapreduce.Job.waitForCompletion(boolean)

// 该段代码在 org.apache.hadoop.examples.WordCount

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();  //指定作业执行规范 , Configuration:map/reduce的j配置类,向hadoop框架描述map-reduce执行的工作
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");  //指定job名称,及运行对象 
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);  //为job设置Mapper类 
    job.setCombinerClass(IntSumReducer.class);  //为job设置Combiner类 
    job.setReducerClass(IntSumReducer.class);  //为job设置Reducer类
    job.setOutputKeyClass(Text.class);  //为job的输出数据设置Key类
    job.setOutputValueClass(IntWritable.class);  //为job输出设置value类 
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  //为job设置输入路径, org.apache.hadoop.mapreduce.lib.input.FileInputFormat
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));  //为job设置输出路径 
    System.exit(job.waitForCompletion(true) ? 0 : 1);  //运行job, 调用 Job.waitForCompletion()
  }
WordCount

在 Job.waitForCompletion() 函数内部会调用 Job 本类的方法 submit(), 在 submit() 内部接着调用 org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(Job , Cluster)

// 该段代码在 org.apache.hadoop.mapreduce.Job 中

// ......

/**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();    // 调用本类的 submit()
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

// ......

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);  // 
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }
Job

 在 JobSubmitter.submitJobInternal(Job , Cluster) 函数内部调用本类的 writeSplits(Job ,Path )  为job创建分片; 接着 writeSplits(Job ,Path ) 方法内部会调用本类的 (1) writeNewSplits(JobContext , Path ) { Hadoop2.0 会调用这个,新版的API}和 (2) writeOldSplits(JobConf , Path ) { 这个是旧版的 API }; 在 JobSubmitter.writeNewSplits(JobContext , Path ) 方法内部会调用抽象类 org.apache.hadoop.mapreduce.InputFormat.getSplites(JobContext ), 计算job的输入文件的逻辑分片集合; 而在 JobSubmitter.writeOldSplits(JobContext , Path ) 方法内部会调用抽象类 org.apache.hadoop.mapred.InputFormat.getSplites(JobContext , int ), 计算job的输入文件的逻辑分片集合.

// 该段代码在 org.apache.hadoop.mapreduce.JobSubmitter 中

// ......

/**
   * Internal method for submitting jobs to the system.
   * 
   * <p>The job submission process involves:
   * <ol>
   *   <li>
   *   Checking the input and output specifications of the job.
   *   </li>
   *   <li>
   *   Computing the {@link InputSplit}s for the job.
   *   </li>
   *   <li>
   *   Setup the requisite accounting information for the 
   *   {@link DistributedCache} of the job, if necessary.
   *   </li>
   *   <li>
   *   Copying the job's jar and configuration to the map-reduce system
   *   directory on the distributed file-system. 
   *   </li>
   *   <li>
   *   Submitting the job to the <code>JobTracker</code> and optionally
   *   monitoring it's status.
   *   </li>
   * </ol></p>
   * @param job the configuration to submit
   * @param cluster the handle to the Cluster
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);  // 为job创建分片
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());  // 提交 job
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

// ......

  private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    if (jConf.getUseNewMapper()) {
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

// ......

  @SuppressWarnings("unchecked")
  private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }

// ......

  //method to write splits for old api mapper.
  private int writeOldSplits(JobConf job, Path jobSubmitDir) 
  throws IOException {
    org.apache.hadoop.mapred.InputSplit[] splits =
    job.getInputFormat().getSplits(job, job.getNumMapTasks());
    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(splits, new Comparator<org.apache.hadoop.mapred.InputSplit>() {
      public int compare(org.apache.hadoop.mapred.InputSplit a,
                         org.apache.hadoop.mapred.InputSplit b) {
        try {
          long left = a.getLength();
          long right = b.getLength();
          if (left == right) {
            return 0;
          } else if (left < right) {
            return 1;
          } else {
            return -1;
          }
        } catch (IOException ie) {
          throw new RuntimeException("Problem getting input split size", ie);
        }
      }
    });
    JobSplitWriter.createSplitFiles(jobSubmitDir, job, 
        jobSubmitDir.getFileSystem(job), splits);
    return splits.length;
  }
JobSubmitter

 (1) 在抽象类 org.apache.hadoop.mapreduce.InputFormat.getSplites(JobContext ) 方法,这里实际调用的是实现类 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplites(JobContext ), ( 这里 FileInputFormat 有两个相同的, 分别是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 和 org.apache.hadoop.mapred.FileInputFormat , 我们选择org.apache.hadoop.mapreduce.lib.input.FileInputFormat 有以下几点原因: 首先, org.apache.hadoop.mapred.FileInputFormat 类是抽象类 InputFormat 的实现类; 其次, WordCount中的FileInputFormat 就是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.)

    我们介绍一个概念, 即新旧 MapReduce API , 从0.20.0版本开始, Hadoop 同时提供了新旧两套 MapReduce API. 新 API 在旧 API 基础上进行了封装,使得其在扩展性和易用性方面更好. 旧版 API 放在 org.apache.hadoop.mapred 包中, 而新版 API 则放在 org.apache.hadoop.mapreduce 包及其子包中.

 // 该段代码是在 org.apache.hadoop.mapreduce.InputFormat 中

  /** 
   * Logically split the set of input files for the job.  
   * 
   * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
   * for processing.</p>
   *
   * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
   * input files are not physically split into chunks. For e.g. a split could
   * be <i>&lt;input-file-path, start, offset&gt;</i> tuple. The InputFormat
   * also creates the {@link RecordReader} to read the {@link InputSplit}.
   * 
   * @param context job configuration.
   * @return an array of {@link InputSplit}s for the job.
   */
  public abstract 
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;

在 org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplites(JobContext ) 方法中,

(2) 在抽象类 org.apache.hadoop.mapred.InputFormat.getSplites(JobContext ,int ) 方法,这里实际调用的是实现类 org.apache.hadoop.mapred.FileInputFormat.getSplites(JobContext ,int ) ,  这里  org.apache.hadoop.mapred.FileInputFormat 类是抽象类 InputFormat 的实现类. 具体参考 FileInputFormat类中split切分算法和host选择算法介绍  . 

原文地址:https://www.cnblogs.com/zhangchao0515/p/8288298.html