Sample MultipleFileWordcount CombineFileInputFormat

       在旧版本的samples中,使用的是旧的api,mapred下面的MultiFileInputFormat,现在已经过时。

现在推荐使用mapreduce下面的CombineInputFormat来处理。

应用场景:

       如果文件数量大,而且单个文件又比较小,若是使用FileInputFormat进行分片,则会根据一个文件生成一个分片,

每个分片又丢给一个maptask,这样maptask处理的内容太小,很快就完成了,利用率不高,因为maptask本身启动

处理所占的时间和资源消耗就超过了信息处理本身所占的时间。推荐一个maptask至少运行一分钟左右。

解决方案:

      使用combinefileinputformat来重定义了getSplits方法,这样可以根据我们指定的splitsize(一般是给定为blocksize大小,减少数据传输)

,打包多个小文件到一个inputsplit中去。这样减少了框架生成的maptask的数量。

示例:

例如我的englishwords目录下面有四个文件,使用wordcount示例来跑的话,默认生成4个maptask(不考虑失败又生成的maptask)一个reducetask.

使用旧版的api生成了2个maptask,使用新版的multiplefilewordcount示例生成了一个maptask.

CombineFileInputformat 中可以重写的一个重要方法是:

/**
   * Specify the maximum size (in bytes) of each split. Each split is
   * approximately equal to the specified size.
   */
  protected void setMaxSplitSize(long maxSplitSize) {
    this.maxSplitSize = maxSplitSize;
  }

示例中又自己写了一个数据结构wordoffset, 是因为原来的只考虑一个文件(一个分片一个文件)中的信息,所以key是offset,value是当前行的值。

现在一个分片中会有多个文件,所以新的数据结构wordoffset就表示哪个文件的offset,这样更明晰。

有时候我们在项目中就需要自己定义maptask的参数。这个结构是需要实现writable接口的(可以序列化)。

使用CombineFileInputFormat最重要的就是实现 Reader的方法,Reader中最重要的就是next().

基本思路其实和单个文件的是类似的, 只是在这种情况下需要处理多个文件的情况,需要有一个index来标志是正在处理哪个文件。

一般在combineReader里面会有如下的代码:

public static class CombineFileLineRecordReader 
    extends RecordReader<WordOffset, Text> {

    private long startOffset; //offset of the chunk;
    private long end; //end of the chunk;
    private long pos; // current pos 
    private FileSystem fs;
    private Path path;
    private WordOffset key;
    private Text value;
    
    private FSDataInputStream fileIn;
    private LineReader reader;
    
    public CombineFileLineRecordReader(CombineFileSplit split,
        TaskAttemptContext context, Integer index) throws IOException {
      
      this.path = split.getPath(index);
      fs = this.path.getFileSystem(context.getConfiguration());
      this.startOffset = split.getOffset(index);
      this.end = startOffset + split.getLength(index);
      boolean skipFirstLine = false;
      
      //open the file
      fileIn = fs.open(path);
      if (startOffset != 0) {
        skipFirstLine = true;
        --startOffset;
        fileIn.seek(startOffset);
      }
      reader = new LineReader(fileIn);
      if (skipFirstLine) {  // skip first line and re-establish "startOffset".
        startOffset += reader.readLine(new Text(), 0,
                    (int)Math.min((long)Integer.MAX_VALUE, end - startOffset));
      }
      this.pos = startOffset;
    }
…………
Looking for a job working at Home about MSBI
原文地址:https://www.cnblogs.com/huaxiaoyao/p/4297877.html