1. Define a custom key class that implements the WritableComparable interface
package com.zhangdan.ykt;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom key type.
 *
 * @author vlab
 */
public class GdzBeans implements WritableComparable<GdzBeans> {
    private String key1;
    private String key2;

    // A custom Writable must have a no-arg constructor so Hadoop can
    // instantiate it via reflection during deserialization.
    public GdzBeans() {
    }

    public GdzBeans(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public String getKey1() {
        return key1;
    }

    public String getKey2() {
        return key2;
    }

    public void setKey(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public void setKey1(String key1) {
        this.key1 = key1;
    }

    public void setKey2(String key2) {
        this.key2 = key2;
    }

    // Deserialization: read the fields in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        this.key1 = in.readUTF();
        this.key2 = in.readUTF();
    }

    // Serialization: write the fields in a fixed order.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(key1);
        out.writeUTF(key2);
    }

    // Sort order for the shuffle: by key1 first, then by key2.
    @Override
    public int compareTo(GdzBeans o) {
        int cmp = key1.compareTo(o.key1);
        if (cmp != 0) {
            return cmp;
        }
        return key2.compareTo(o.key2);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof GdzBeans) {
            GdzBeans other = (GdzBeans) obj;
            return key1.equals(other.key1) && key2.equals(other.key2);
        }
        return false;
    }

    @Override
    public int hashCode() {
        return key1.hashCode() * 3 + key2.hashCode() * 5;
    }

    @Override
    public String toString() {
        return key1 + "," + key2;
    }
}
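One way to sanity-check that write() and readFields() are symmetric (the read order must match the write order exactly) is a local round trip through a byte stream. This is only a test sketch, not part of the job; the class name GdzBeansRoundTrip is mine, and it assumes the same package as GdzBeans:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class GdzBeansRoundTrip {
    public static void main(String[] args) throws Exception {
        GdzBeans original = new GdzBeans("a001", "b002");

        // Serialize with write(), exactly as Hadoop does during the shuffle.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance created via the no-arg constructor.
        GdzBeans copy = new GdzBeans();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(original.equals(copy)); // expected: true
    }
}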
A custom key has to implement WritableComparable and then override the corresponding methods:
Writable --- write() and readFields(), which serialize and deserialize the fields in matching order; on top of that, toString(), equals() and hashCode() should be overridden, since TextOutputFormat prints the key via toString() and the default partitioner relies on hashCode().
Comparable --- compareTo(), which is the crucial one: this is where the keys get sorted during the shuffle. I didn't understand this logic at first and ran into a lot of errors; it was quite a pitfall.
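To see what compareTo() does outside of Hadoop, it helps to sort a plain list with it; the shuffle applies exactly the same comparison, so the job output comes out in this order. A minimal sketch (the sample values are made up):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CompareToDemo {
    public static void main(String[] args) {
        List<GdzBeans> keys = new ArrayList<>();
        keys.add(new GdzBeans("b", "2"));
        keys.add(new GdzBeans("a", "2"));
        keys.add(new GdzBeans("a", "1"));

        // Collections.sort() calls compareTo(), just like the MapReduce shuffle.
        Collections.sort(keys);
        for (GdzBeans k : keys) {
            System.out.println(k); // prints a,1 then a,2 then b,2 (key1 first, then key2)
        }
    }
}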
2. The MapReduce job
package com.zhangdan.ykt;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GetConnectionCount {
    private static final String INPUT_PATH = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid07_result04/connection/part-r-*";
    private static final String OUTPUT_PATH = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid07_result04/connectioncount";
    // private static final String INPUT_PATH = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid06_result09/connection/part-r-*";
    // private static final String OUTPUT_PATH = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid06_result09test/connectioncount";

    public static class MyMap extends Mapper<LongWritable, Text, GdzBeans, IntWritable> {
        private final IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "key1,key2,..."; emit (key1,key2) with a count of 1.
            String[] tt = value.toString().split(",");
            context.write(new GdzBeans(tt[0], tt[1]), one);
        }
    }

    public static class MyReduce extends Reducer<GdzBeans, IntWritable, GdzBeans, IntWritable> {
        @Override
        public void reduce(GdzBeans key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int n = 0;
            for (IntWritable i : values) {
                n += i.get();
            }
            // Skip blank keys. Note: string emptiness must be tested with
            // isEmpty()/equals(), not with == or != as in my first attempt.
            if (key.getKey1() != null && key.getKey2() != null
                    && !key.getKey1().trim().isEmpty() && !key.getKey2().trim().isEmpty()) {
                context.write(key, new IntWritable(n));
            }
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.textoutputformat.ignoreseparator", "true"); // drop the automatic tab between key and value
        conf.set("mapred.textoutputformat.separator", ","); // use "," as the key/value separator
        Job job = new Job(conf);
        job.setJarByClass(GetConnectionCount.class);
        job.setJobName("GetConnectionCount");

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(GdzBeans.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReduce.class);
        job.setCombinerClass(MyReduce.class);
        job.setOutputKeyClass(GdzBeans.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
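A note on the job wiring: reusing MyReduce as the combiner is safe here because summing IntWritables is associative and commutative, so applying it on the map side does not change the final counts; the empty-key filter simply runs on both sides. If the reduce logic were not associative (an average, say), a separate combiner class would be needed.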
I made quite a few improvements here, and things I didn't understand before are gradually making sense. But I still haven't found a satisfactory way to implement secondary sort: I've read many secondary-sort implementations written by others, yet they never quite match my situation, and I'll need some time to get comfortable with the more complex key classes.
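For reference, the textbook secondary-sort pattern fits a composite key like GdzBeans directly: keep compareTo() sorting by (key1, key2), but partition and group by key1 only, so that each reduce() call receives all records for one key1 with key2 already in sorted order. A minimal sketch, assuming this job's GdzBeans (the class names Key1Partitioner and Key1GroupingComparator are mine, and I have not tested this against this job):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Route all records with the same key1 to the same reducer,
// regardless of key2, so they can later be grouped together.
public class Key1Partitioner extends Partitioner<GdzBeans, IntWritable> {
    @Override
    public int getPartition(GdzBeans key, IntWritable value, int numPartitions) {
        return (key.getKey1().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group by key1 only: keys that compare equal here land in the same
// reduce() call, while compareTo() still orders them by key2 within it.
class Key1GroupingComparator extends WritableComparator {
    protected Key1GroupingComparator() {
        super(GdzBeans.class, true); // true = create key instances for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return ((GdzBeans) a).getKey1().compareTo(((GdzBeans) b).getKey1());
    }
}

The extra wiring in main() would then be job.setPartitionerClass(Key1Partitioner.class) and job.setGroupingComparatorClass(Key1GroupingComparator.class).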