026 使用大数据对网站基本指标PV案例的分析

案例:

  使用电商网站的用户行为日志进行统计分析

一:准备

1.指标

  PV:网页浏览量

  UV:独立访客数

  VV:访客的访问数,session次数

  IP:独立的IP数

2.上传测试数据

  

3.查看第一条记录

  

  注意点(字符显示):

            

 二:程序

1.分析

  省份ID-》key  

  value-》1

  -》 <provinceId,list(1,1,1)>

2.数据类型

  key:IntWritable

  value:IntWritable

3.map 端的业务

  

4.reduce端的业务

  

5.整合运行

  

6.结果

  

 三:计数器

1.程序

  

2.结果

  

  

  结果完全吻合。

四:完整程序

1.PV程序

  1 package com.senior.network;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.commons.lang.StringUtils;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.conf.Configured;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Mapper.Context;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.util.Tool;
 19 import org.apache.hadoop.util.ToolRunner;
 20 
 21 public class WebPvCount extends Configured implements Tool{
 22     //Mapper
 23     public static class WebPvCountMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable>{
 24         private IntWritable mapoutputkey=new IntWritable();
 25         private static final IntWritable mapoutputvalue=new IntWritable(1);
 26         @Override
 27         protected void cleanup(Context context) throws IOException,InterruptedException {
 28             
 29         }
 30         @Override
 31         protected void setup(Context context) throws IOException,InterruptedException {
 32             
 33         }
 34         
 35         @Override
 36         protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
 37             String lineValue=value.toString();
 38             String[] strs=lineValue.split("	");
 39             if(30>strs.length){
 40                 return;
 41             }
 42             String priviceIdValue=strs[23];
 43             String urlValue=strs[1];
 44             if(StringUtils.isBlank(priviceIdValue)){
 45                 return;
 46             }
 47             if(StringUtils.isBlank(urlValue)){
 48                 return;
 49             }
 50             Integer priviceId=Integer.MAX_VALUE;
 51             try{
 52                 priviceId=Integer.valueOf(priviceIdValue);
 53             }catch(Exception e){
 54                 e.printStackTrace();
 55             }
 56             mapoutputkey.set(priviceId);
 57             context.write(mapoutputkey, mapoutputvalue);
 58         }
 59         
 60     }
 61     
 62     
 63     
 64     //Reducer
 65     public static class WebPvCountReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
 66         private IntWritable outputvalue=new IntWritable();
 67         @Override
 68         protected void reduce(IntWritable key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
 69             int sum=0;
 70             for(IntWritable value : values){
 71                 sum+=value.get();
 72             }
 73             outputvalue.set(sum);
 74             context.write(key, outputvalue);
 75         }
 76         
 77     }
 78     
 79     //Driver
 80     public int run(String[] args)throws Exception{
 81         Configuration conf=this.getConf();
 82         Job job=Job.getInstance(conf,this.getClass().getSimpleName());
 83         job.setJarByClass(WebPvCount.class);
 84         //input
 85         Path inpath=new Path(args[0]);
 86         FileInputFormat.addInputPath(job, inpath);
 87         
 88         //output
 89         Path outpath=new Path(args[1]);
 90         FileOutputFormat.setOutputPath(job, outpath);
 91         
 92         //map
 93         job.setMapperClass(WebPvCountMapper.class);
 94         job.setMapOutputKeyClass(IntWritable.class);
 95         job.setMapOutputValueClass(IntWritable.class);
 96         
 97         //shuffle
 98         
 99         //reduce
100         job.setReducerClass(WebPvCountReducer.class);
101         job.setOutputKeyClass(IntWritable.class);
102         job.setOutputValueClass(IntWritable.class);
103         
104         //submit
105         boolean isSucess=job.waitForCompletion(true);
106         return isSucess?0:1;
107     }
108     
109     //main
110     public static void main(String[] args)throws Exception{
111         Configuration conf=new Configuration();
112         //compress
113         conf.set("mapreduce.map.output.compress", "true");
114         conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
115         args=new String[]{
116                 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputWebData",
117                 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputWebData1"
118         };
119         int status=ToolRunner.run(new WebPvCount(), args);
120         System.exit(status);
121     }
122 
123 }

