hadoop之定制自己的sort过程

Key排序

1. 继承WritableComparator

  在hadoop之Shuffle和Sort中,可以看到mapper的输出文件spill文件需要在内存中排序,并且在输入reducer之前,不同的mapper的数据也会排序,排序是根据数据的key进行的.

如果key是用户自定义的类型,并没有默认的比较函数时,就需要自己定义key的比较函数,也就是继承WritableComparator.事例代码如下:

public static class KeyComparator extends WritableComparator {  
  protected KeyComparator() {  
    super(IntPair.class, true);  
  }  
  @Override  
  public int compare(WritableComparable w1, WritableComparable w2) {  
    IntPair ip1 = (IntPair) w1;  
    IntPair ip2 = (IntPair) w2;  
    // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数  
    // 这里是先比较年份,再比较温度,按温度降序排序  
    int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());  
    if (cmp != 0) {  
      return cmp;  
    }  
    return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse  
  }  
}

例子中对IntPair定义了新的compare函数,并在main函数中通过下面的方式实现替换:

job.setSortComparatorClass(KeyComparator.class);

 2.实现 WritableComparable接口

看下面的例子代码:

    static class  NewK2 implements WritableComparable<NewK2>{
        Long first;
        Long second;
        
        public NewK2(){}
        public NewK2(long first, long second){
            this.first = first;
            this.second = second;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.first = in.readLong();
            this.second = in.readLong();
        }
@Override
public void write(DataOutput out) throws IOException { out.writeLong(first); out.writeLong(second); } /** * 当k2进行排序时,会调用该方法. * 当第一列不同时,升序;当第一列相同时,第二列升序 */ @Override public int compareTo(NewK2 o) { final long minus = this.first - o.first; if(minus !=0){ return (int)minus; } return (int)(this.second - o.second); } @Override public int hashCode() { return this.first.hashCode()+this.second.hashCode(); } @Override public boolean equals(Object obj) { if(!(obj instanceof NewK2)){ return false; } NewK2 oK2 = (NewK2)obj; return (this.first==oK2.first)&&(this.second==oK2.second); } }

如果是按照上述的例子实现的,不需要在main函数中设置其他的代码.

Group排序

   一般来说,如果用户自定义了key的排序过程,那么在reducer之前的对数据进行分组的过程就要重新编写,而且一般来说,partitioner也需要重新定义,请参考hadoop之定制自己的Partitioner .

 shuffle阶段,虽然使用的是hash的方法,我们并不能保证映射到同一个reducer的key的hash值都是一样的,对于不同的hash值要进行分群,然后再执行reduce.下面是自定义groupcomparator的例子:

  public static class GroupComparator extends WritableComparator {  
    protected GroupComparator() {  
      super(IntPair.class, true);  
    }  
    @Override  
    public int compare(WritableComparable w1, WritableComparable w2) {  
      IntPair ip1 = (IntPair) w1;  
      IntPair ip2 = (IntPair) w2;  
    // 这里是按key的第一个参数来聚合,就是年份  
      return IntPair.compare(ip1.getFirst(), ip2.getFirst());  
    }  
  } 

例子中实现了对于IntPair类型的分群比较函数的重新定义.在main函数中通过下面的方式进行调用:

job.setGroupingComparatorClass(GroupComparator.class);

二次排序

   下面是对地区温度进行的统计,要求输出各个年份的最大温度,例子中定制了自己的partitioner:FirstPartitioner来对组合后的类型进行分组,实际上还是按照年份进行的分组;定制了自己的keycomparator:KeyComparator,先比较年份,然后再比较温度;定制了自己的分群比较类:GroupComparator,也是按照年份进行分群,然后扔给reducer进行处理.

  值得一提的是,为什么不用传统的mapreduce,按照年份进行进行map,然后在reduce中,遍历每年不同的温度,找到最大值呢?原因之一就是效率的问题,sort操作本身就要在MP框架中执行,而且已经做了很多优化,通过设置比较的不同手段,很容易实现比较,然而在reducer处理中进行遍历,显然比上面的sort过程要慢.下面是例子的完整代码,摘自Hadoop- The Definitive Guide, 4th Edition.

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {


  // Map任务
  static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> {
  private NcdcRecordParser parser = new NcdcRecordParser();
  public void map(LongWritable key, Text value,
      OutputCollector<IntPair, NullWritable> output, Reporter reporter)
      throws IOException {
    parser.parse(value);   // 解析输入的文本
    if (parser.isValidTemperature()) {
    // 这里把年份与温度组合成一个key,value为空
      output.collect(new IntPair(parser.getYearInt(),+ parser.getAirTemperature()), NullWritable.get());
    }
  }
}


// Reduce任务
static class MaxTemperatureReducer extends MapReduceBase
  implements Reducer<IntPair, NullWritable, IntPair, NullWritable> {
  public void reduce(IntPair key, Iterator<NullWritable> values,
      OutputCollector<IntPair, NullWritable> output, Reporter reporter)
      throws IOException {
    // 输出聚合的key值,这里的key是先按年份进行聚合,所我们会看到相同所有年份相同的key会聚合在一起,而这些聚合后的key按温度进行降序按列
    // 所以聚合中第一个key为温度最高的,所以这里输出的key为这一年中温度最高的值
    output.collect(key, NullWritable.get());
  }
}


// 切分器,这里是按年份* 127 % reduceNum来进行切分的
public static class FirstPartitioner
  implements Partitioner<IntPair, NullWritable> {
  @Override
  public void configure(JobConf job) {}
  @Override
  public int getPartition(IntPair key, NullWritable value, int numPartitions) {
    return Math.abs(key.getFirst() * 127) % numPartitions;
  }
}


// 聚合key的一个比较器
public static class KeyComparator extends WritableComparator {
  protected KeyComparator() {
    super(IntPair.class, true);
  }
  @Override
  public int compare(WritableComparable w1, WritableComparable w2) {
    IntPair ip1 = (IntPair) w1;
    IntPair ip2 = (IntPair) w2;
    // 这里要注意的是,一定要在聚合参数相同的情况下,再比较另一个参数
    // 这里是先比较年份,再比较温度,按温度降序排序
    int cmp = IntPair.compare(ip1.getFirst(), ip2.getFirst());
    if (cmp != 0) {
      return cmp;
    }
    return -IntPair.compare(ip1.getSecond(), ip2.getSecond()); //reverse
  }
}
  // 设置聚合比较器
  public static class GroupComparator extends WritableComparator {
    protected GroupComparator() {
      super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      IntPair ip1 = (IntPair) w1;
      IntPair ip2 = (IntPair) w2;
    // 这里是按key的第一个参数来聚合,就是年份
      return IntPair.compare(ip1.getFirst(), ip2.getFirst());
    }
  }
  @Override
  public int run(String[] args) throws IOException {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }
    job.setMapperClass(MaxTemperatureMapper.class);

    job.setPartitionerClass(FirstPartitioner.class);
    job.setSortComparatorClass(KeyComparator.class);
    job.setGroupingComparatorClass(GroupComparator.class);

    job.setReducerClass(MaxTemperatureReducer.class);
    job.setOutputKeyClass(IntPair.class);    // 设置key的一个组合类型,如里这个类型实现了WritableComparable<T>的话,那就不要设置setOutputKeyComparatorClass了.
    job.setOutputValueClass(NullWritable.class);  // 输出的value为NULL,因为这里的实际value已经组合到了key中
    
return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); System.exit(exitCode); } }
原文地址:https://www.cnblogs.com/wzyj/p/4693131.html