OutputFormat in MapReduce

In the Hadoop source, OutputFormat is an abstract class, public abstract class OutputFormat<K, V>, which defines the output format of reduce tasks.

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputFormat.java

For background, see the article:

MapReduce快速入门系列(12) | MapReduce之OutputFormat (MapReduce Quick Start Series (12) | OutputFormat in MapReduce)

The commonly used OutputFormat implementations can be found in the source tree:

https://github.com/apache/hadoop/tree/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output

1. Text output: TextOutputFormat, Hadoop's default OutputFormat implementation

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/TextOutputFormat.java

TextOutputFormat extends FileOutputFormat. FileOutputFormat is itself an abstract class and a subclass of OutputFormat. Its source is at

https://github.com/apache/hadoop/blob/release-2.6.0/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java

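<i> One important piece is RecordWriter, which has two methods: write and close. The link and listing here show the old-API interface org.apache.hadoop.mapred.RecordWriter. In the new API, org.apache.hadoop.mapreduce.RecordWriter is an abstract class with the same two methods, except that close takes a TaskAttemptContext instead of a Reporter.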

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/RecordWriter.java

Source:

public interface RecordWriter<K, V> {
  /** 
   * Writes a key/value pair.
   *
   * @param key the key to write.
   * @param value the value to write.
   * @throws IOException
   */      
  void write(K key, V value) throws IOException;

  /** 
   * Close this <code>RecordWriter</code> to future operations.
   * 
   * @param reporter facility to report progress.
   * @throws IOException
   */ 
  void close(Reporter reporter) throws IOException;
}

TextOutputFormat implements the writer as the nested class LineRecordWriter<K, V>. Source:

protected static class LineRecordWriter<K, V>
    extends RecordWriter<K, V> {
    private static final String utf8 = "UTF-8";
    private static final byte[] newline;
    static {
      try {
        newline = "\n".getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    protected DataOutputStream out;
    private final byte[] keyValueSeparator;

    public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
      this.out = out;
      try {
        this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
      } catch (UnsupportedEncodingException uee) {
        throw new IllegalArgumentException("can't find " + utf8 + " encoding");
      }
    }

    public LineRecordWriter(DataOutputStream out) {
      this(out, "\t");
    }

    /**
     * Write the object to the byte stream, handling Text as a special
     * case.
     * @param o the object to print
     * @throws IOException if the write throws, we pass it on
     */
    private void writeObject(Object o) throws IOException {
      if (o instanceof Text) {
        Text to = (Text) o;
        out.write(to.getBytes(), 0, to.getLength());
      } else {
        out.write(o.toString().getBytes(utf8));
      }
    }

    public synchronized void write(K key, V value)
      throws IOException {

      boolean nullKey = key == null || key instanceof NullWritable;
      boolean nullValue = value == null || value instanceof NullWritable;
      if (nullKey && nullValue) {
        return;
      }
      if (!nullKey) {
        writeObject(key);
      }
      if (!(nullKey || nullValue)) {
        out.write(keyValueSeparator);
      }
      if (!nullValue) {
        writeObject(value);
      }
      out.write(newline);
    }

    public synchronized 
    void close(TaskAttemptContext context) throws IOException {
      out.close();
    }
  }
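The null handling in write is easy to misread, so here is a minimal standalone sketch of the same branching. It is not Hadoop code: LineWriterSketch and its render helper are hypothetical names, only java.io is used, a plain null stands in for NullWritable, and every object is written via toString() rather than through the Text fast path.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

/** Simplified stand-in for LineRecordWriter (hypothetical, not a Hadoop class). */
final class LineWriterSketch {
    private static final byte[] NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);

    private final DataOutputStream out;
    private final byte[] separator;

    LineWriterSketch(DataOutputStream out, String separator) {
        this.out = out;
        this.separator = separator.getBytes(StandardCharsets.UTF_8);
    }

    /** Same branching as the write method above; null plays the role of NullWritable. */
    synchronized void write(Object key, Object value) throws IOException {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return; // nothing is written, not even a newline
        }
        if (!nullKey) {
            out.write(key.toString().getBytes(StandardCharsets.UTF_8));
        }
        if (!(nullKey || nullValue)) {
            out.write(separator); // separator only when both sides are present
        }
        if (!nullValue) {
            out.write(value.toString().getBytes(StandardCharsets.UTF_8));
        }
        out.write(NEWLINE);
    }

    synchronized void close() throws IOException {
        out.close();
    }

    /** Hypothetical helper: writes the pairs with a tab separator and returns the text. */
    static String render(Object[][] pairs) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            LineWriterSketch writer =
                new LineWriterSketch(new DataOutputStream(buffer), "\t");
            for (Object[] pair : pairs) {
                writer.write(pair[0], pair[1]);
            }
            writer.close();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // One full pair, one value-only line, one key-only line, one skipped pair.
        System.out.print(render(new Object[][] {
            {"hadoop", 3}, {null, "value-only"}, {"key-only", null}, {null, null}
        }));
    }
}
```

A pair with both sides present produces key, separator, value and a newline. A pair with one side missing produces only the other side and a newline. A pair with both sides missing produces no output at all.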

<ii> Another important piece is the abstract getRecordWriter method. A class that extends the abstract FileOutputFormat must implement it, returning the RecordWriter<K, V> for the given job context:

public abstract RecordWriter<K, V> 
     getRecordWriter(TaskAttemptContext job
                     ) throws IOException, InterruptedException;

TextOutputFormat implements getRecordWriter as follows,

using LineRecordWriter. Source:

public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext job
                         ) throws IOException, InterruptedException {
    Configuration conf = job.getConfiguration();
    boolean isCompressed = getCompressOutput(job);
    String keyValueSeparator= conf.get(SEPERATOR, "\t");
    CompressionCodec codec = null;
    String extension = "";
    if (isCompressed) {
      Class<? extends CompressionCodec> codecClass = 
        getOutputCompressorClass(job, GzipCodec.class);
      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
      extension = codec.getDefaultExtension();
    }
    Path file = getDefaultWorkFile(job, extension);
    FileSystem fs = file.getFileSystem(conf);
    if (!isCompressed) {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
    } else {
      FSDataOutputStream fileOut = fs.create(file, false);
      return new LineRecordWriter<K, V>(new DataOutputStream
                                        (codec.createOutputStream(fileOut)),
                                        keyValueSeparator);
    }
  }
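The isCompressed branch above boils down to two decisions: which file extension to use, and whether to wrap the raw stream in a compressing stream before handing it to the writer. The following is a rough sketch of that idea only. It assumes java.util.zip.GZIPOutputStream as a stand-in for the stream returned by codec.createOutputStream, and CompressedOutputSketch with all of its methods is a hypothetical name, not part of Hadoop.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch of the compressed/uncompressed branch; not Hadoop code. */
final class CompressedOutputSketch {

    /** Gzip output gets a ".gz" suffix; plain output gets none. */
    static String extensionFor(boolean compressed) {
        return compressed ? ".gz" : "";
    }

    /** Wraps the raw stream in a gzip stream only when compression is requested. */
    static DataOutputStream open(OutputStream raw, boolean compressed) throws IOException {
        OutputStream target = compressed ? new GZIPOutputStream(raw) : raw;
        return new DataOutputStream(target);
    }

    /** Writes each line followed by a newline and returns the bytes that reached the raw stream. */
    static byte[] writeLines(boolean compressed, String... lines) {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try (DataOutputStream out = open(raw, compressed)) {
            for (String line : lines) {
                out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return raw.toByteArray();
    }

    /** Decompresses gzip bytes back to text, to check the round trip. */
    static String gunzip(byte[] data) {
        ByteArrayOutputStream plain = new ByteArrayOutputStream();
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(data))) {
            byte[] chunk = new byte[4096];
            int read;
            while ((read = in.read(chunk)) != -1) {
                plain.write(chunk, 0, read);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return new String(plain.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] packed = writeLines(true, "a\t1", "b\t2");
        System.out.println("part-r-00000" + extensionFor(true) + ": " + packed.length + " bytes");
        System.out.print(gunzip(packed));
    }
}
```

The writer itself never needs to know whether compression is on, which is why the real method can pass either stream to the same LineRecordWriter constructor.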

2. Binary output: SequenceFileOutputFormat

https://github.com/apache/hadoop/blob/master/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/SequenceFileOutputFormat.java

Like TextOutputFormat, SequenceFileOutputFormat also extends FileOutputFormat.

<i> One important piece is the getSequenceWriter method, which returns the SequenceFile.Writer used to write the binary file.

https://github.com/apache/hadoop/blob/master/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java

Source:

  protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
      Class<?> keyClass, Class<?> valueClass) 
      throws IOException {
    Configuration conf = context.getConfiguration();
	    
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(context)) {
      // find the kind of compression to do
      compressionType = getOutputCompressionType(context);
      // find the right codec
      Class<?> codecClass = getOutputCompressorClass(context, 
                                                     DefaultCodec.class);
      codec = (CompressionCodec) 
        ReflectionUtils.newInstance(codecClass, conf);
    }
    // get the path of the temporary output file 
    Path file = getDefaultWorkFile(context, "");
    FileSystem fs = file.getFileSystem(conf);
    return SequenceFile.createWriter(fs, conf, file,
             keyClass,
             valueClass,
             compressionType,
             codec,
             context);
  }

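<ii> Another important piece is its implementation of the abstract getRecordWriter method, which wraps that SequenceFile.Writer in an anonymous RecordWriter. Source: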

  public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext context
                         ) throws IOException, InterruptedException {
    final SequenceFile.Writer out = getSequenceWriter(context,
      context.getOutputKeyClass(), context.getOutputValueClass());

    return new RecordWriter<K, V>() {

        public void write(K key, V value)
          throws IOException {

          out.append(key, value);
        }

        public void close(TaskAttemptContext context) throws IOException { 
          out.close();
        }
      };
  }
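The pattern here is an anonymous writer that captures a lower-level binary writer and forwards each pair to it. Below is a small standalone sketch of that delegation. It does not produce the SequenceFile format: a DataOutputStream with writeUTF/writeInt is assumed as a stand-in for SequenceFile.Writer.append, and BinaryPairSketch, PairWriter, encode and decode are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the anonymous delegating writer; not the SequenceFile format. */
final class BinaryPairSketch {

    /** Two-method contract shaped like RecordWriter (hypothetical). */
    interface PairWriter<K, V> {
        void write(K key, V value) throws IOException;
        void close() throws IOException;
    }

    /** Returns an anonymous writer that forwards every pair to the captured binary stream. */
    static PairWriter<String, Integer> openWriter(OutputStream raw) {
        final DataOutputStream out = new DataOutputStream(raw);
        return new PairWriter<String, Integer>() {
            @Override
            public void write(String key, Integer value) throws IOException {
                out.writeUTF(key);   // 2-byte length prefix + encoded key
                out.writeInt(value); // 4-byte big-endian value
            }

            @Override
            public void close() throws IOException {
                out.close();
            }
        };
    }

    /** Writes all records through the delegating writer and returns the raw bytes. */
    static byte[] encode(Map<String, Integer> records) {
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        try {
            PairWriter<String, Integer> writer = openWriter(raw);
            for (Map.Entry<String, Integer> record : records.entrySet()) {
                writer.write(record.getKey(), record.getValue());
            }
            writer.close();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return raw.toByteArray();
    }

    /** Reads the pairs back in the order they were written. */
    static Map<String, Integer> decode(byte[] data) {
        Map<String, Integer> records = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            while (in.available() > 0) {
                String key = in.readUTF();
                records.put(key, in.readInt());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("hadoop", 3);
        counts.put("hive", 1);
        System.out.println(decode(encode(counts)));
    }
}
```

Unlike the text writer, the output is not human-readable, but keys and values keep their types when read back.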

  

3. ParquetThriftOutputFormat, for writing Thrift objects to Parquet files. Reference project: /spark-parquet-thrift-example

https://github.com/adobe-research/spark-parquet-thrift-example/blob/master/src/main/scala/SparkParquetThriftApp.scala

Code:

    ParquetThriftOutputFormat.setThriftClass(job, classOf[SampleThriftObject])
    ParquetOutputFormat.setWriteSupportClass(job, classOf[SampleThriftObject])
    sc.parallelize(sampleData)
      .map(obj => (null, obj))
      .saveAsNewAPIHadoopFile(
        parquetStore,
        classOf[Void],
        classOf[SampleThriftObject],
        classOf[ParquetThriftOutputFormat[SampleThriftObject]],
        job.getConfiguration
      )

4. HiveIgnoreKeyTextOutputFormat, used for text-format Hive tables

https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java

It implements HiveOutputFormat:

https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java

HiveIgnoreKeyTextOutputFormat always passes null as the key, so only the value is written. Source:

    @Override
    public synchronized void write(K key, V value) throws IOException {
      this.mWriter.write(null, value);
    }
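Combined with the line-writer rule that a null key produces no key and no separator, this wrapper yields value-only lines. A minimal sketch of that interaction follows, under the assumption that the wrapped writer behaves like the text writer in section 1. IgnoreKeySketch, PairWriter, TextWriter and IgnoreKeyWriter are hypothetical names, not Hive or Hadoop classes.

```java
/** Hypothetical sketch of a key-dropping wrapper; not Hive code. */
final class IgnoreKeySketch {

    /** Minimal write contract (hypothetical). */
    interface PairWriter<K, V> {
        void write(K key, V value);
    }

    /** Text writer with the same null rules as the line writer in section 1. */
    static final class TextWriter implements PairWriter<Object, Object> {
        final StringBuilder out = new StringBuilder();

        @Override
        public void write(Object key, Object value) {
            if (key == null && value == null) {
                return;
            }
            if (key != null) {
                out.append(key);
            }
            if (key != null && value != null) {
                out.append('\t');
            }
            if (value != null) {
                out.append(value);
            }
            out.append('\n');
        }
    }

    /** Wrapper that discards the key and forwards only the value. */
    static final class IgnoreKeyWriter<K, V> implements PairWriter<K, V> {
        private final PairWriter<Object, Object> delegate;

        IgnoreKeyWriter(PairWriter<Object, Object> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized void write(K key, V value) {
            delegate.write(null, value);
        }
    }

    /** Hypothetical helper: pushes the pairs through the wrapper and returns the text. */
    static String render(String[][] pairs) {
        TextWriter text = new TextWriter();
        IgnoreKeyWriter<String, String> writer = new IgnoreKeyWriter<>(text);
        for (String[] pair : pairs) {
            writer.write(pair[0], pair[1]);
        }
        return text.out.toString();
    }

    public static void main(String[] args) {
        // Keys are dropped, so each output line holds only the row text.
        System.out.print(render(new String[][] {{"k1", "row1"}, {"k2", "row2"}}));
    }
}
```

For a text-format table this means each output line is just the serialized row, with no leading key or tab.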

  

  

Original article: https://www.cnblogs.com/tonglin0325/p/13970805.html