MapReduce -- 统计天气信息

示例

数据(每行格式为"日期 时间<TAB>温度",温度带单位后缀 c):

1949-10-01 14:21:02    34c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1951-12-02 12:21:02    45c
1951-12-03 12:21:02    50c
1951-12-23 12:21:02    33c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c

要求:

将每年每月中的气温排名前三的数据找出来

实现:

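1.每一年用一个reduce任务处理:通过自定义分区(年份 % reduce 任务数)让同一年的数据进入同一个 reduce 任务;示例数据只有 1949–1951 三个年份,因此设置 3 个 reduce 任务时每个年份恰好对应一个 reduce 任务(年份多于 reduce 任务数时,多个年份会落入同一个 reduce 任务);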

2.创建自定义数据类型,存储 [年-月-日-温度];

3.自己实现排序函数,按 [年-月-温度] 排序(年、月升序,温度降序);也可以直接在自定义数据类型的 compareTo 方法中实现该排序(示例代码采用的就是这种方式);

4.自己实现分组函数,按 [年-月] 分组:每次 reduce 调用接收一个分组(同年同月的全部记录),reduce 的 key 是该分组中的第一个 key;由于排序函数已让同一分组内的 value 按温度降序排列,只需输出前三个 value 即可。

注意点:

1.自定义数据类型的使用;

2.自定义排序类的使用;

3.自定义分组类的使用,分组类对哪些数据进行分组;

4.自定义分区类的使用,以及分区类与 reduce 任务个数的关系;

示例代码:

RunJob.java  

  1 import org.apache.hadoop.conf.Configuration;
  2 import org.apache.hadoop.fs.FileSystem;
  3 import org.apache.hadoop.fs.Path;
  4 import org.apache.hadoop.io.LongWritable;
  5 import org.apache.hadoop.io.NullWritable;
  6 import org.apache.hadoop.io.Text;
  7 import org.apache.hadoop.mapreduce.Job;
  8 import org.apache.hadoop.mapreduce.Mapper;
  9 import org.apache.hadoop.mapreduce.Reducer;
 10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 12 
 13 import java.io.IOException;
 14 import java.text.ParseException;
 15 import java.text.SimpleDateFormat;
 16 import java.util.Calendar;
 17 import java.util.Date;
 18 
 19 /**
 20  * weather 统计天气信息
 21  *
 22  * 数据:
 23  *  1999-10-01 14:21:02    34c
 24  *  1999-11-02 13:01:02    30c
 25  *
 26  * 要求:
 27  * 将每年的每月中气温排名前三的数据找出来
 28  *
 29  * 实现:
 30  * 1.每一年用一个reduce任务处理;
 31  * 2.创建自定义数据类型,存储 [年-月-日-温度];
 32  * 2.自己实现排序函数 根据 [年-月-温度] 降序排列,也可以在定义数据类型中进行排序;
 33  * 3.自己实现分组函数,对 [年-月] 分组,reduce的key是分组结果,根据相同的分组值,统计reduce的value值,只统计三个值就可以,因为已经实现了自排序函数。
 34  *
 35  * Created by Edward on 2016/7/11.
 36  */
 37 public class RunJob {
 38 
 39     public static void main(String[] args)
 40     {
 41         //access hdfs's user
 42         System.setProperty("HADOOP_USER_NAME","root");
 43 
 44         Configuration conf = new Configuration();
 45         conf.set("fs.defaultFS", "hdfs://node1:8020");
 46 
 47 
 48         try {
 49             FileSystem fs = FileSystem.get(conf);
 50 
 51             Job job = Job.getInstance(conf);
 52             job.setJarByClass(RunJob.class);
 53             job.setMapperClass(MyMapper.class);
 54             job.setReducerClass(MyReducer.class);
 55 
 56             //需要指定 map out 的 key 和 value
 57             job.setOutputKeyClass(InfoWritable.class);
 58             job.setOutputValueClass(Text.class);
 59 
 60             //设置分区 继承 HashPartitioner
 61             job.setPartitionerClass(YearPartition.class);
 62             //根据年份创建指定数量的reduce task
 63             job.setNumReduceTasks(3);
 64 
 65             //设置排序 继承 WritableComparator
 66             //job.setSortComparatorClass(SortComparator.class);
 67 
 68             //设置分组 继承 WritableComparator 对reduce中的key进行分组
 69             job.setGroupingComparatorClass(GroupComparator.class);
 70 
 71             FileInputFormat.addInputPath(job, new Path("/test/weather/input"));
 72 
 73             Path path = new Path("/test/weather/output");
 74             if(fs.exists(path))//如果目录存在,则删除目录
 75             {
 76                 fs.delete(path,true);
 77             }
 78             FileOutputFormat.setOutputPath(job, path);
 79 
 80             boolean b = job.waitForCompletion(true);
 81             if(b)
 82             {
 83                 System.out.println("OK");
 84             }
 85 
 86         } catch (Exception e) {
 87             e.printStackTrace();
 88         }
 89     }
 90 
 91 
 92     public static class MyMapper extends Mapper<LongWritable, Text, InfoWritable, Text > {
 93 
 94         private static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 95 
 96         @Override
 97         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
 98             String[] str = value.toString().split("	");
 99 
100             try {
101                 Date date = sdf.parse(str[0]);
102                 Calendar c = Calendar.getInstance();
103                 c.setTime(date);
104                 int year = c.get(Calendar.YEAR);
105                 int month = c.get(Calendar.MONTH)+1;
106                 int day = c.get(Calendar.DAY_OF_MONTH);
107 
108                 double temperature = Double.parseDouble(str[1].substring(0,str[1].length()-1));
109 
110                 InfoWritable info = new InfoWritable();
111                 info.setYear(year);
112                 info.setMonth(month);
113                 info.setDay(day);
114                 info.setTemperature(temperature);
115 
116                 context.write(info, value);
117 
118             } catch (ParseException e) {
119                 e.printStackTrace();
120             }
121         }
122     }
123 
124     public static class MyReducer extends Reducer<InfoWritable, Text, Text, NullWritable> {
125         @Override
126         protected void reduce(InfoWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
127             int i=0;
128             for(Text t: values)
129             {
130                 i++;
131                 if(i>3)
132                     break;
133                 context.write(t, NullWritable.get());
134             }
135         }
136     }
137 }

InfoWritable.java 

 1 import org.apache.hadoop.io.WritableComparable;
 2 
 3 import java.io.DataInput;
 4 import java.io.DataOutput;
 5 import java.io.IOException;
 6 
 7 /**
 8  * 自定义数据类型 继承 WritableComparable
 9  * 【年-月-日-温度】
10  * Created by Edward on 2016/7/11.
11  */
12 public class InfoWritable implements WritableComparable<InfoWritable> {
13 
14     private int year;
15     private int month;
16     private int day;
17     private double temperature;
18 
19     public void setYear(int year) {
20         this.year = year;
21     }
22 
23     public void setMonth(int month) {
24         this.month = month;
25     }
26 
27     public void setDay(int day) {
28         this.day = day;
29     }
30 
31     public void setTemperature(double temperature) {
32         this.temperature = temperature;
33     }
34 
35     public int getYear() {
36         return year;
37     }
38 
39     public int getMonth() {
40         return month;
41     }
42 
43     public int getDay() {
44         return day;
45     }
46 
47     public double getTemperature() {
48         return temperature;
49     }
50 
51     /**
52      *
53      * 对象比较,对温度进行倒序排序
54      */
55     @Override
56     public int compareTo(InfoWritable o) {
57 
58         int result = Integer.compare(this.getYear(),o.getYear());
59         if(result == 0)
60         {
61             result = Integer.compare(this.getMonth(),o.getMonth());
62             if(result == 0)
63             {
64                 return -Double.compare(this.getTemperature(), o.getTemperature());
65             }
66             else
67                 return result;
68         }
69         else
70             return result;
71 
72         //return this==o?0:-1;
73     }
74 
75     @Override
76     public void write(DataOutput dataOutput) throws IOException {
77         dataOutput.writeInt(this.year);
78         dataOutput.writeInt(this.month);
79         dataOutput.writeInt(this.day);
80         dataOutput.writeDouble(this.temperature);
81     }
82 
83     @Override
84     public void readFields(DataInput dataInput) throws IOException {
85         this.year = dataInput.readInt();
86         this.month = dataInput.readInt();
87         this.day = dataInput.readInt();
88         this.temperature = dataInput.readDouble();
89     }
90 }

YearPartition.java

 1 import org.apache.hadoop.io.Text;
 2 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
 3 
 4 /**
 5  *
 6  * 创建分区,通过key中的year来创建分区
 7  *
 8  * Created by Edward on 2016/7/11.
 9  */
10 public class YearPartition extends HashPartitioner <InfoWritable, Text>{
11     @Override
12     public int getPartition(InfoWritable key, Text value, int numReduceTasks) {
13         return key.getYear()%numReduceTasks;
14     }
15 }

GroupComparator.java

 1 import org.apache.hadoop.io.WritableComparable;
 2 import org.apache.hadoop.io.WritableComparator;
 3 
 4 /**
 5  * 创建分组类,继承WritableComparator
 6  * 【年-月】
 7  * Created by Edward on 2016/7/11.
 8  */
 9 public class GroupComparator extends WritableComparator {
10 
11     GroupComparator()
12     {
13         super(InfoWritable.class, true);
14     }
15 
16     @Override
17     public int compare(WritableComparable a, WritableComparable b) {
18         InfoWritable ia = (InfoWritable)a;
19         InfoWritable ib = (InfoWritable)b;
20 
21         int result = Integer.compare(ia.getYear(),ib.getYear());
22         if(result == 0)
23         {
24             return Integer.compare(ia.getMonth(),ib.getMonth());
25         }
26         else
27             return result;
28     }
29 }

SortComparator.java

 1 import org.apache.hadoop.io.WritableComparable;
 2 import org.apache.hadoop.io.WritableComparator;
 3 
 4 /**
 5  * 排序类,继承WritableComparator
 6  * 排序规则【年-月-温度】 温度降序
 7  * Created by Edward on 2016/7/11.
 8  */
 9 public class SortComparator extends WritableComparator {
10 
11     /**
12      * 调用父类的构造函数
13      */
14     SortComparator()
15     {
16         super(InfoWritable.class, true);
17     }
18 
19 
20     /**
21      * 比较两个对象的大小,使用降序排列
22      * @param a
23      * @param b
24      * @return
25      */
26     @Override
27     public int compare(WritableComparable a, WritableComparable b) {
28 
29         InfoWritable ia = (InfoWritable)a;
30         InfoWritable ib = (InfoWritable)b;
31 
32         int result = Integer.compare(ia.getYear(),ib.getYear());
33         if(result == 0)
34         {
35             result = Integer.compare(ia.getMonth(),ib.getMonth());
36             if(result == 0)
37             {
38                 return -Double.compare(ia.getTemperature(), ib.getTemperature());
39             }
40             else
41                 return result;
42         }
43         else
44             return result;
45     }
46 }
原文地址:https://www.cnblogs.com/one--way/p/5661148.html