MapReduce执行过程源码分析(一)——Job任务的提交

为了能使源码的执行过程与Hadoop权威指南(2、3版)中章节Shuffle and Sort的分析相对应,Hadoop的版本为0.20.2。

一般情况下我们通过Job(org.apache.hadoop.mapreduce.Job)的方法waitForCompletion来开始一个Job的执行。

    /**
     * Submit the job to the cluster and wait for it to finish.
     * 
     * @param verbose
     *            print the progress to the user
     * @return true if the job succeeded
     * @throws IOException
     *             thrown if the communication with the <code>JobTracker</code>
     *             is lost
     */
    public boolean waitForCompletion(boolean verbose) throws IOException,
            InterruptedException, ClassNotFoundException {
        if (state == JobState.DEFINE) {
            submit();
        }

        if (verbose) {
            jobClient.monitorAndPrintJob(conf, info);
        } else {
            info.waitForCompletion();
        }

        return isSuccessful();
    }

通常设置方法参数verbose为true,这样可以在控制台中看到Job的执行过程信息。

其中Job的具体提交过程是由方法submit完成的,

    /**
     * Submit the job to the cluster and return immediately.
     * 
     * @throws IOException
     */
    public void submit() throws IOException, InterruptedException,
            ClassNotFoundException {
        ensureState(JobState.DEFINE);

        setUseNewAPI();

        info = jobClient.submitJobInternal(conf);

        state = JobState.RUNNING;
    }

而submit方法的执行又依赖于JobClient submitJobInternal来完成,方法submitJobInternal是Job任务提交过程中的重点,在方法中完成的Job任务的初始化准备工作。

    /**
     * Internal method for submitting jobs to the system.
     * 
     * @param job
     *            the configuration to submit
     * @return a proxy object for the running job
     * @throws FileNotFoundException
     * @throws ClassNotFoundException
     * @throws InterruptedException
     * @throws IOException
     */
    public RunningJob submitJobInternal(JobConf job)
            throws FileNotFoundException, ClassNotFoundException,
            InterruptedException, IOException {
        /*
         * configure the command line options correctly on the submitting dfs
         */
        JobID jobId = jobSubmitClient.getNewJobId();

        /*
         * 在submitJobDir目录下有三个文件:job.jar、job.split、job.xml
         * 
         * **********************************************************************
         */
        Path submitJobDir = new Path(getSystemDir(), jobId.toString());
        Path submitJarFile = new Path(submitJobDir, "job.jar");
        Path submitSplitFile = new Path(submitJobDir, "job.split");

        configureCommandLineOptions(job, submitJobDir, submitJarFile);

        Path submitJobFile = new Path(submitJobDir, "job.xml");

        /*
         * 获取reducer的数目
         * 
         * **********************************************************************
         */
        int reduces = job.getNumReduceTasks();

        JobContext context = new JobContext(job, jobId);

        /*
         * Check the output specification
         * 
         * 根据是否使用New API验证OutputFormat
         * 
         * 如输出格式设置(未设置默认为TextOutputFormat)、是否设置输出路径及输出路径是否已经存在
         * 
         * **********************************************************************
         */
        if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                    .newInstance(context.getOutputFormatClass(), job);

            output.checkOutputSpecs(context);
        } else {
            job.getOutputFormat().checkOutputSpecs(fs, job);
        }

        /*
         * Create the splits for the job
         * 
         * *******************************************************************
         */
        LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));

        /*
         * 根据输入切片的数目决定map任务的数目
         * 
         * 一个输入切片对应一个map
         * 
         * *******************************************************************
         */
        int maps;

        if (job.getUseNewMapper()) {
            maps = writeNewSplits(context, submitSplitFile);
        } else {
            maps = writeOldSplits(job, submitSplitFile);
        }

        job.set("mapred.job.split.file", submitSplitFile.toString());

        job.setNumMapTasks(maps);

        /*
         * Write job file to JobTracker's fs
         * 
         * **********************************************************************
         */
        FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
                new FsPermission(JOB_FILE_PERMISSION));

        try {
            job.writeXml(out);
        } finally {
            out.close();
        }

        /*
         * Now, actually submit the job (using the submit name)
         * 
         * **********************************************************************
         */
        JobStatus status = jobSubmitClient.submitJob(jobId);
        
        if (status != null) {
            return new NetworkedJob(status);
        } else {
            throw new IOException("Could not launch job");
        }
    }

