MapReduce实现二次排序(温度年份排序)

文件内容:

1949-10-01 14:21:02	34℃
1949-10-02 14:01:02	36℃
1950-01-01 14:21:02	32℃
1950-10-01 11:01:02	37℃
1951-10-01 14:21:02	23℃
1950-10-02 17:11:02	41℃
1950-10-01 18:20:02	27℃
1951-07-01 14:01:02	45℃
1951-07-02 13:21:02	46℃

需求:

  按照年份升序排序,同时每一年中温度降序排序

代码实现:

一、KeyPair 自定义类

package com.whu.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite map-output key of (year, temperature) for the secondary sort.
 *
 * Natural ordering: year ascending, then temperature DESCENDING within the
 * same year — matching the job's required output order. The original
 * compareTo sorted temperature ascending, contradicting the custom
 * SortKey comparator; the two now agree, so the job behaves the same even
 * if no explicit sort comparator is configured.
 */
public class KeyPair implements WritableComparable<KeyPair> {

    private int year;
    private int hot;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    /** Deserializes fields in the same order {@link #write} emits them. */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();
    }

    /** Serializes year first, then temperature. */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(hot);
    }

    /** Year ascending; within a year, hotter records first (descending). */
    @Override
    public int compareTo(KeyPair o) {
        int y = Integer.compare(year, o.getYear());
        if (y == 0) {
            // Reversed operands -> descending temperature.
            return Integer.compare(o.getHot(), hot);
        }
        return y;
    }

    /** Equality consistent with compareTo: same year and same temperature. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof KeyPair)) {
            return false;
        }
        KeyPair other = (KeyPair) obj;
        return year == other.year && hot == other.hot;
    }

    @Override
    public int hashCode() {
        return 31 * year + hot;
    }

    @Override
    public String toString() {
        return year+"年"+hot+"℃";
    }
}

二、FirstPartitioner 自定义分区类

package com.whu.test;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Routes all records of the same year to the same reduce partition, so each
 * year's output lands in a single reducer regardless of temperature.
 */
public class FirstPartitioner extends Partitioner<KeyPair, Text> {

    @Override
    public int getPartition(KeyPair key, Text value, int nums) {
        // Spread years across partitions; mask the sign bit so a negative
        // product still maps to a valid bucket index.
        int spread = key.getYear() * 127;
        return (spread & Integer.MAX_VALUE) % nums;
    }

}

三、SortKey 自定义排序类

package com.whu.test;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Shuffle-phase sort comparator: orders keys by year ascending, then by
 * temperature descending within the same year.
 */
public class SortKey extends WritableComparator {

    public SortKey() {
        // true -> let the parent instantiate KeyPair objects for comparison.
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair left = (KeyPair) a;
        KeyPair right = (KeyPair) b;
        int byYear = Integer.compare(left.getYear(), right.getYear());
        if (byYear != 0) {
            return byYear;
        }
        // Swapped operands give descending order on temperature.
        return Integer.compare(right.getHot(), left.getHot());
    }
}

四、GroupComparator 自定义分组类

package com.whu.test;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;



/**
 * Grouping comparator for the reduce phase: keys that share a year are fed
 * to one reduce() call, so all of a year's (already temperature-sorted)
 * records arrive together.
 */
public class GroupComparator extends WritableComparator  {

    // Public for consistency with SortKey and straightforward reflective
    // instantiation by the framework (the original was protected).
    public GroupComparator() {
        super(KeyPair.class,true);
    }

    /** Compares on year only; temperature is ignored for grouping. */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair k1 = (KeyPair)a;
        KeyPair k2 = (KeyPair)b;
        return Integer.compare(k1.getYear(), k2.getYear());
    }

}

五、MyMapper 自定义Mapper类

package com.whu.test;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Parses "yyyy-MM-dd HH:mm:ss\t<temp>℃" records into a (year, temperature)
 * composite key, passing the original line through as the value.
 */
public class MyMapper extends Mapper<LongWritable, Text, KeyPair, Text> {

    // SimpleDateFormat is not thread-safe, but a map task runs its map()
    // calls on a single thread, so one shared instance is safe here.
    private static final SimpleDateFormat SDF = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private final KeyPair outKey = new KeyPair();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Input file is GBK-encoded; decode explicitly instead of relying on
        // Text's UTF-8 toString().
        String line = new String(value.getBytes(), 0, value.getLength(), "GBK");
        String[] fields = line.split("\t");
        // Guard BEFORE touching fields[1]: the original printed fields[1]
        // first and threw ArrayIndexOutOfBoundsException on malformed lines.
        if (fields.length < 2) {
            return;
        }
        Date date;
        try {
            date = SDF.parse(fields[0]);
        } catch (ParseException e) {
            // Skip unparseable records; the original fell through and emitted
            // a key carrying a stale (or zero) year.
            return;
        }
        Calendar cal = Calendar.getInstance();
        cal.setTime(date);
        outKey.setYear(cal.get(Calendar.YEAR));
        // The ℃ marker may be absent on some records; the original
        // substring(0, indexOf("℃")) threw when indexOf returned -1.
        String hotStr = fields[1].trim();
        int marker = hotStr.indexOf("℃");
        if (marker >= 0) {
            hotStr = hotStr.substring(0, marker);
        }
        outKey.setHot(Integer.parseInt(hotStr));
        context.write(outKey, value);
    }

}

六、MyReducer 自定义Reducer类

package com.whu.test;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Identity-style reducer: the shuffle has already sorted records by year
 * ascending and temperature descending, so each grouped value is simply
 * re-emitted with its composite key.
 */
public class MyReducer extends Reducer<KeyPair, Text, KeyPair,Text> {

    @Override
    protected void reduce(KeyPair key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        // One group per year (see GroupComparator); echo every record.
        for (Text record : value) {
            context.write(key, record);
        }
    }
}

七、YearHot 自定义驱动类

package com.whu.test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Driver for the secondary-sort job: years ascending, temperatures
 * descending within each year.
 *
 * Usage: YearHot [inputPath [outputPath]] — when arguments are omitted the
 * original hard-coded HDFS locations are used, so existing invocations keep
 * working unchanged.
 */
public class YearHot {

    private static final String DEFAULT_INPUT =
            "hdfs://192.168.228.134:/usr/qqx/yhinput/file.txt";
    private static final String DEFAULT_OUTPUT =
            "hdfs://192.168.228.134:/usr/qqx/yhoutput";

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "year hot sort");

        job.setJarByClass(YearHot.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        // Three reducers; FirstPartitioner spreads years across them.
        job.setNumReduceTasks(3);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(SortKey.class);
        job.setGroupingComparatorClass(GroupComparator.class);

        // Map output types equal the final output types, so one pair of
        // output-class settings covers both stages.
        job.setOutputKeyClass(KeyPair.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(GBKOutputFormat.class);

        // Allow CLI override of the paths; fall back to the original defaults.
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
原文地址:https://www.cnblogs.com/qiaoqianxiong/p/4991826.html