In-Class Test: Data Cleaning 2

Problem:

Description of the fields in the Result file:

Ip: 106.39.41.166 (city)

Date: 10/Nov/2016:00:01:02 +0800 (date)

Day: 10 (day)

Traffic: 54 (traffic)

Type: video (type: video or article)

Id: 8701 (id of the video or article)

Test requirements:

1. Data cleaning: clean the data as specified below and load the cleaned data into a Hive database.

Two-stage data cleaning:

(1) Stage one: extract the needed fields from the raw log (a parsing sketch follows the sample fields below):

ip:    199.30.25.88

time:  10/Nov/2016:00:01:03 +0800

traffic:  62

article: article/11325

video: video/3235

(i.e., keep fields 1, 2, 4, 5, and 6 of the original record; field 3, the day, is dropped at this stage)
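As a rough illustration of stage one, here is a minimal parsing sketch. It assumes each raw line carries the six fields comma-separated in the order described above (ip, date, day, traffic, type, id); the class and method names are illustrative, and the real field separator may differ.

public class StageOneExtract {
    // Keep fields 1, 2, 4, 5, 6 (ip, time, traffic, type, id) and join
    // type and id into a single "type/id" token such as video/8701.
    public static String extract(String rawLine) {
        String[] f = rawLine.split(",");
        String ip = f[0].trim();
        String time = f[1].trim();
        String traffic = f[3].trim();       // f[2], the day, is skipped
        String typeAndId = f[4].trim() + "/" + f[5].trim();
        return ip + " " + time + " " + traffic + " " + typeAndId;
    }

    public static void main(String[] args) {
        // Sample record built from the field description above.
        System.out.println(extract("106.39.41.166,10/Nov/2016:00:01:02 +0800,10,54,video,8701"));
        // -> 106.39.41.166 10/Nov/2016:00:01:02 +0800 54 video/8701
    }
}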

(2) Stage two: refine the extracted fields (a date-conversion sketch follows this list):

ip ---> city (resolve the IP address to a city)

date ---> time: 2016-11-10 00:01:03

day: 10

traffic: 62

type: article/video

id: 11325
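The only non-trivial refinement is the date conversion. A minimal sketch, assuming the raw timestamps always look like 10/Nov/2016:00:01:03 +0800 (the class name is illustrative):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.TimeZone;

public class DateConvert {
    // Source pattern of the raw log; Locale.ENGLISH is needed so that
    // "Nov" parses on JVMs with a non-English default locale.
    private static final SimpleDateFormat IN =
            new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);
    // Target pattern required by stage two.
    private static final SimpleDateFormat OUT =
            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    static {
        // Keep the log's +0800 offset regardless of the JVM's time zone.
        OUT.setTimeZone(TimeZone.getTimeZone("GMT+8"));
    }

    public static String convert(String raw) throws ParseException {
        Date d = IN.parse(raw);
        return OUT.format(d);
    }

    public static void main(String[] args) throws ParseException {
        System.out.println(convert("10/Nov/2016:00:01:03 +0800")); // 2016-11-10 00:01:03
    }
}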

(3) Hive table schema:

create table data (ip string, time string, day string, traffic bigint, type string, id string)
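The table also needs a field delimiter that matches the cleaned file. A sketch in HiveQL, assuming the cleaned output is comma-separated (the time field contains a space, so a space delimiter would split it); the HDFS path is hypothetical:

create table data (ip string, time string, day string, traffic bigint, type string, id string)
row format delimited fields terminated by ',';

load data inpath 'hdfs://localhost:9000/text/cleaned' into table data;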

2. Data processing:

· Count the Top 10 most-visited videos/articles (video/article)

· Rank the Top 10 most popular courses by city (ip)

· Rank the Top 10 most popular courses by traffic (traffic)

3. Data visualization: load the statistics into a MySQL database and present them graphically.

Progress:

So far, steps 1 and 2 are done. Let me take the first task of step 2 as an example:

Step 2 is mainly about processing the data. My first thought was to solve it with a single MapReduce job, but that job would have to both sum the video/article records sharing the same id into a new column, num, and then sort by num in descending order. Doing both in one pass looked too hard, so I process the data with two MapReduce jobs, load the result into Hive, and query the first ten rows, which is exactly what is asked for.

First job: sum the video/article records that share the same id into a new column, num, and keep only the num and id columns.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class text_2_1 {
    // Emits (id, "1") for every record; the video/article id is the
    // sixth space-separated field of the cleaned line.
    public static class Map extends Mapper<Object, Text, Text, Text> {
        private static final Text newKey = new Text();
        private static final Text newValue = new Text("1");

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(" ");
            newKey.set(arr[5]);
            context.write(newKey, newValue);
        }
    }

    // Counts the occurrences of each id and emits (count, id), so the
    // second job can sort on the count.
    public static class Reduce extends Reducer<Text, Text, Text, Text> {
        private static final Text newKey = new Text();
        private static final Text newValue = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int num = 0;
            for (Text ignored : values) {
                num++;
            }
            newKey.set(String.valueOf(num));
            newValue.set(key);
            context.write(newKey, newValue);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Deprecated alias of mapreduce.output.textoutputformat.separator;
        // makes each output line "num id" with a single space between.
        conf.set("mapred.textoutputformat.separator", " ");
        System.out.println("start");
        Job job = Job.getInstance(conf);
        job.setJarByClass(text_2_1.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        Path in = new Path("hdfs://localhost:9000/text/in/data");
        Path out = new Path("hdfs://localhost:9000/text/out1");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        boolean flag = job.waitForCompletion(true);
        System.out.println(flag);
        System.exit(flag ? 0 : 1);
    }
}

Second job: sort num in descending order. MapReduce sorts keys in ascending order by default, so a custom comparator class is defined.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class m {
    // Reads the "num id" lines of the first job and keys on num as an
    // IntWritable so the shuffle sorts the counts numerically.
    public static class Map extends Mapper<Object, Text, IntWritable, Text> {
        private static final IntWritable newKey = new IntWritable();
        private static final Text newValue = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] arr = value.toString().split(" ");
            newKey.set(Integer.parseInt(arr[0]));
            newValue.set(arr[1]);
            context.write(newKey, newValue);
        }
    }

    // Identity reduce: writes the already-sorted (num, id) pairs back out.
    public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    // Negates IntWritable's byte-level comparison so that keys sort in
    // descending instead of the default ascending order.
    public static class IntWritableDecreasingComparator extends IntWritable.Comparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Deprecated alias of mapreduce.output.textoutputformat.separator.
        conf.set("mapred.textoutputformat.separator", " ");
        System.out.println("start");
        Job job = Job.getInstance(conf);
        job.setJarByClass(m.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);
        job.setSortComparatorClass(IntWritableDecreasingComparator.class);
        Path in = new Path("hdfs://localhost:9000/text/out1/part-r-00000");
        Path out = new Path("hdfs://localhost:9000/text/out2");
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);
        boolean flag = job.waitForCompletion(true);
        System.out.println(flag);
        System.exit(flag ? 0 : 1);
    }
}

After the two jobs above, each output line is a visit count (num) followed by a video/article id, sorted by num in descending order.

Load this into the Hive database and query the first ten rows; those are the answer.
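A sketch of that final lookup in HiveQL, assuming a two-column table for the (num, id) output of the second job; the table name is illustrative:

create table top_num (num int, id string)
row format delimited fields terminated by ' ';

load data inpath 'hdfs://localhost:9000/text/out2/part-r-00000' into table top_num;

-- the file is already sorted by num in descending order, so the first ten
-- rows are the Top 10; order by just makes the intent explicit
select num, id from top_num order by num desc limit 10;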

Original post: https://www.cnblogs.com/123456www/p/11859287.html