下面对该方法内部的执行流程进行详细分析:

(1)生成Job ID

JobID jobId = jobSubmitClient.getNewJobId();

(2)目录相关及文件

在submitJobDir目录下有三个文件:job.jar、job.split、job.xml,其中

job.jar:Job相关类(资源)的一个Jar包;

job.split:Job的输入文件(可能有多个或可以是其它格式(如HBase HTable))会根据一定的条件进行切片,每一个切片中的“数据”会对应的Job的一个Map任务,即每一个Map仅处理某一个切片中的“数据”;

job.xml:用以保存Job相关的配置数据。

        /*
         * 在submitJobDir目录下有三个文件:job.jar、job.split、job.xml
         * 
         * **********************************************************************
         */
        Path submitJobDir = new Path(getSystemDir(), jobId.toString());
        Path submitJarFile = new Path(submitJobDir, "job.jar");
        Path submitSplitFile = new Path(submitJobDir, "job.split");

        /*
         * 根据命令行参数-libjars, -files, -archives对Job进行相应的配置
         */
        configureCommandLineOptions(job, submitJobDir, submitJarFile);

        Path submitJobFile = new Path(submitJobDir, "job.xml");

其中,configureCommandLineOptions主要是根据用户在命令行环境下提供的参数(-libjars、-files、-archives)进行DistributedCache的设置,并将相应的Jar拷贝至目录submitJobDir中。

注:DistributedCache的相关知识会在后续分析,在此先不进行讨论。

(3)Reducer数目

获取用户所设置的该Job中Reducer的数目。

int reduces = job.getNumReduceTasks();

(4)JobContext

JobContext context = new JobContext(job, jobId);

其实JobContext就是对JobConf与JobID的封装。

(5)Job输出相关校验

        /*
         * Check the output specification
         * 
         * 根据是否使用New API验证OutputFormat
         * 
         * 如输出格式设置(未设置默认为TextOutputFormat)、是否设置输出路径及输出路径是否已经存在
          * 
         * **********************************************************************
         */
        if (reduces == 0 ? job.getUseNewMapper() : job.getUseNewReducer()) {
            org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                    .newInstance(context.getOutputFormatClass(), job);

            output.checkOutputSpecs(context);
        } else {
            job.getOutputFormat().checkOutputSpecs(fs, job);
        }

校验时会根据是否使用新版本的API分为两种情况,默认情况下使用的都是新版本的API,所以此处不考虑旧版本API的情况,所以分析的代码变为

 

            org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                    .newInstance(context.getOutputFormatClass(), job);

            output.checkOutputSpecs(context);

首先,获取输出的具体格式;

context.getOutputFormatClass()

 

    /**
     * Get the {@link OutputFormat} class for the job.
     * 
     * @return the {@link OutputFormat} class for the job.
     */
    @SuppressWarnings("unchecked")
    public Class<? extends OutputFormat<?, ?>> getOutputFormatClass()
            throws ClassNotFoundException {
        return (Class<? extends OutputFormat<?, ?>>) conf.getClass(
                OUTPUT_FORMAT_CLASS_ATTR, TextOutputFormat.class);
    }

由上述代码可以看出,如果用户并没有明确指定输出格式类型,则默认使用TextOutputFormat。

注:文本是进行数据分析时经常使用的一种格式,因此本文主要使用TextInputFormat、TextOutputFormat进行讲解。

然后,通过反射将输出格式实例化;

org.apache.hadoop.mapreduce.OutputFormat<?, ?> output = ReflectionUtils
                    .newInstance(context.getOutputFormatClass(), job);

最后,通过输出格式的具体类型进行校验,包括两个部分:是否设置输出目录及输出目录是否已经存在。

TextOutputFormat的checkOutputSpecs继承自它的父类FileOutputFormat。

     public void checkOutputSpecs(JobContext job)
            throws FileAlreadyExistsException, IOException {
        // Ensure that the output directory is set and not already there
        Path outDir = getOutputPath(job);

        if (outDir == null) {
            throw new InvalidJobConfException("Output directory not set.");
        }

        if (outDir.getFileSystem(job.getConfiguration()).exists(outDir)) {
            throw new FileAlreadyExistsException("Output directory " + outDir
                    + " already exists");
        }
    }

