Sorting by the Mapper's Output Value in the MapReduce Shuffle Phase

ZKe

-----------------

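  In the MapReduce framework, the Mapper's output is sorted by key during the Shuffle phase, and records with the same key are grouped together. That is why the output we see from each Reducer is ordered by key.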
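To make the sort-then-group behaviour concrete, here is a plain-Java simulation. It is not Hadoop code: the class ShuffleSimulation and its word-count-style sample pairs are hypothetical. A TreeMap stands in for the framework's sort, and each map entry corresponds to one reduce(key, values) call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (not Hadoop code) of what the shuffle does with map
// output: sort the (key, value) pairs by key and group equal keys, so each
// reduce() call sees one key together with all of its values, in key order.
class ShuffleSimulation {

    // A TreeMap keeps keys in natural (ascending) order, mirroring the
    // default sort of keys such as Text or IntWritable.
    static TreeMap<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapOutput) {
        TreeMap<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapOutput) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapOutput = List.of(
                Map.entry("pear", 1), Map.entry("apple", 1),
                Map.entry("pear", 1), Map.entry("banana", 1));
        // Each printed line corresponds to one reduce(key, values) call.
        shuffle(mapOutput).forEach((key, values) -> System.out.println(key + " -> " + values));
    }
}
```

In real Hadoop the order of the values within one key is not guaranteed; in this simulation they simply keep insertion order.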

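  We can also make the output ordered by value. The trick is to have the Mapper emit the value we want to sort on as the key (and the original key as the value), and then register a custom sort comparator with job.setSortComparatorClass(IntWritableComparator.class); both the ordering rule and the key type are up to you.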

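  If you use your own entity class as a key, it must implement WritableComparable: compareTo from Comparable, plus the readFields and write methods from Writable. You can then define a comparator for it.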
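A minimal sketch of the Writable side of that contract, using only java.io (the class RecordSerializationDemo is hypothetical and deliberately has no Hadoop dependency): whatever write() puts on the stream, readFields() must read back in the same order and with matching types, and an empty object created through a no-arg constructor is what gets filled in.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java sketch of the Writable contract (hypothetical class, no Hadoop):
// readFields() must read the fields in exactly the order write() wrote them.
class RecordSerializationDemo {
    String personalId;
    int count;

    void write(DataOutput out) throws IOException {
        out.writeUTF(personalId); // field 1: String
        out.writeInt(count);      // field 2: int
    }

    void readFields(DataInput in) throws IOException {
        personalId = in.readUTF(); // same order and types as write()
        count = in.readInt();
    }

    // Serializes a record to bytes and reads it back into a fresh object
    // created with the no-arg constructor.
    static RecordSerializationDemo roundTrip(RecordSerializationDemo original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            original.write(out);
            out.flush();
            RecordSerializationDemo copy = new RecordSerializationDemo();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        RecordSerializationDemo r = new RecordSerializationDemo();
        r.personalId = "u001";
        r.count = 42;
        RecordSerializationDemo copy = roundTrip(r);
        System.out.println(copy.personalId + " " + copy.count); // prints "u001 42"
    }
}
```

Swapping the two reads in readFields() would make readUTF() consume the bytes meant for the int, which is the typical symptom of a mismatched Writable.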

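  Here we define an entity class with a String id and an int count as its fields, and we sort by count. In this example Record is mainly used to parse each input line (id<TAB>count); the Mapper then emits the count as an IntWritable key, and that key is what the comparator sorts.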

    static class Record implements WritableComparable<Record> {

        private String personalId;
        private int count;

        // Hadoop creates Writable instances via reflection, so a no-arg constructor is required
        public Record() {
        }

        public Record(String id, int count) {
            this.personalId = id;
            this.count = count;
        }

        // Parses one input line of the form "id<TAB>count"
        public Record(String line) {
            String[] fields = line.split("\t");
            this.personalId = fields[0];
            this.count = Integer.parseInt(fields[1].trim());
        }

        /*
         * Deserialization: read the fields in the same order write() wrote them
         * @author 180512235 ZhaoKe
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.personalId = in.readUTF();
            this.count = in.readInt();
        }

        // Serialization
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.personalId);
            out.writeInt(this.count);
        }

        // Descending by count; returns 0 for equal counts, as the Comparable contract requires
        @Override
        public int compareTo(Record o) {
            return Integer.compare(o.count, this.count);
        }

        public String getPersonalId() {
            return this.personalId;
        }

        public int getCount() {
            return this.count;
        }
    }
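The compareTo method deserves a closer look. Comparable requires sgn(x.compareTo(y)) == -sgn(y.compareTo(x)), which implies x.compareTo(x) == 0, yet an expression like this.count<o.count?1:-1 never returns 0, so two records with equal counts are never treated as equal. The plain-Java sketch below (hypothetical class CompareToContractDemo) contrasts that expression with Integer.compare, which yields the same descending order but returns 0 for equal counts.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (hypothetical class) comparing a ternary compare that
// never returns 0 with a contract-respecting one; both sort descending.
class CompareToContractDemo {

    // Ternary logic: for equal values it returns -1, so compare(x, x) != 0.
    static int originalCompare(int a, int b) {
        return a < b ? 1 : -1;
    }

    // Descending by value, and 0 for equal values.
    static int fixedCompare(int a, int b) {
        return Integer.compare(b, a);
    }

    public static void main(String[] args) {
        System.out.println(originalCompare(5, 5)); // -1: violates the contract
        System.out.println(fixedCompare(5, 5));    // 0

        List<Integer> counts = new ArrayList<>(List.of(3, 7, 7, 1));
        counts.sort(CompareToContractDemo::fixedCompare);
        System.out.println(counts); // [7, 7, 3, 1]
    }
}
```

Integer.compare also avoids the overflow risk of the other common shortcut, returning o.count - this.count.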

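The comparator is as follows. Note that it compares IntWritable keys (the counts emitted by the Mapper) rather than Record objects, and it reverses their natural order so that larger counts come first: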

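    static class IntWritableComparator extends WritableComparator {

        /*
         * Constructor: register IntWritable as the key class. Passing true makes the
         * parent create key instances, so serialized keys are deserialized and then
         * handed to compare(WritableComparable, WritableComparable) below.
         */
        public IntWritableComparator() {
            super(IntWritable.class, true);
        }

        /*
         * Override compare to define the ordering: descending by int value
         */
        @Override
        @SuppressWarnings("rawtypes")
        public int compare(WritableComparable a, WritableComparable b) {
            // Downcast to the concrete key type
            IntWritable ia = (IntWritable) a;
            IntWritable ib = (IntWritable) b;
            return ib.compareTo(ia); // arguments reversed => descending order
        }
    }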
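Because the constructor passes true, WritableComparator creates two key instances and, when asked to compare serialized keys, deserializes both before calling the object-level compare above. An IntWritable is serialized as a 4-byte big-endian int (the DataOutput.writeInt format), so a comparator could also decode the values straight from the bytes and skip object creation. The sketch below shows that idea in plain Java; RawDescendingIntCompare and its methods are hypothetical and are not Hadoop's API (in Hadoop the corresponding hook to override is WritableComparator's compare(byte[], int, int, byte[], int, int)).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Plain-Java sketch (hypothetical class, no Hadoop) of comparing serialized
// int keys directly from their bytes, in descending order.
class RawDescendingIntCompare {

    // Serialize an int the way DataOutput.writeInt does: 4 bytes, big-endian.
    static byte[] serialize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decode the ints at the given offsets (ByteBuffer is big-endian by
    // default) and compare them with the arguments reversed, so larger first.
    static int compare(byte[] b1, int s1, byte[] b2, int s2) {
        int a = ByteBuffer.wrap(b1, s1, 4).getInt();
        int b = ByteBuffer.wrap(b2, s2, 4).getInt();
        return Integer.compare(b, a);
    }

    public static void main(String[] args) {
        byte[] five = serialize(5);
        byte[] nine = serialize(9);
        System.out.println(compare(nine, 0, five, 0) < 0); // true: 9 sorts before 5
    }
}
```

For a handful of keys the deserializing approach in the post is perfectly adequate; byte-level comparison mainly pays off when the shuffle sorts very many keys.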
    

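The Mapper and Reducer are below. They contain no sorting logic of their own, because the Shuffle phase calls the comparator to sort the keys: the Mapper just swaps the fields so that count becomes the key, and the Reducer swaps them back when writing the output.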

    static class SortMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input line: "id<TAB>count"; emit (count, id) so the shuffle sorts by count
            Record r = new Record(value.toString());
            context.write(new IntWritable(r.getCount()), new Text(r.getPersonalId()));
        }
    }

    static class SortReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // Keys arrive in descending order; several ids may share the same count
            for (Text value : values) {
                context.write(value, key); // swap back to (id, count)
            }
        }
    }
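To check the data flow without a cluster, here is a plain-Java simulation of the whole job; SortByValueSimulation and the sample lines are hypothetical. It assumes the input format described above (one id<TAB>count per line), uses a TreeMap with Comparator.reverseOrder() in place of the shuffle plus IntWritableComparator, and swaps the fields back the way SortReducer does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation (not Hadoop code) of the sort-by-value job:
// map turns "id\tcount" into (count, id), the shuffle sorts counts in
// descending order and groups equal counts, reduce emits "id\tcount" again.
class SortByValueSimulation {

    static List<String> run(List<String> lines) {
        // Map + shuffle: the reverse-order TreeMap plays the role of the
        // descending comparator registered with setSortComparatorClass.
        TreeMap<Integer, List<String>> shuffled = new TreeMap<>(Comparator.reverseOrder());
        for (String line : lines) {
            String[] fields = line.split("\t");
            int count = Integer.parseInt(fields[1]);
            shuffled.computeIfAbsent(count, k -> new ArrayList<>()).add(fields[0]);
        }
        // Reduce: one group per distinct count; write every id under it.
        List<String> output = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> group : shuffled.entrySet()) {
            for (String id : group.getValue()) {
                output.add(id + "\t" + group.getKey());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = List.of("u1\t3", "u2\t10", "u3\t3", "u4\t7");
        // Prints u2 10, u4 7, u1 3, u3 3 (tab-separated, one per line)
        run(input).forEach(System.out::println);
    }
}
```

Ids that share a count (u1 and u3 here) come out in insertion order in this simulation; in Hadoop the order of values within one key is not guaranteed.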

The main class is below; feel free to use it as a template.

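    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.BasicConfigurator;

    public class SortByMapReduce {

        // Record, IntWritableComparator, SortMapper and SortReducer from above go here

        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            // Input: output of the previous job, one "id<TAB>count" per line
            String inputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/submitTop10output/";
            String outputFile = "hdfs://master:9000/user/root/finalClassDesign/originData/sortedSubmitTop10/";

            BasicConfigurator.configure(); // basic log4j console logging
            Configuration conf = new Configuration();
            // To take the paths from the command line instead
            // (requires import org.apache.hadoop.util.GenericOptionsParser):
            // String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // if (otherArgs.length != 2) {
            //     System.err.println("Usage: SortByMapReduce <in> <out>");
            //     System.exit(2);
            // }

            Job job = Job.getInstance(conf, "SortByValue");
            job.setJarByClass(SortByMapReduce.class);

            job.setMapperClass(SortMapper.class);
            job.setReducerClass(SortReducer.class);

            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            // Important: register the custom sort comparator (descending by count)
            job.setSortComparatorClass(IntWritableComparator.class);
            // One reducer (also Hadoop's default) keeps the whole result in one sorted file
            job.setNumReduceTasks(1);

            // Delete the output directory if it already exists, otherwise the job fails
            Path path = new Path(outputFile);
            FileSystem fileSystem = path.getFileSystem(conf);
            if (fileSystem.exists(path)) {
                fileSystem.delete(path, true);
            }

            FileInputFormat.setInputPaths(job, new Path(inputFile));
            FileOutputFormat.setOutputPath(job, new Path(outputFile));

            boolean res = job.waitForCompletion(true);
            if (res) {
                System.out.println("===========waitForCompletion:" + res + "==========");
            }
            System.exit(res ? 0 : 1);
        }
    }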
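The result is one globally sorted file only because the job runs a single reducer (Hadoop's default for mapreduce.job.reduces is 1). With several reducers, the default HashPartitioner assigns each key to reducer (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, and each reducer's output file is sorted only internally. The plain-Java sketch below (hypothetical class PartitionedSortDemo) mimics that formula; for IntWritable, hashCode() returns the int value itself, which Integer.hashCode(int) reproduces here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch (not Hadoop code) of hash partitioning followed by a
// per-reducer descending sort. The formula mirrors the default HashPartitioner.
class PartitionedSortDemo {

    static int partition(int key, int numReduceTasks) {
        return (Integer.hashCode(key) & Integer.MAX_VALUE) % numReduceTasks;
    }

    // Returns one descending-sorted list per reducer, i.e. per output file.
    static List<List<Integer>> sortPerReducer(List<Integer> keys, int numReduceTasks) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            parts.add(new ArrayList<>());
        }
        for (int key : keys) {
            parts.get(partition(key, numReduceTasks)).add(key);
        }
        for (List<Integer> part : parts) {
            part.sort(Comparator.reverseOrder());
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> counts = List.of(10, 7, 3, 8, 1);
        System.out.println(sortPerReducer(counts, 1)); // [[10, 8, 7, 3, 1]]
        System.out.println(sortPerReducer(counts, 2)); // [[10, 8], [7, 3, 1]]
    }
}
```

With two reducers, each file is sorted but concatenating them does not give a global order in general. When a globally sorted result across several reducers is needed, Hadoop's TotalOrderPartitioner (usually paired with InputSampler) is the usual tool; that setup is beyond this example.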
    
Original post: https://www.cnblogs.com/zhaoke271828/p/13232856.html