文件内容:
1949-10-01 14:21:02 34℃
1949-10-02 14:01:02 36℃
1950-01-01 14:21:02 32℃
1950-10-01 11:01:02 37℃
1951-10-01 14:21:02 23℃
1950-10-02 17:11:02 41℃
1950-10-01 18:20:02 27℃
1951-07-01 14:01:02 45℃
1951-07-02 13:21:02 46℃
需求:
按照年份升序排序,同时每一年中温度降序排序
代码实现:
一、KeyPair 自定义类
package com.whu.test;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key made of a year and a temperature reading ("hot").
 *
 * <p>The serialized form is two consecutive ints: year first, then hot.
 * The natural ordering defined by {@link #compareTo(KeyPair)} is ascending
 * by year and then ascending by temperature. {@link #equals(Object)} and
 * {@link #hashCode()} are consistent with that ordering, so the key also
 * behaves correctly in hash-based collections and hash-based partitioning.
 */
public class KeyPair implements WritableComparable<KeyPair> {

    private int year;
    private int hot;

    /** @return the year component of the key */
    public int getYear() {
        return year;
    }

    /** @param year the year component to store */
    public void setYear(int year) {
        this.year = year;
    }

    /** @return the temperature component of the key */
    public int getHot() {
        return hot;
    }

    /** @param hot the temperature component to store */
    public void setHot(int hot) {
        this.hot = hot;
    }

    /**
     * Restores both fields from the stream, in the order written by
     * {@link #write(DataOutput)}.
     *
     * @param in source to read the two ints from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.hot = in.readInt();
    }

    /**
     * Serializes the key as two ints: year, then hot.
     *
     * @param out sink to write the two ints to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(hot);
    }

    /**
     * Orders keys by year ascending, breaking ties by temperature ascending.
     *
     * @param o the key to compare against
     * @return negative, zero or positive per the {@link Comparable} contract
     */
    @Override
    public int compareTo(KeyPair o) {
        int y = Integer.compare(year, o.getYear());
        if (y == 0) {
            return Integer.compare(hot, o.getHot());
        }
        return y;
    }

    /**
     * Two keys are equal exactly when {@link #compareTo(KeyPair)} returns 0,
     * i.e. when both year and temperature match.
     *
     * @param obj the object to compare against
     * @return {@code true} if {@code obj} is a KeyPair with the same fields
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof KeyPair)) {
            return false;
        }
        KeyPair other = (KeyPair) obj;
        return year == other.year && hot == other.hot;
    }

    /**
     * Hash derived only from the two int fields, so it is identical across
     * JVM instances for equal keys.
     *
     * @return a hash code consistent with {@link #equals(Object)}
     */
    @Override
    public int hashCode() {
        return 31 * year + hot;
    }

    /** @return the key rendered as {@code <year>年<hot>℃} */
    @Override
    public String toString() {
        return year + "年" + hot + "℃";
    }
}
二、FirstPartitioner 自定义分区类
package com.whu.test;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitions records by the year component of the key only, so every
 * record of a given year is assigned the same partition number.
 */
public class FirstPartitioner extends Partitioner<KeyPair, Text> {

    /**
     * Computes the partition from the key's year.
     *
     * @param key   composite key; only its year is used
     * @param value record value; not consulted
     * @param nums  number of partitions
     * @return a partition index in {@code [0, nums)}
     */
    @Override
    public int getPartition(KeyPair key, Text value, int nums) {
        int scaledYear = key.getYear() * 127;
        // Clear the sign bit so the remainder below can never be negative.
        int nonNegative = scaledYear & Integer.MAX_VALUE;
        return nonNegative % nums;
    }
}
三、SortKey 自定义排序类
package com.whu.test;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator for {@link KeyPair} that orders by year ascending and, within
 * the same year, by temperature descending.
 */
public class SortKey extends WritableComparator {

    /** Registers {@link KeyPair} as the key class handled by this comparator. */
    public SortKey() {
        super(KeyPair.class, true);
    }

    /**
     * Compares two {@link KeyPair} keys.
     *
     * @param a first key, cast to {@link KeyPair}
     * @param b second key, cast to {@link KeyPair}
     * @return the year comparison when the years differ; otherwise the
     *         temperature comparison with its operands swapped, which
     *         places higher temperatures first
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair left = (KeyPair) a;
        KeyPair right = (KeyPair) b;

        int byYear = Integer.compare(left.getYear(), right.getYear());
        // Swapped operands give the descending order on temperature.
        return byYear == 0
                ? Integer.compare(right.getHot(), left.getHot())
                : byYear;
    }
}
四、GroupComparator 自定义分组类
package com.whu.test;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Comparator that considers only the year of a {@link KeyPair}; two keys
 * with the same year compare as equal regardless of temperature.
 */
public class GroupComparator extends WritableComparator {

    /** Registers {@link KeyPair} as the key class handled by this comparator. */
    protected GroupComparator() {
        super(KeyPair.class, true);
    }

    /**
     * Compares two keys by year alone.
     *
     * @param a first key, cast to {@link KeyPair}
     * @param b second key, cast to {@link KeyPair}
     * @return negative, zero or positive according to the ascending order
     *         of the two years
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair first = (KeyPair) a;
        KeyPair second = (KeyPair) b;

        int firstYear = first.getYear();
        int secondYear = second.getYear();
        return Integer.compare(firstYear, secondYear);
    }
}
五、MyMapper 自定义Mapper类
package com.whu.test; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; public class MyMapper extends Mapper<LongWritable, Text, KeyPair, Text> { private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); private KeyPair k = new KeyPair(); @Override protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line = new String(value.getBytes(), 0, value.getLength(), "GBK"); String[] tmp = line.split(" "); System.out.println(tmp[0]+" "+tmp[1]); if(tmp.length>=2){ try { Date date = sdf.parse(tmp[0]); Calendar cal = Calendar.getInstance(); cal.setTime(date); int year = cal.get(1); k.setYear(year); } catch (ParseException e) { // TODO Auto-generated catch block e.printStackTrace(); } int hot = Integer.parseInt(tmp[1].substring(0, tmp[1].indexOf("℃"))); k.setHot(hot); context.write(k, value); } } }
六、MyReducer 自定义Reducer类
package com.whu.test;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * Pass-through reducer: every value received for a group is written out
 * again, paired with the key object supplied to the call.
 */
public class MyReducer extends Reducer<KeyPair, Text, KeyPair, Text> {

    /**
     * Writes one output pair per incoming value.
     *
     * @param key     key object for the current group
     * @param value   the values of the current group
     * @param context sink for the output pairs
     * @throws IOException          if writing fails
     * @throws InterruptedException if the write is interrupted
     */
    @Override
    protected void reduce(KeyPair key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        for (Text record : value) {
            // The key is handed over on every write rather than cached.
            context.write(key, record);
        }
    }
}
七、YearHot 自定义驱动类
package com.whu.test; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; public class YearHot { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "year hot sort"); job.setJarByClass(YearHot.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setNumReduceTasks(3); job.setPartitionerClass(FirstPartitioner.class); job.setSortComparatorClass(SortKey.class); job.setGroupingComparatorClass(GroupComparator.class); job.setOutputKeyClass(KeyPair.class); job.setOutputValueClass(Text.class); job.setOutputFormatClass(GBKOutputFormat.class); FileInputFormat.addInputPath(job, new Path("hdfs://192.168.228.134:/usr/qqx/yhinput/file.txt")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.228.134:/usr/qqx/yhoutput")); System.exit(job.waitForCompletion(true)?0:1); } }