(6)生成输入切片(Split),并设置Map的数目;

        /*
         * Create the splits for the job
         * 
         * *******************************************************************
         */
        LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));

        /*
         * 根据输入切片的数目决定map任务的数目
          * 
         * 一个输入切片对应一个map
         * 
         * *******************************************************************
         */
        int maps;

        if (job.getUseNewMapper()) {
            maps = writeNewSplits(context, submitSplitFile);
        } else {
            maps = writeOldSplits(job, submitSplitFile);
        }

        job.set("mapred.job.split.file", submitSplitFile.toString());

        job.setNumMapTasks(maps);

这里仅分析新版本API下的writeNewSplits,该方法需要两个参数:JobContext及切片文件的Path。

    @SuppressWarnings("unchecked")
    private <T extends org.apache.hadoop.mapreduce.InputSplit> int writeNewSplits(
            JobContext job, Path submitSplitFile) throws IOException,
            InterruptedException, ClassNotFoundException {
        JobConf conf = job.getJobConf();

        /*
         * 创建InputFormat实例
         * 
         * 不同的InputFormat实例获取Split的方式不同
         * 
         * ******************************************************************
         */
        org.apache.hadoop.mapreduce.InputFormat<?, ?> input = ReflectionUtils
                .newInstance(job.getInputFormatClass(), job.getJobConf());

        /*
         * 获取输入文件对应的切片记录
         * 
         * ******************************************************************
         */
        List<org.apache.hadoop.mapreduce.InputSplit> splits = input
                .getSplits(job);

        T[] array = (T[]) splits
                .toArray(new org.apache.hadoop.mapreduce.InputSplit[splits
                        .size()]);

        /*
         * sort the splits into order based on size, so that the biggest go
         * first
         * 
         * ******************************************************************
         */
        Arrays.sort(array, new NewSplitComparator());

        /*
         * 写出SplitFile
         * 
         * ******************************************************************
         */

        // 打开切片文件输出流,并写出头信息(头、版本号、切片数目)
        DataOutputStream out = writeSplitsFileHeader(conf, submitSplitFile,
                array.length);

        try {
            if (array.length != 0) {
                DataOutputBuffer buffer = new DataOutputBuffer();

                RawSplit rawSplit = new RawSplit();

                SerializationFactory factory = new SerializationFactory(conf);

                Serializer<T> serializer = factory
                        .getSerializer((Class<T>) array[0].getClass());

                serializer.open(buffer);

                for (T split : array) {
                    rawSplit.setClassName(split.getClass().getName());

                    buffer.reset();

                    // 序列化文件名、起始位置、切片长度、主机位置(多个)
                    serializer.serialize(split);

                    rawSplit.setDataLength(split.getLength());

                    rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());

                    rawSplit.setLocations(split.getLocations());

                    rawSplit.write(out);
                }

                serializer.close();
            }
        } finally {
            out.close();
        }

        return array.length;
    }

方法思路:根据指定的输入格式类型(InputFormat)对输入文件进行切片,并将切片信息保存至指定的切片文件中。

注:切片并不是对输入文件进行物理上的切割,而只是一种逻辑上的“分割”,即将输入文件某个片段的起始位置保存下来,后期Map任务运行时根据切片文件就可以将该片段作为输入进行处理。

首先,获取输入格式类型,

job.getInputFormatClass()

 

/**
     * Get the {@link InputFormat} class for the job.
     * 
     * @return the {@link InputFormat} class for the job.
     */
    @SuppressWarnings("unchecked")
    public Class<? extends InputFormat<?, ?>> getInputFormatClass()
            throws ClassNotFoundException {
        return (Class<? extends InputFormat<?, ?>>) conf.getClass(
                INPUT_FORMAT_CLASS_ATTR, TextInputFormat.class);
    }

与输出格式类型相同,如果用户没有特殊指定,默认的输入格式类型为TextInputFormat,然后将此输入格式类型实例化。

org.apache.hadoop.mapreduce.InputFormat<?, ?> input = ReflectionUtils
                .newInstance(job.getInputFormatClass(), job.getJobConf());

然后,根据具体的输入格式类型计算切片信息,

        /*
         * 获取输入文件对应的切片记录
          * 
         * ******************************************************************
         */
        List<org.apache.hadoop.mapreduce.InputSplit> splits = input
                .getSplits(job);