2.计数器

  这个计数器集中在mapper端。

  1 package com.senior.network;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.commons.lang.StringUtils;
  6 import org.apache.hadoop.conf.Configuration;
  7 import org.apache.hadoop.conf.Configured;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.IntWritable;
 10 import org.apache.hadoop.io.LongWritable;
 11 import org.apache.hadoop.io.Text;
 12 import org.apache.hadoop.mapreduce.Job;
 13 import org.apache.hadoop.mapreduce.Mapper;
 14 import org.apache.hadoop.mapreduce.Mapper.Context;
 15 import org.apache.hadoop.mapreduce.Reducer;
 16 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 17 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 18 import org.apache.hadoop.util.Tool;
 19 import org.apache.hadoop.util.ToolRunner;
 20 
 21 public class WebPvCount extends Configured implements Tool{
 22     //Mapper
 23     public static class WebPvCountMapper extends Mapper<LongWritable,Text,IntWritable,IntWritable>{
 24         private IntWritable mapoutputkey=new IntWritable();
 25         private static final IntWritable mapoutputvalue=new IntWritable(1);
 26         @Override
 27         protected void cleanup(Context context) throws IOException,InterruptedException {
 28             
 29         }
 30         @Override
 31         protected void setup(Context context) throws IOException,InterruptedException {
 32             
 33         }
 34         
 35         @Override
 36         protected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
 37             String lineValue=value.toString();
 38             String[] strs=lineValue.split("	");
 39             if(30>strs.length){
 40                 context.getCounter("webPvMapper_counter", "length_LT_30").increment(1L);
 41                 return;
 42             }
 43             String priviceIdValue=strs[23];
 44             String urlValue=strs[1];
 45             if(StringUtils.isBlank(priviceIdValue)){
 46                 context.getCounter("webPvMapper_counter", "priviceIdValue_null").increment(1L);
 47                 return;
 48                 
 49             }
 50             if(StringUtils.isBlank(urlValue)){
 51                 context.getCounter("webPvMapper_counter", "url_null").increment(1L);
 52                 return;
 53             }
 54             Integer priviceId=Integer.MAX_VALUE;
 55             try{
 56                 priviceId=Integer.valueOf(priviceIdValue);
 57             }catch(Exception e){
 58                 context.getCounter("webPvMapper_counter", "switch_fail").increment(1L);
 59                 e.printStackTrace();
 60             }
 61             mapoutputkey.set(priviceId);
 62             context.write(mapoutputkey, mapoutputvalue);
 63         }
 64         
 65     }
 66     
 67     
 68     
 69     //Reducer
 70     public static class WebPvCountReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
 71         private IntWritable outputvalue=new IntWritable();
 72         @Override
 73         protected void reduce(IntWritable key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
 74             int sum=0;
 75             for(IntWritable value : values){
 76                 sum+=value.get();
 77             }
 78             outputvalue.set(sum);
 79             context.write(key, outputvalue);
 80         }
 81         
 82     }
 83     
 84     //Driver
 85     public int run(String[] args)throws Exception{
 86         Configuration conf=this.getConf();
 87         Job job=Job.getInstance(conf,this.getClass().getSimpleName());
 88         job.setJarByClass(WebPvCount.class);
 89         //input
 90         Path inpath=new Path(args[0]);
 91         FileInputFormat.addInputPath(job, inpath);
 92         
 93         //output
 94         Path outpath=new Path(args[1]);
 95         FileOutputFormat.setOutputPath(job, outpath);
 96         
 97         //map
 98         job.setMapperClass(WebPvCountMapper.class);
 99         job.setMapOutputKeyClass(IntWritable.class);
100         job.setMapOutputValueClass(IntWritable.class);
101         
102         //shuffle
103         
104         //reduce
105         job.setReducerClass(WebPvCountReducer.class);
106         job.setOutputKeyClass(IntWritable.class);
107         job.setOutputValueClass(IntWritable.class);
108         
109         //submit
110         boolean isSucess=job.waitForCompletion(true);
111         return isSucess?0:1;
112     }
113     
114     //main
115     public static void main(String[] args)throws Exception{
116         Configuration conf=new Configuration();
117         //compress
118         conf.set("mapreduce.map.output.compress", "true");
119         conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
120         args=new String[]{
121                 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/inputWebData",
122                 "hdfs://linux-hadoop01.ibeifeng.com:8020/user/beifeng/mapreduce/wordcount/outputWebData2"
123         };
124         int status=ToolRunner.run(new WebPvCount(), args);
125         System.exit(status);
126     }
127 
128 }
原文地址:https://www.cnblogs.com/juncaoit/p/5983122.html