Hadoop.2.x_网站PV示例

一、网站基本指标(即针对于网站用户行为而产生的日志中进行统计分析)

1. PV:网页浏览量(Page View页面浏览次数,只要进入该网页就产生一条记录,不限IP,统计点每天(较多)/每周/每月/..)
2. UV:独立访客数(Unique Vistor,以Cookie为依据,同一天内一个用户多次访问,只记为一个)
3. VV:访客的访问次数(Visit View,以Session为依据,访客访问网站到关掉该网站所有页面即记为一次访问)
4. IP:独立IP数(即记录不同IP,同一IP访问多次算作一次)
5. 通常网站流量(traffic)是指网站的访问量,是用来描述访问一个网站的用户数量以及用户所浏览的网页数量等指标
   对于虚拟空间商来说流量是指:用户在访问网站过程中,产生的数据量大小

二、PV统计示例(统计各省的PV)

1. 分析数据中字段(provinceId)
2. 数据类型 <11,11,13>Map()--><11,list<1,1>><12,list<1>>-->reduce()---><11 /t 2><13 /t 1>
3. 条件过滤(或称之为数据清洗)
    values.length < 30;
    StringUtils.isBlank(url)
    StringUtils.isBlank(proviceIdValue)
    proviceId = Integer.valueOf(proviceIdValue);
  注意:条件的前后放置一定程度上会影响MR程序的运行效率
  这是一处优化,还可以使用combine,压缩提高效率
  PS:Configuration configuration = new Configuration();
     这行代码会先读取默认配置文件后从资源文件中获取自定义配置文件
4. 自定义计数器(用于记录被过滤掉那些数据)
  //Counter LEGTH_LT_30_COUNTER
  context.getCounter("WEBPVMAP_COUNTERS", "LEGTH_LT_30_COUNTER").increment(1L);
  //Counter URL_ISBLANK
  context.getCounter("WEBPVMAP_COUNTERS", "URL_ISBLANK").increment(1L);
  //Counter PROVICEIDVALUE_ISBLANK
  context.getCounter("WEBPVMAP_COUNTERS", "PROVICEIDVALUE_ISBLANK").increment(1L);
  //Counter STRING_CASE_TO_INTEGER_EXCEPTION
  context.getCounter("WEBPVMAP_COUNTERS", "STRING_CASE_TO_INTEGER_EXCEPTION").increment(1L);
  运行MR输出:
    WEBPVMAP_COUNTERS
    PROVICEIDVALUE_ISBLANK=21742
    STRING_CASE_TO_INTEGER_EXCEPTION=1
    URL_ISBLANK=29092

三、具体代码实现
  1. 放置好资源文件
     (即:将自定义配置文件拷贝到MR程序的资源文件夹,当然使用javaAPI将属性set进configuration也行)

[liuwl@hadoop09-linux-01 hadoop-2.5.0]$ cp etc/hadoop/core-site.xml etc/hadoop/hdfs-site.xml etc/hadoop/log4j.properties /home/liuwl/local/workspace/bigdata-test/src/resouce
[liuwl@hadoop09-linux-01 hadoop-2.5.0]$

  2. 具体代码

package com.eRrsr.bigdata_test;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WebPVMapReduce {

  //Mapper Class
  private static class WebPVMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable>{
    
       private IntWritable mapOutKey = new IntWritable();
    private final static IntWritable mapOutValue = new IntWritable(1);
    @Override
    public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {
      
          String lineValue = value.toString();

      //使用	分隔
      String [] values = lineValue.split("	");

      //过滤掉分隔长度小于30的记录
      if(values.length < 30){

        //计数器:参数1->计数器分组名称;参数2->计数器名称
        context.getCounter("WEBPVMAP_COUNTERS", "LEGTH_LT_30_COUNTER").increment(1L);
        return;
      } 

      //过滤掉分隔后的字段中url为空的记录
      String url = values[1];
      if(StringUtils.isBlank(url)){

        context.getCounter("WEBPVMAP_COUNTERS", "URL_ISBLANK").increment(1L);
        return;
      }

      //过滤掉省份id为空的记录
      String proviceIdValue = values[23];
      if(StringUtils.isBlank(proviceIdValue)){

        context.getCounter("WEBPVMAP_COUNTERS", "PROVICEIDVALUE_ISBLANK").increment(1L);
        return;
      }

      //过滤掉省份id转int异常的数据
      Integer proviceId = Integer.MAX_VALUE;
      try{
        proviceId = Integer.valueOf(proviceIdValue);
      }catch(Exception e){

        context.getCounter("WEBPVMAP_COUNTERS", "STRING_CASE_TO_INTEGER_EXCEPTION").increment(1L);
        return;
      }

      mapOutKey.set(proviceId);
      context.write(mapOutKey, mapOutValue);
    }
  }

  //Reduce Class
  private static class WebPVReduce extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable>{
    private IntWritable reduceOutValue = new IntWritable(); 
    @Override
    public void reduce(IntWritable key, Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {
    
      int sum = 0;
      for(IntWritable value : values){
        sum += value.get();
      }
      reduceOutValue.set(sum);
      context.write(key, reduceOutValue);
    }
  }

  //Driver Method
  public int run(String[] args) throws Exception {
    
    //会先读取默认配置,后读取资源文件中自定义配置
    Configuration configuration = new Configuration();
    Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
    job.setJarByClass(this.getClass());
    //input
    Path inPath = new Path(args[0]);
    FileInputFormat.addInputPath(job,inPath);
    //output
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);
    //mapper
    job.setMapperClass(WebPVMapper.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(IntWritable.class);
    // ======================shuffle==========================
    // 1.partitioner
    // job.setPartitionerClass(cls);
    // 2.sort
    // job.setSortComparatorClass(cls);
    // 3.combiner
    // 在shullfe过程中预先执行类似reduce的累加操作,使得reduce从本地文件获取map()后的数据更快,效率也就更高
    job.setCombinerClass(WebPVReduce.class);
    // 5.group
    // job.setGroupingComparatorClass(cls);
    // ======================shuffle==========================
    //Reduce
    job.setReducerClass(WebPVReduce.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(IntWritable.class);
    //submit job
    boolean isSuccess = job.waitForCompletion(true);
    return isSuccess ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
  
    args = new String[]{
      "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/mapreduce/PV/input",
      "hdfs://hadoop09-linux-01.ibeifeng.com:8020/user/liuwl/tmp/mapreduce/PV/output5"
    };
    //run job
    int status = new WebPVMapReduce().run(args);
    System.exit(status);
  }
}
原文地址:https://www.cnblogs.com/eRrsr/p/5978249.html