TextInputFormat的方法getSplits继承自它的父类FileInputFormat。

    /**
     * Generate the list of files and make them into FileSplits.
     */
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        /*
         * 计算Split的最小值与最大值
         * 
         * ********************************************************************
         */
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();

        /*
         * 逐个处理InputPaths中的文件
         * 
         * *******************************************************************
         */
        for (FileStatus file : listStatus(job)) {
            Path path = file.getPath();

            FileSystem fs = path.getFileSystem(job.getConfiguration());

            /*
             * 获取特定文件的长度
             * 
             * ******************************************************************
             */
            long length = file.getLen();

            /*
             * 获取特定文件对应的块Block信息
             * 
             * ***************************************************************
             */
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                    length);

            /*
             * 如果文件长度大于0且是可切片的
             * 
             * ***************************************************************
             */
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();

                /*
                 * 根据blockSize、minSize、maxSize计算切片大小
                 * 
                 * Math.max(minSize, Math.min(maxSize, blockSize)
                 * 
                 * ***********************************************************
                 */
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                long bytesRemaining = length;

                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    /*
                     * 返回的Block Index为此切片开始位置所在Block的Index
                     * 
                     * **********************************************************
                     */
                    int blkIndex = getBlockIndex(blkLocations, length
                            - bytesRemaining);

                    /*
                     * 一个Block对应一个FileSplit
                     * 
                     * *******************************************************
                     */
                    splits.add(new FileSplit(path, length - bytesRemaining,
                            splitSize, blkLocations[blkIndex].getHosts()));

                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    /*
                     * 剩余的文件数据形成一个切片,hosts为此文件最后一个Block的hosts
                     * 
                     * **********************************************************
                     */
                    splits.add(new FileSplit(path, length - bytesRemaining,
                            bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            } else if (length != 0) {
                /*
                 * 文件长度不为0但不可分割
                 * 
                 * 不能切片的文件,整体形成一个切片,hosts为此文件第一个Block的hosts
                 * 
                 * ***********************************************************
                 */
                splits.add(new FileSplit(path, 0, length, blkLocations[0]
                        .getHosts()));
            } else {
                // Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }

        LOG.debug("Total # of splits: " + splits.size());

        return splits;
    }

getSplits处理流程如下:

① 根据配置参数计算Split所允许的最小值与最大值,为后期确定Split的长度提供参考;

        /*
         * 计算Split的最小值与最大值
          * 
         * ********************************************************************
         */
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);

② 在内存中创建相应的数据结构,用以保存计算所得的切片信息;

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();

③ 循环处理InputPaths所添加的文件,对一个文件各自计算其对应的切片信息;

        /*
         * 逐个处理InputPaths中的文件
          * 
         * *******************************************************************
         */
        for (FileStatus file : listStatus(job)) {
            ......
        }

④ 计算某个文件的切片信息:

a. 获取该文件的长度及对应的Block信息;

            Path path = file.getPath();

            FileSystem fs = path.getFileSystem(job.getConfiguration());

            /*
             * 获取特定文件的长度
              * 
             * ******************************************************************
             */
            long length = file.getLen();

            /*
             * 获取特定文件对应的块Block信息
              * 
             * ***************************************************************
             */
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                    length);

b. 根据文件长度以及该文件是否可以切片,分为三种情况处理:

其中,文件是否支持Split,是由该文件类型对应的InputFormat决定的,FileInputFormat中的实现如下:

    /**
     * Is the given filename splitable? Usually, true, but if the file is stream
     * compressed, it will not be.
     * 
     * <code>FileInputFormat</code> implementations can override this and return
     * <code>false</code> to ensure that individual input files are never
     * split-up so that {@link Mapper}s process entire files.
     * 
     * @param context
     *            the job context
     * @param filename
     *            the file name to check
     * @return is this file splitable?
     */
    protected boolean isSplitable(JobContext context, Path filename) {
        return true;
    }

TextInputFormat中重写了该方法:

@Override
    protected boolean isSplitable(JobContext context, Path file) {
        CompressionCodec codec = new CompressionCodecFactory(
                context.getConfiguration()).getCodec(file);
        
        return codec == null;
    }

由上述代码可见,如果文本文件经过相应的压缩之后,是不支持进行Split的。

