MapReduce分组

  •  分组:相同key的value进行分组

 例子:如下输入输出,右边的第一列没有重复值,第二列取得是当第一列相同时第二例取最大值

          

分析:首先确定<k3,v3>,k3的选择两种方式,

方法1.前两列都作为k3

方法2.两列分别是k3和v3,此种情况的k2和v2分别是那些,第一列为k2,第二列为v2,但是最后如何无法转化为k3,v3呢,思路是从v2s中取值最大的,此种情况不能取值。

第一部分:方法二达到任务目的

(1)自定义Mapper

 1 private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
 2     IntWritable k2= new IntWritable();
 3     IntWritable v2= new IntWritable();
 4     @Override
 5     protected void map(LongWritable key, Text value,
 6             Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
 7             throws IOException, InterruptedException {
 8            String[] splited = value.toString().split("	");
 9            k2.set(Integer.parseInt(splited[0]));
10            v2.set(Integer.parseInt(splited[1]));
11            context.write(k2, v2);
12     }
13 }

(2)自定义Reduce

//按照k2進行排序,分組(分为3各组,reduce函数被调用3次,分几组调用几次)
//分组为3-{3,2,1}, 2-{2,1},1-{1}

 1 private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
 2     IntWritable v3 = new IntWritable();
 3     @Override
 4     protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
 5             Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
 6             throws IOException, InterruptedException {
 7         int max=Integer.MIN_VALUE;
 8         for (IntWritable v2 : v2s) {
 9             if (v2.get()>max) {
10                 max=v2.get();
11             }
12         }
13         //每个组求得一个最大值可得到结果的序列
14         v3.set(max);
15         context.write(k2, v3);
16     }
17 }

(3)组合MapReduce

 1 public static void main(String[] args) throws Exception {
 2     Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
 3     job.setJarByClass(GroupTest.class);
 4     //1.自定义输入路径
 5     FileInputFormat.setInputPaths(job, new Path(args[0]));
 6     //2.自定义mapper
 7     //job.setInputFormatClass(TextInputFormat.class);
 8     job.setMapperClass(MyMapper.class);
 9     //job.setMapOutputKeyClass(Text.class);
10     //job.setMapOutputValueClass(TrafficWritable.class);
11     
12     //3.自定义reduce
13     job.setReducerClass(MyReducer.class);
14     job.setOutputKeyClass(IntWritable.class);
15     job.setOutputValueClass(IntWritable.class);
16     //4.自定义输出路径
17     FileOutputFormat.setOutputPath(job, new Path(args[1]));
18     //job.setOutputFormatClass(TextOutputFormat.class);//对输出的数据格式化并写入磁盘
19     
20     job.waitForCompletion(true);
21 }

由此,完整的代码如下:

 1 package Mapreduce;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.fs.Path;
 7 import org.apache.hadoop.io.IntWritable;
 8 import org.apache.hadoop.io.LongWritable;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 
