MapReduce 的类型与格式【编写最简单的mapreduce】(1)

hadoop mapreduce 中的map 和reduce 函数遵循以下的形式
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

可以从源代码中看出为什么是这样的类型:
map: (K1, V1) → list(K2, V2)

reduce: (K2, list(V2)) → list(K3, V3)

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // ...
  }
  protected void map(KEYIN key, VALUEIN value, 
      Context context) throws IOException, InterruptedException {
    // ...
  }
}

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  public class Context extends ReducerContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    // ...
  }
  protected void reduce(KEYIN key, Iterable<VALUEIN> values,
      Context context) throws IOException, InterruptedException {
    // ...
  }
}
context用来接收输出键值对,写出的方法是:
public void write(KEYOUT key, VALUEOUT value)
    throws IOException, InterruptedException

如果有combiner :这里的 combiner就是默认的reducer

map: (K1, V1) → list(K2, V2)
combiner: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)
如果partitioner被使用:

partition: (K2, V2) → integer(很多时候只取决于key 值被忽略来进行分区)

以及combiner 甚至partitioner让相同的key聚合到一起

public abstract class Partitioner<KEY, VALUE> {
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
}
一个实现类:

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

输入数据的类型是通过输入格式进行设定的。例如,对于TextlnputFormat ,它的键类型就是LongWritable ,而值类型就是Text 。其他的类型可以通过调用JobConf 中的方法来进行显式地设置。如果没有显式地设置, 中阔的类型将被默认设置为(最终的)输出类型,也就是LongWritable 和Text.综上所述,如果K2 与K3是相同类型,就不需要手工调用setMapOutputKeyClass,因为它将被自动设置每一个步骤的输入和输出类型.一定很奇怪,为什么不能从最初输入的类型推导出每个步骤的输入/输出类型呢?原来Java 的泛型机制具有很多限制,类型擦除导致了运行时类型并不一直可见.所以需要Hadoop 时不时地"提醒"一下。这也导致了可能在某些MapReduce 任务中出现不兼容的输入和输出类型,因为这些配置在编译时无法检查出来。与MapReduce 任务兼容的类型已经在下面列出。所有的类型不兼容将在任务真正执行的时候被发现,所以一个比较聪明的做法是在执行任务前先用少量的数据跑一次测试任务,以发现所有的类型不兼容问题。

Table 8-1. Configuration of MapReduce types in the new API
PropertyJob setter methodInput typesIntermediate typesOutput types
K1V1K2V2K3V3
Properties for configuring types:
mapreduce.job.inputformat.classsetInputFormatClass()




mapreduce.map.output.key.classsetMapOutputKeyClass()





mapreduce.map.output.value.classsetMapOutputValueClass()





mapreduce.job.output.key.classsetOutputKeyClass()





mapreduce.job.output.value.classsetOutputValueClass()





Properties that must be consistent with the types:
mapreduce.job.map.classsetMapperClass()


mapreduce.job.combine.classsetCombinerClass()




mapreduce.job.partitioner.classsetPartitionerClass()




mapreduce.job.output.key.comparator.classsetSortComparatorClass()





mapreduce.job.output.group.comparator.classsetGroupingComparatorClass()





mapreduce.job.reduce.classsetReducerClass()


mapreduce.job.outputformat.classsetOutputFormatClass()




Table 8-2. Configuration of MapReduce types in the old API
PropertyJobConf setter methodInput typesIntermediate typesOutput types
K1V1K2V2K3V3
Properties for configuring types:
mapred.input.format.classsetInputFormat()




mapred.mapoutput.key.classsetMapOutputKeyClass()





mapred.mapoutput.value.classsetMapOutputValueClass()





mapred.output.key.classsetOutputKeyClass()





mapred.output.value.classsetOutputValueClass()





Properties that must be consistent with the types:
mapred.mapper.classsetMapperClass()


mapred.map.runner.classsetMapRunnerClass()


mapred.combiner.classsetCombinerClass()




mapred.partitioner.classsetPartitionerClass()




mapred.output.key.comparator.classsetOutputKeyComparatorClass()





mapred.output.value.groupfn.classsetOutputValueGroupingComparator()





mapred.reducer.classsetReducerClass()


mapred.output.format.classsetOutputFormat()





一个最简单的hadoop mapreduce:

public class MinimalMapReduce extends Configured implements Tool {
  
  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>
",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }
    
