hadoop-ykt (custom key)

1. Define a custom key class that implements the WritableComparable interface

package com.zhangdan.ykt;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 * A custom key type.
 *
 * @author vlab
 */
public class GdzBeans implements WritableComparable<GdzBeans> {
    private String key1;
    private String key2;

    public GdzBeans() {
        // A custom data type must have a no-arg constructor so that Hadoop
        // can instantiate it via reflection during deserialization.
    }

    public GdzBeans(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public String getKey1() {
        return key1;
    }

    public String getKey2() {
        return key2;
    }

    public void setKey(String key1, String key2) {
        this.key1 = key1;
        this.key2 = key2;
    }

    public void setKey1(String key1) {
        this.key1 = key1;
    }

    public void setKey2(String key2) {
        this.key2 = key2;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Read the fields in exactly the order write() emits them.
        this.key1 = in.readUTF();
        this.key2 = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(key1);
        out.writeUTF(key2);
    }

    @Override
    public int compareTo(GdzBeans o) {
        // Sort by key1 first, then break ties on key2.
        int cmp1 = key1.compareTo(o.key1);
        if (cmp1 != 0) {
            return cmp1;
        }
        return key2.compareTo(o.key2);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof GdzBeans) {
            GdzBeans pw = (GdzBeans) obj;
            return key1.equals(pw.key1) && key2.equals(pw.key2);
        }
        return false;
    }

    @Override
    public int hashCode() {
        // Used by the default HashPartitioner to assign keys to reducers.
        return this.key1.hashCode() * 3 + this.key2.hashCode() * 5;
    }

    @Override
    public String toString() {
        // TextOutputFormat writes keys using toString().
        return this.key1 + "," + this.key2;
    }
}
A custom key implements WritableComparable and overrides the corresponding methods:
Writable --- write() and readFields() handle serialization; on top of that you should override equals(), hashCode(), and toString() from Object, since hashCode() drives the default HashPartitioner and toString() is what TextOutputFormat prints.
Comparable --- compareTo(). This is the crucial one: it is where the framework sorts the keys during the shuffle. I didn't understand this logic at first and made a lot of mistakes here; it was quite a pitfall.
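
To make the Writable contract concrete, here is a minimal round-trip sketch (my own illustration, not from the original post) that serializes a GdzBeans to bytes and reads it back, the way Hadoop does when it spills map output and rebuilds keys on the reduce side. The sample values are made up:

package com.zhangdan.ykt;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class GdzBeansRoundTrip {
    public static void main(String[] args) throws IOException {
        GdzBeans original = new GdzBeans("stationA", "stationB");

        // Serialize: this is what happens when map output is spilled.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance created via the no-arg
        // constructor, mirroring how Hadoop reconstructs keys.
        GdzBeans copy = new GdzBeans();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);                     // stationA,stationB
        System.out.println(original.equals(copy));    // true
        System.out.println(original.compareTo(copy)); // 0
    }
}

If readFields() consumed the fields in a different order than write() produced them, the copy would come back corrupted, which is exactly the kind of silent error that makes custom keys tricky.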

2. The MapReduce job

package com.zhangdan.ykt;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class GetConnectionCount {
    private static final String Input_Path = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid07_result04/connection/part-r-*";
    private static final String Output_Path = "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid07_result04/connectioncount";
    // private static final String Input_Path =
    // "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid06_result09/connection/part-r-*";
    // private static final String Output_Path =
    // "hdfs://192.168.1.104:9000/user/vlab/yktdata/pid06_result09test/connectioncount";

    public static class MyMap extends Mapper<LongWritable, Text, GdzBeans, IntWritable> {
        IntWritable one = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Each input line is "key1,key2"; emit the pair as a composite key with count 1.
            String[] tt = value.toString().split(",");
            context.write(new GdzBeans(tt[0], tt[1]), one);
        }
    }

    public static class MyReduce extends Reducer<GdzBeans, IntWritable, GdzBeans, IntWritable> {
        @Override
        public void reduce(GdzBeans key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int n = 0;
            for (IntWritable i : values) {
                n += i.get();
            }
            // Compare string contents with isEmpty(), not != "", which only
            // tests reference identity and would never filter anything out.
            if (key.getKey1() != null && key.getKey2() != null && !key.getKey1().trim().isEmpty()
                    && !key.getKey2().trim().isEmpty())
                context.write(key, new IntWritable(n));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.set("mapred.textoutputformat.ignoreseparator", "true"); // suppress the tab TextOutputFormat inserts between key and value
        conf.set("mapred.textoutputformat.separator", ","); // use "," as the key/value separator (newer Hadoop: mapreduce.output.textoutputformat.separator)
        Job job = new Job(conf);
        job.setJarByClass(GetConnectionCount.class); // must name a class in this jar, not an unrelated one
        job.setJobName("GetConnectionCount");

        job.setMapperClass(MyMap.class);
        job.setMapOutputKeyClass(GdzBeans.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setReducerClass(MyReduce.class);
        job.setCombinerClass(MyReduce.class); // summing is associative, so the reducer doubles as a combiner
        job.setOutputKeyClass(GdzBeans.class);
        job.setOutputValueClass(IntWritable.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(Input_Path));
        FileOutputFormat.setOutputPath(job, new Path(Output_Path));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
This version contains a lot of improvements, and many things I didn't understand before are gradually making sense. But I still haven't found a solution I'm really happy with for secondary sort: I've read plenty of secondary-sort code written by others, yet it never quite matches my situation, and I'll need time to get used to the more complex classes. For reference, the standard recipe is sketched below.
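
The usual secondary-sort pattern builds on exactly this kind of composite key: partition and group on key1 only, and let GdzBeans.compareTo() order key2 within each group. A minimal sketch (my own, not from the original post; the class names Key1Partitioner and Key1GroupingComparator are illustrative):

package com.zhangdan.ykt;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

public class SecondarySortSupport {

    // Route every record with the same key1 to the same reducer,
    // regardless of key2.
    public static class Key1Partitioner extends Partitioner<GdzBeans, IntWritable> {
        @Override
        public int getPartition(GdzBeans key, IntWritable value, int numPartitions) {
            return (key.getKey1().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group keys by key1 only, so one reduce() call sees all records for a
    // given key1, already ordered by key2 thanks to GdzBeans.compareTo().
    public static class Key1GroupingComparator extends WritableComparator {
        public Key1GroupingComparator() {
            super(GdzBeans.class, true); // true: instantiate keys for comparison
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return ((GdzBeans) a).getKey1().compareTo(((GdzBeans) b).getKey1());
        }
    }
}

Wire it in with job.setPartitionerClass(SecondarySortSupport.Key1Partitioner.class) and job.setGroupingComparatorClass(SecondarySortSupport.Key1GroupingComparator.class); each reduce() call then receives all records sharing a key1, with the key2 values arriving in sorted order.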
Original post: https://www.cnblogs.com/xunyingFree/p/5147595.html