16 public class GroupTest {
17 public static void main(String[] args) throws Exception {
18     Job job = Job.getInstance(new Configuration(), GroupTest.class.getSimpleName());
19     job.setJarByClass(GroupTest.class);
20     //1.自定义输入路径
21     FileInputFormat.setInputPaths(job, new Path(args[0]));
22     //2.自定义mapper
23     //job.setInputFormatClass(TextInputFormat.class);
24     job.setMapperClass(MyMapper.class);
25     //job.setMapOutputKeyClass(Text.class);
26     //job.setMapOutputValueClass(TrafficWritable.class);
27     
28     //3.自定义reduce
29     job.setReducerClass(MyReducer.class);
30     job.setOutputKeyClass(IntWritable.class);
31     job.setOutputValueClass(IntWritable.class);
32     //4.自定义输出路径
33     FileOutputFormat.setOutputPath(job, new Path(args[1]));
34     //job.setOutputFormatClass(TextOutputFormat.class);//对输出的数据格式化并写入磁盘
35     
36     job.waitForCompletion(true);
37 }
38 private static class MyMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
39     IntWritable k2= new IntWritable();
40     IntWritable v2= new IntWritable();
41     @Override
42     protected void map(LongWritable key, Text value,
43             Mapper<LongWritable, Text, IntWritable, IntWritable>.Context context)
44             throws IOException, InterruptedException {
45            String[] splited = value.toString().split("	");
46            k2.set(Integer.parseInt(splited[0]));
47            v2.set(Integer.parseInt(splited[1]));
48            context.write(k2, v2);
49     }
50 }
51 //按照k2進行排序,分組(分为3各组,reduce函数被调用3次,分几组调用几次)
52 //分组为3-{3,2,1}, 2-{2,1},1-{1}
53 private static class MyReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
54     IntWritable v3 = new IntWritable();
55     @Override
56     protected void reduce(IntWritable k2, Iterable<IntWritable> v2s,
57             Reducer<IntWritable, IntWritable, IntWritable, IntWritable>.Context context)
58             throws IOException, InterruptedException {
59         int max=Integer.MIN_VALUE;
60         for (IntWritable v2 : v2s) {
61             if (v2.get()>max) {
62                 max=v2.get();
63             }
64         }
65         //每个组求得一个最大值可得到结果的序列
66         v3.set(max);
67         context.write(k2, v3);
68     }
69 }
70 }
最值得MapReduce代码

(4)测试代码运行结果

  •   [root@neusoft-master filecontent]# hadoop jar GroupTest.jar /neusoft/twoint  /out9 
  •   [root@neusoft-master filecontent]# hadoop jar -text  /out9/part-r-00000
  •   [root@neusoft-master filecontent]# hadoop dfs -text  /out9/part-r-00000  

       

第二部分:方法一达到任务目的

      前两列都作为k3,无v3,由此类推,k2也是前两列

      但是如果采用默认分组的话,上述数据集分为6组,无法达到同样的数值取得最大值的目的。

      由此,利用Mapreduce的自定义分组规则,使得第一列相同的数值可以在一个组里面,从而正确的分组。

      MapReduce提供了job.setGroupingComparatorClass(cls);其中cls是自定义分组的类

      

      (1) 从源代码可知,该类需要继承RawComparator类,自定义分组代码如下:

 1 //分组比较--自定义分组
 2     private static class MyGroupingComparator implements RawComparator {
 3         public int compare(Object o1, Object o2) {
 4             return 0;//默认的比较方法
 5         }
 6         //byte[] b1 表示第一个参数的输入字节表示,byte[] b2表示第二个参数的输入字节表示
 7         //b1 The first byte array. 第一个字节数组,
 8         //b1表示前8个字节,b2表示后8个字节,字节是按次序依次比较的
 9         //s1 The position index in b1. The object under comparison's starting index.第一列开始位置
10         //l1 The length of the object in b1.第一列长度 ,在这里表示长度8
11         //提供的数据集中的k2一共48个字节,k2的每一行的TwoInt类型表示8字节(t1和t2分别为4字节)
12         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
13             //compareBytes是按字节比较的方法,其中k2表示的是两列,第一列比较,第二例不比较
14             //第一个字节数组的前四个字节和第二个字节数组的前四个字节比较
15             //{3,3},{3,2},{3,1},{2,2},{2,1},{1,1}
16             //比较上述六组的每组的第一个数字,也就是比较twoint中得t1数值
17             //现在就根据t1可分成3个组了{3,(3,2,1)}{2,(2,1)}{1,1}
18             //之后再从v2中取出最大值
19             return WritableComparator.compareBytes(b1, s1, l1-4, b2, s2, l2-4);
20         }
21 
22     }

      (2)主函数中调用      

