二次排序解析

1、定义组合key

package com.cr.com.cr.test;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ComKey implements WritableComparable<ComKey> {

    private int year;
    private int temp;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    //对key进行比较实现
    public int compareTo(ComKey o) {
        System.out.println("对key进行比较实现compareTo" );
        int y1 = o.getYear();
        int t1 = o.getTemp();
        System.out.println(y1 + "=" + t1);
        //如果年份相同
        if (year == y1) {
            //气温降序
            int result = -(temp - t1);
            System.out.println("年份相同,比较气温" + result);
            return result;
        } else {
            //年份升序
            int result = year - y1;
            System.out.println("年份升序" + result);
            return result;
        }
    }

    //串行化过程
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(temp);
    }

    //反串行化过程
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.temp = in.readInt();
    }

    @Override
    public String toString() {
        return "year:" + year + ",temp:" + temp;
    }
}

2、mapper

package com.cr.com.cr.test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper: parses each input line "year temperature" into a composite {@link ComKey}.
 * It emits that key with a {@link NullWritable} value, so all data travels in the key
 * and the job's sort and grouping comparators can act on it.
 *
 * <p>Blank lines are skipped. Lines without two integer fields are skipped and counted
 * in counter {@code MaxTempMapper/MALFORMED_LINES}, rather than failing the whole task.
 */
public class MaxTempMapper extends Mapper<LongWritable,Text, ComKey,NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        System.out.println("进入mapper");
        String line = value.toString().trim();
        if (line.isEmpty()) {
            return;
        }
        // Split on any run of whitespace so tabs or repeated spaces between fields are tolerated.
        String[] arr = line.split("\\s+");
        if (arr.length < 2) {
            context.getCounter("MaxTempMapper", "MALFORMED_LINES").increment(1);
            return;
        }
        ComKey keyout = new ComKey();
        try {
            keyout.setYear(Integer.parseInt(arr[0]));
            keyout.setTemp(Integer.parseInt(arr[1]));
        } catch (NumberFormatException e) {
            // Non-numeric field: record it and move on instead of killing the task.
            context.getCounter("MaxTempMapper", "MALFORMED_LINES").increment(1);
            return;
        }
        System.out.println("mapper输出key"+keyout.getYear()+ "==" + keyout.getTemp());
        System.out.println("mapper输出value==nullwritable");
        context.write(keyout,NullWritable.get());
    }
}

3、进行分区

package com.cr.com.cr.test;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions map output by year only. Every key of a given year goes to the same reduce task
 * regardless of its temperature, which the year-based grouping relies on.
 */
public class YearPartitioner extends Partitioner<ComKey,NullWritable> {
    /**
     * @param comkey        the composite key; only its year is used
     * @param nullWritable  the (empty) value, ignored
     * @param numPartitions number of reduce tasks
     * @return a partition index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(ComKey comkey, NullWritable nullWritable, int numPartitions) {
        System.out.println("进行分区YearPartitioner" );
        // Java's % keeps the sign of the dividend, so a negative year would produce an illegal
        // negative partition. Mask the sign bit, as Hadoop's HashPartitioner does.
        int partition = (comkey.getYear() & Integer.MAX_VALUE) % numPartitions;
        System.out.println("分区" + partition);
        return partition;
    }
}

4、对mapper的输出key进行比较,年份升序排列,如果年份相同,按照气温降序排列

package com.cr.com.cr.test;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for the map output. It defers entirely to {@link ComKey#compareTo}
 * (year ascending, then temperature descending) and prints a trace of each comparison.
 */
public class ComKeyComparator extends WritableComparator {

    /** Binds this comparator to {@link ComKey}, asking the parent to create key instances. */
    ComKeyComparator() {
        super(ComKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("进入组合key比较ComKeyComparator");
        String pair = a + "==" + b;
        System.out.println(pair);
        int order = a.compareTo(b);
        String summary = " a.compareTo(b)比较结果:" + order;
        System.out.println(summary);
        return order;
    }
}

5、reducer按照key聚合(注:原文此处贴出的代码与第6节的YearGroupComparator完全相同,主函数中引用的MaxTempReducer源码并未给出)

package com.cr.com.cr.test;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator (registered via {@code job.setGroupingComparatorClass}).
 * It compares keys by year only, so keys that differ only in temperature are treated
 * as the same group on the reduce side.
 */
public class YearGroupComparator extends WritableComparator {
    /** Binds this comparator to {@link ComKey}, asking the parent to create key instances. */
    YearGroupComparator() {
        super(ComKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("年份进行分组对比器YearGroupComparator");
        int y1 = ((ComKey) a).getYear();
        int y2 = ((ComKey) b).getYear();
        // Integer.compare instead of y1 - y2: the subtraction can overflow and flip the sign.
        return Integer.compare(y1, y2);
    }
}

6、对年份进行分组

package com.cr.com.cr.test;

import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator (registered via {@code job.setGroupingComparatorClass}).
 * It compares keys by year only, so keys that differ only in temperature are treated
 * as the same group on the reduce side.
 */
public class YearGroupComparator extends WritableComparator {
    /** Binds this comparator to {@link ComKey}, asking the parent to create key instances. */
    YearGroupComparator() {
        super(ComKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("年份进行分组对比器YearGroupComparator");
        int y1 = ((ComKey) a).getYear();
        int y2 = ((ComKey) b).getYear();
        // Integer.compare instead of y1 - y2: the subtraction can overflow and flip the sign.
        return Integer.compare(y1, y2);
    }
}

7、主函数

package com.cr.com.cr.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTempApp {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //单例作业
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS","file:///");
        Job job = Job.getInstance(conf);
        System.setProperty("hadoop.home.dir","E:\hadoop-2.7.5");

        //设置job的各种属性
        job.setJobName("MaxTempApp");                 //设置job名称
        job.setJarByClass(MaxTempApp.class);              //设置搜索类
        job.setInputFormatClass(TextInputFormat.class);

        //设置输入路径
        FileInputFormat.addInputPath(job,new Path(("D:\data\test1.txt")));
        //设置输出路径
        Path path = new Path("D:\data\out");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job,path);

        job.setMapperClass(MaxTempMapper.class);               //设置mapper类
        job.setReducerClass(MaxTempReducer.class);               //设置reduecer类

        job.setMapOutputKeyClass(ComKey.class);            //设置之map输出key
        job.setMapOutputValueClass(NullWritable.class);   //设置map输出value
        job.setOutputKeyClass(IntWritable.class);               //设置mapreduce 输出key
        job.setOutputValueClass(IntWritable.class);      //设置mapreduce输出value
        //设置分区类
        job.setPartitionerClass(YearPartitioner.class);
        //设置分组对比器
        job.setGroupingComparatorClass(YearGroupComparator.class);
//        //设置排序对比器
        job.setSortComparatorClass(ComKeyComparator.class);

        job.setNumReduceTasks(3);
        job.waitForCompletion(true);

    }

}

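8、运行过程解析(注:以下日志由打印文案略有不同的代码版本生成,例如日志中的“1995=》20”和“年份升序排列 year - y1--》”分别对应上面代码中 y1 + "=" + t1 与 "年份升序" + result 的输出)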

进入mapper
18/01/14 16:54:06 INFO mapred.MapTask: Map output collector class = org.apache.hadoop.mapred.MapTask$MapOutputBuffer
mapper输出key1995==12
mapper输出value==nullwritable
进行分区YearPartitioner
分区0
进入mapper
mapper输出key1994==13
mapper输出value==nullwritable
进行分区YearPartitioner
分区2
进入mapper
mapper输出key1995==23
mapper输出value==nullwritable
进行分区YearPartitioner
分区0
进入mapper
mapper输出key1998==34
mapper输出value==nullwritable
进行分区YearPartitioner
分区0
进入mapper
mapper输出key1991==23
mapper输出value==nullwritable
进行分区YearPartitioner
分区2
进入mapper
mapper输出key2004==18
mapper输出value==nullwritable
进行分区YearPartitioner
分区0
进入mapper
mapper输出key1995==20
mapper输出value==nullwritable
进行分区YearPartitioner
分区0
进入mapper
mapper输出key2004==11
mapper输出value==nullwritable
进行分区YearPartitioner
分区0
进入组合key比较ComKeyComparator
year:2004,temp:11==year:1995,temp:20
对key进行比较实现compareTo
1995=》20
年份升序排列 year - y1--》9
 a.compareTo(b)比较结果:9
进入组合key比较ComKeyComparator
year:2004,temp:11==year:2004,temp:18
对key进行比较实现compareTo
2004=》18
年份相同,气温降序排列 -(temp - t1)--》7
 a.compareTo(b)比较结果:7
进入组合key比较ComKeyComparator
year:1995,temp:20==year:2004,temp:18
对key进行比较实现compareTo
2004=》18
年份升序排列 year - y1--》-9
 a.compareTo(b)比较结果:-9
进入组合key比较ComKeyComparator
year:2004,temp:11==year:1998,temp:34
对key进行比较实现compareTo
1998=》34
年份升序排列 year - y1--》6
 a.compareTo(b)比较结果:6
进入组合key比较ComKeyComparator
year:2004,temp:18==year:1998,temp:34
对key进行比较实现compareTo
1998=》34
年份升序排列 year - y1--》6
 a.compareTo(b)比较结果:6
进入组合key比较ComKeyComparator
year:1995,temp:20==year:1998,temp:34
对key进行比较实现compareTo
1998=》34
年份升序排列 year - y1--》-3
 a.compareTo(b)比较结果:-3
进入组合key比较ComKeyComparator
year:2004,temp:11==year:1995,temp:23
对key进行比较实现compareTo
1995=》23
年份升序排列 year - y1--》9
 a.compareTo(b)比较结果:9
进入组合key比较ComKeyComparator
year:2004,temp:18==year:1995,temp:23
对key进行比较实现compareTo
1995=》23
年份升序排列 year - y1--》9
 a.compareTo(b)比较结果:9
进入组合key比较ComKeyComparator
year:1998,temp:34==year:1995,temp:23
对key进行比较实现compareTo
1995=》23
年份升序排列 year - y1--》3
 a.compareTo(b)比较结果:3
进入组合key比较ComKeyComparator
year:1995,temp:20==year:1995,temp:23
对key进行比较实现compareTo
1995=》23
年份相同,气温降序排列 -(temp - t1)--》3
 a.compareTo(b)比较结果:3
进入组合key比较ComKeyComparator
year:1991,temp:23==year:1994,temp:13
对key进行比较实现compareTo
1994=》13
年份升序排列 year - y1--》-3
 a.compareTo(b)比较结果:-3
进入组合key比较ComKeyComparator
year:2004,temp:11==year:1995,temp:12
对key进行比较实现compareTo
1995=》12
年份升序排列 year - y1--》9
 a.compareTo(b)比较结果:9
进入组合key比较ComKeyComparator
year:2004,temp:18==year:1995,temp:12
对key进行比较实现compareTo
1995=》12
年份升序排列 year - y1--》9
 a.compareTo(b)比较结果:9
进入组合key比较ComKeyComparator
year:1998,temp:34==year:1995,temp:12
对key进行比较实现compareTo
1995=》12
年份升序排列 year - y1--》3
 a.compareTo(b)比较结果:3
进入组合key比较ComKeyComparator
year:1995,temp:20==year:1995,temp:12
对key进行比较实现compareTo
1995=》12
年份相同,气温降序排列 -(temp - t1)--》-8
 a.compareTo(b)比较结果:-8
年份进行分组对比器YearGroupComparator
进入reducer
reducer输出199523
18/01/14 16:54:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 52 bytes
年份进行分组对比器YearGroupComparator
reducer输出199520
18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merged 1 segments, 62 bytes to disk to satisfy reduce memory limit
年份进行分组对比器YearGroupComparator
reducer输出199512
18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 1 files, 66 bytes from disk
年份进行分组对比器YearGroupComparator
进入reducer
18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 0 segments, 0 bytes from memory into reduce
reducer输出199834
年份进行分组对比器YearGroupComparator
18/01/14 16:54:06 INFO mapred.Merger: Merging 1 sorted segments
进入reducer
reducer输出200418
reducer输出200411
18/01/14 16:54:06 INFO mapred.Task: Task:attempt_local1143039137_0001_r_000000_0 is done. And is in the process of 
年份进行分组对比器YearGroupComparator
18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merged 1 segments, 22 bytes to disk to satisfy reduce memory limit
进入reducer
reducer输出199123
进入reducer
18/01/14 16:54:06 INFO reduce.MergeManagerImpl: Merging 1 files, 26 bytes from disk
reducer输出199413

欢迎关注我的公众号:小秋的博客 CSDN博客:https://blog.csdn.net/xiaoqiu_cr github:https://github.com/crr121 联系邮箱:rongchen633@gmail.com 有什么问题可以给我留言噢~
原文地址:https://www.cnblogs.com/flyingcr/p/10326950.html