    Job job = new Job(getConf());
    job.setJarByClass(getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }
  
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
    System.exit(exitCode);
  }
}
执行方法:

 hadoop MinimalMapReduce "input/ncdc/all/190{1,2}.gz" output
输出结果:
0→0029029070999991901010106004+64333+023450FM-12+000599999V0202701N01591...
0→0035029070999991902010106004+64333+023450FM-12+000599999V0201401N01181...
135→0029029070999991901010113004+64333+023450FM-12+000599999V0202901N00821...
141→0035029070999991902010113004+64333+023450FM-12+000599999V0201401N01181...
270→0029029070999991901010120004+64333+023450FM-12+000599999V0209991C00001...
282→0035029070999991902010120004+64333+023450FM-12+000599999V0201401N01391...

改默认最简mapreduce等同于一下的程序:
public class MinimalMapReduceWithDefaults extends Configured implements Tool {
  
  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }
    
    job.setInputFormatClass(TextInputFormat.class);
    
    job.setMapperClass(Mapper.class);
    
    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);
    
    job.setPartitionerClass(HashPartitioner.class);
    
    job.setNumReduceTasks(1);
    job.setReducerClass(Reducer.class);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);

    job.setOutputFormatClass(TextOutputFormat.class);
    
    return job.waitForCompletion(true) ? 0 : 1;
  }
  
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
    System.exit(exitCode);
  }
}

那么,默认使用的mapreduce是:
Mapper.class 
HashPartitioner.class
Reducer.class
默认map代码,就是读取key value输出
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  protected void map(KEYIN key, VALUEIN value, 
      Context context) throws IOException, InterruptedException {
    context.write((KEYOUT) key, (VALUEOUT) value);
  }
}
默认Partitioner:hash分割,默认只有一个reducer因此我们这里只有一个分区
 class HashPartitioner<K, V> extends Partitioner<K, V> {

  public int getPartition(K key, V value,
      int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
默认Reduce 输出传进来的数据:
public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {

  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
      Context context) throws IOException, InterruptedException {
    for (VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }
}
因为什么都没做,只是在map中读取了偏移量和value,分区使用的hash,一个reduce输出的便是我们上面看到的样子。

相对于java api,hadoop流也有最简的mapreduce:
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar 
  -input input/ncdc/sample.txt 
  -output output 
  -mapper /bin/cat
等于下面的命令:
% hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar 
  -input input/ncdc/sample.txt 
  -output output 
  -inputformat org.apache.hadoop.mapred.TextInputFormat 
  -mapper /bin/cat 
  -partitioner org.apache.hadoop.mapred.lib.HashPartitioner 
  -numReduceTasks 1 
  -reducer org.apache.hadoop.mapred.lib.IdentityReducer 
  -outputformat org.apache.hadoop.mapred.TextOutputFormat
  -io text
流操作的键与值
一个文本文件流怎么知道哪里是一个记录的结束呢?
一个流操作的程序可以修改输入的分隔符(用于将键与值从输入文件中分开并且传入mapper) 。默认情况下是Tab ,但是如果输入的键或值中本身有Tab 分隔符的话,最好将分隔符修改成其他符号。类似地,当map 和reduc e 将结果输出的时候, 也需要一个可以配置的分隔符选项。更进一步, 键可以不仅仅是每一条记录的第1 个字段,它可以是一条记录的前n 个字段(可以在stream.num.map.output.key.fields和stream.num.reduce.output.key.fields 中进行设置) ,而剩下的字段就是值。比如有一条记录是a ,b , C , 且用逗号分隔,如果n 设为2 ,那么键就是a 、b ,而值就是c 。
流分隔符:
Table 8-3. Streaming separator properties
Property nameTypeDefault valueDescription
stream.map.input.field.separatorString The separator to use when passing the input key and value strings to the stream map process as a stream of bytes
stream.map.output.field.separatorString The separator to use when splitting the output from the stream map process into key and value strings for the map output
stream.num.map.output.key.fieldsint1The number of fields separated bystream.map.output.field.separator to treat as the map output key
stream.reduce.input.field.separatorString The separator to use when passing the input key and value strings to the stream reduce process as a stream of bytes
stream.reduce.output.field.separatorString The separator to use when splitting the output from the stream reduce process into key and value strings for the final reduce output
stream.num.reduce.output.key.fieldsint1The number of fields separated bystream.reduce.output.field.separatorto treat as the reduce output key

mapreduce中分隔符使用的地方,在标准输入输出和map-reducer之间。

原文地址:https://www.cnblogs.com/mrcharles/p/11879852.html