//当没有下面的自定义分组的话,会调用k2的compareto方法执行k2的比较,如果自定义了分组类则使用自定义分组类
        job.setGroupingComparatorClass(MyGroupingComparator.class);

     (3)根据比较函数个字段的含义,可以得到v2的类型为intwritable,而不是nullwritable,v2是由第二列的数组成的集合

      Mapper函数如下:

 1 private static class MyMapper extends
 2             Mapper<LongWritable, Text, TwoInt, IntWritable> {
 3         //这里的v2需要改为IntWritable而不是nullwritable
 4         TwoInt K2 = new TwoInt();
 5         IntWritable v2= new IntWritable();
 6         @Override
 7         protected void map(LongWritable key, Text value,
 8                 Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
 9                 throws IOException, InterruptedException {
10             String[] splited = value.toString().split("	");
11             K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
12             v2.set(Integer.parseInt(splited[1])); //要比较第二列,需要将第二列的值赋值为v2
13             context.write(K2, v2);
14         }
15     }

     (4)k3和v3的类型为reduce输出的类型,均为intwritable类型,但是如何根据得到的v2去统计其中相同key的value中得最大值呢?

 1 private static class MyReducer extends
 2             Reducer<TwoInt, IntWritable, IntWritable, IntWritable> {//k2,v2s,k3,v3
 3         IntWritable k3 = new IntWritable();
 4         IntWritable v3 = new IntWritable();
 5         @Override
 6         protected void reduce(
 7                 TwoInt k2,
 8                 Iterable<IntWritable> v2s,
 9                 Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
10                 throws IOException, InterruptedException {
11             int max=Integer.MIN_VALUE;
12             for (IntWritable v2 : v2s) {
13                 if (v2.get()>max) {
14                     max=v2.get();
15                 }
16             }
17             //每个组求得一个最大值可得到结果的序列
18             v3.set(max);
19             k3.set(k2.t1);//k2的第一列作为k3,因为k2为Twoint类型
20             context.write(k3,v3);
21         }
22     }

最终的代码如下:

  1 package Mapreduce;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 
  7 import org.apache.hadoop.conf.Configuration;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.NullWritable;
 12 import org.apache.hadoop.io.RawComparator;
 13 import org.apache.hadoop.io.Text;
 14 import org.apache.hadoop.io.WritableComparable;
 15 import org.apache.hadoop.io.WritableComparator;
 16 import org.apache.hadoop.mapreduce.Job;
 17 import org.apache.hadoop.mapreduce.Mapper;
 18 import org.apache.hadoop.mapreduce.Reducer;
 19 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 20 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 21 
 22 
 23 public class Group2Test {
 24     public static void main(String[] args) throws Exception {
 25         Job job = Job.getInstance(new Configuration(),
 26                 Group2Test.class.getSimpleName());
 27         job.setJarByClass(Group2Test.class);
 28         // 1.自定义输入路径
 29         FileInputFormat.setInputPaths(job, new Path(args[0]));
 30         // 2.自定义mapper
 31         job.setMapperClass(MyMapper.class);
 32         //这里的k2,v2和k3,v3不同,需要显式定义k2和v2类型
 33         job.setMapOutputKeyClass(TwoInt.class);  
 34         job.setMapOutputValueClass(IntWritable.class);
 35 
 36         //当没有下面的自定义分组的话,会调用k2的compareto方法执行k2的比较,如果自定义了分组类则使用自定义分组类
 37         job.setGroupingComparatorClass(MyGroupingComparator.class);
 38 
 39         // 3.自定义reduce
 40         job.setReducerClass(MyReducer.class);
 41         job.setOutputKeyClass(IntWritable.class);
 42         job.setOutputValueClass(IntWritable.class);
 43         // 4.自定义输出路径
 44         FileOutputFormat.setOutputPath(job, new Path(args[1]));
 45 
 46         job.waitForCompletion(true);
 47     }
 48     //分组比较--自定义分组
 49     private static class MyGroupingComparator implements RawComparator {
 50         public int compare(Object o1, Object o2) {
 51             return 0;//默认的比较方法
 52         }
 53         //byte[] b1 表示第一个参数的输入字节表示,byte[] b2表示第二个参数的输入字节表示
 54         //b1 The first byte array. 第一个字节数组,
 55         //b1表示前8个字节,b2表示后8个字节,字节是按次序依次比较的
 56         //s1 The position index in b1. The object under comparison's starting index.第一列开始位置
 57         //l1 The length of the object in b1.第一列长度 ,在这里表示长度8
 58         //提供的数据集中的k2一共48个字节,k2的每一行的TwoInt类型表示8字节(t1和t2分别为4字节)
 59         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
 60             //compareBytes是按字节比较的方法,其中k2表示的是两列,第一列比较,第二例不比较
 61             //第一个字节数组的前四个字节和第二个字节数组的前四个字节比较
 62             //{3,3},{3,2},{3,1},{2,2},{2,1},{1,1}
 63             //比较上述六组的每组的第一个数字,也就是比较twoint中得t1数值
 64             //现在就根据t1可分成3个组了{3,(3,2,1)}{2,(2,1)}{1,1}
 65             //之后再从v2中取出最大值
 66             return WritableComparator.compareBytes(b1, s1, l1-4, b2, s2, l2-4);
 67         }
 68 
 69     }
 70 
 71     private static class MyMapper extends
 72             Mapper<LongWritable, Text, TwoInt, IntWritable> {
 73         //这里的v2需要改为IntWritable而不是nullwritable
 74         TwoInt K2 = new TwoInt();
 75         IntWritable v2= new IntWritable();
 76         @Override
 77         protected void map(LongWritable key, Text value,
 78                 Mapper<LongWritable, Text, TwoInt, IntWritable>.Context context)
 79                 throws IOException, InterruptedException {
 80             String[] splited = value.toString().split("	");
 81             K2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));
 82             v2.set(Integer.parseInt(splited[1]));
 83             context.write(K2, v2);
 84         }
 85     }
 86 
 87     private static class MyReducer extends
 88             Reducer<TwoInt, IntWritable, IntWritable, IntWritable> {//k2,v2s,k3,v3
 89         IntWritable k3 = new IntWritable();
 90         IntWritable v3 = new IntWritable();
 91         @Override
 92         protected void reduce(
 93                 TwoInt k2,
 94                 Iterable<IntWritable> v2s,
 95                 Reducer<TwoInt, IntWritable, IntWritable, IntWritable>.Context context)
 96                 throws IOException, InterruptedException {
 97             int max=Integer.MIN_VALUE;
 98             for (IntWritable v2 : v2s) {
 99                 if (v2.get()>max) {
100                     max=v2.get();
101                 }
102             }
103             //每个组求得一个最大值可得到结果的序列
104             v3.set(max);
105             k3.set(k2.t1);//k2的第一列作为k3,因为k2为Twoint类型
106             context.write(k3,v3);
107         }
108     }
109 
110     private static class TwoInt implements WritableComparable<TwoInt> {
111         public int t1;
112         public int t2;
113 
114         public void write(DataOutput out) throws IOException {
115             out.writeInt(t1);
116             out.writeInt(t2);
117         }
118 
119         public void set(int t1, int t2) {
120             this.t1 = t1;
121             this.t2 = t2;
122         }
123 
124         public void readFields(DataInput in) throws IOException {
125             this.t1 = in.readInt();
126             this.t2 = in.readInt();
127         }
128 
129         public int compareTo(TwoInt o) {
130             if (this.t1 == o.t1) { // 當第一列相等的時候,第二列升序排列
131                 return this.t2 - o.t2;
132             }
133             return this.t1 - o.t1;// 當第一列不相等的時候,按第一列升序排列
134         }
135         @Override
136         public String toString() {
137             return t1 + "	" + t2;
138         }
139     }
140 }
方法1求最值

测试并运行结果如下:

[root@neusoft-master filecontent]# hadoop dfs -text  /out9/part-r-00000

 [root@neusoft-master filecontent]# hadoop dfs -text  /out10/part-r-00000

 

结果是正确无误的。

 END~

       

原文地址:https://www.cnblogs.com/jackchen-Net/p/6425757.html