第一种情况:文件长度大于0,且文件支持Split;

首先计算一个切片的具体长度,长度的计算方式为:Math.max(minSize, Math.min(maxSize, blockSize) ;

                long blockSize = file.getBlockSize();

                /*
                 * 根据blockSize、minSize、maxSize计算切片大小
                   * 
                 * Math.max(minSize, Math.min(maxSize, blockSize)
                 * 
                 * ***********************************************************
                 */
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);

然后,根据splitSize进行切片,思路就是从文件开始处,以splitSize为区间,对文件进行逻辑上的切分;

                long bytesRemaining = length;

                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    /*
                     * 返回的Block Index为此切片开始位置所在Block的Index
                     * 
                     * **********************************************************
                     */
                    int blkIndex = getBlockIndex(blkLocations, length
                            - bytesRemaining);

                    /*
                     * 一个Block对应一个FileSplit
                     * 
                     * *******************************************************
                     */
                    splits.add(new FileSplit(path, length - bytesRemaining,
                            splitSize, blkLocations[blkIndex].getHosts()));

                    bytesRemaining -= splitSize;
                }

                if (bytesRemaining != 0) {
                    /*
                     * 剩余的文件数据形成一个切片,hosts为此文件最后一个Block的hosts
                     * 
                     * **********************************************************
                     */
                    splits.add(new FileSplit(path, length - bytesRemaining,
                            bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }

为了不产生过小的切片,要求尚未进行切片的文件部分长度(bytesRemaining)大于切片长度(splitSize)的SPLIT_SLOP(1.1)倍,然后将文件的剩余部分直接作为一个切片。

在上述代码中的切片信息中,还保存着切片对应的Block信息,注意切片并不一定会与Block完全吻合(即切片在文件中的起止处与该Block在文件中的起止处一致),所谓的对应的Block,是指该切片的起始处正在落在该Block的区间内;之所以要保存切片对应的Block信息,是为后期Map任务的“本地计算”调度运行作准备的。

第二种情况:文件长度大于0,但该文件不支持切片;

                /*
                 * 文件长度不为0但不可分割
                   * 
                 * 不能切片的文件,整体形成一个切片,hosts为此文件第一个Block的hosts
                 * 
                 * ***********************************************************
                 */
                splits.add(new FileSplit(path, 0, length, blkLocations[0]
                        .getHosts()));

因为该文件不支持切片,直接将整个文件作为一个切片就可以了。

第三种情况:文件长度为0;

                // Create empty hosts array for zero length files
                splits.add(new FileSplit(path, 0, length, new String[0]));

此时直接创建一个空的切片即可。

到此,所有输入文件的切片信息就全部产生完毕了。

⑤ 对产生的切片进行排序处理,排序的依据是切片的大小,切片越大,在切片集合中的位置应该更靠前,这样可以使大的切片在调度时,优先得到处理。

        T[] array = (T[]) splits
                .toArray(new org.apache.hadoop.mapreduce.InputSplit[splits
                        .size()]);

        /*
         * sort the splits into order based on size, so that the biggest go
         * first
         * 
         * ******************************************************************
         */
        Arrays.sort(array, new NewSplitComparator());

⑥ 存储切片信息至相应的切片文件中,调度任务时使用切片文件中的信息进行调度,具体的存储过程不影响整个处理流程的理解,在此不对它进行分析。

至此,writeNewSplits方法结果,该方法还回返回切片的总数目,即对应着Job的Map任务数目。

(7)将Job的相关信息写入job.xml;

        /*
         * Write job file to JobTracker's fs
         * 
         * **********************************************************************
         */
        FSDataOutputStream out = FileSystem.create(fs, submitJobFile,
                new FsPermission(JOB_FILE_PERMISSION));

        try {
            job.writeXml(out);
        } finally {
            out.close();
        }

(8)完成Job任务的实际提交;

        /*
         * Now, actually submit the job (using the submit name)
         * 
         * **********************************************************************
         */
        JobStatus status = jobSubmitClient.submitJob(jobId);

        if (status != null) {
            return new NetworkedJob(status);
        } else {
            throw new IOException("Could not launch job");
        }

到此,Job任务的提交过程分析完毕。

原文地址:https://www.cnblogs.com/yurunmiao/p/3557076.html