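Big Data Case Study: Website Log Analysis

Part 1: Project Introduction

I. Project Background and Data

1.1 Project Source

    The logs used in this exercise come from a technical learning forum in China. The forum is run by a training institution and brings together many technical learners who post and reply every day, as shown in Figure 1:

Figure 1 Source website: the technical learning forum

The goal of this exercise is to analyze the forum's Apache common-format logs and compute several key metrics that the operators can consult when making decisions.

PS: The system is built to obtain business-specific metrics that third-party tools cannot provide.

1.2 Data

  The forum data has two parts:

  (1) About 56 GB of historical data, collected up to 2012-05-29. This shows that before that date all log entries were appended to a single file.

  (2) Since 2013-05-30, one data file of about 150 MB is generated per day. This shows that from that date on the logs are no longer kept in a single file.

Figure 2 shows the record format. Each line has five parts: visitor IP, access time, requested resource, access status (HTTP status code), and the traffic of the request.

Figure 2 Log record format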

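II. Key Performance Indicators (KPIs)

2.1 Page Views (PV)

(1) Definition: PV (Page View) is the total number of pages viewed by all users; each page a user opens is counted once.

(2) Analysis: Total page views measure how interested users are in the site, much as ratings do for a TV series. For the operators, however, the page views of each individual board matter more.

Formula: count the records. The number of visits is taken from the log and can be broken down further by board.

2.2 Registered Users

The forum's registration page is member.php, and when a user clicks Register the request goes to a URL containing member.php?mod=register.

Formula: count the requests whose URL contains member.php?mod=register.

2.3 IP Count

(1) Definition: the number of distinct IPs that visit the site within one day. An IP counts once no matter how many pages it visits.

(2) Analysis: This is the most familiar metric. Regardless of how many computers or users share one IP, the number of distinct IPs is, to some extent, the most direct measure of how well a promotion campaign worked.

Formula: count the distinct visitor IPs.

2.4 Bounce Rate

(1) Definition: the percentage of visits that leave the site after viewing only one page, i.e. single-page visits / total visits.

(2) Analysis: Bounce rate is an important measure of visitor stickiness and shows how interested visitors are in the site. The lower the bounce rate, the better the traffic quality, the more interested visitors are in the content, and the more likely they are to be effective, loyal users.

PS: The metric also measures the effect of online marketing: it shows how many visitors were drawn to a product page or to the site by a campaign and then left again. For example, if the site advertises on some media outlet, the bounce rate of visitors arriving from that source indicates whether the outlet was a good choice, whether the ad copy works, and whether the landing page offers a good user experience.

Formula: (1) count the IPs that appear in only one record during the day; this is the bounce count; (2) bounce rate = bounce count / PV.

2.5 Board Popularity Ranking

(1) Definition: a ranking of boards by number of visits.

(2) Analysis: build on the hot boards and strengthen the quiet ones. The ranking also informs curriculum planning.

  Formula: count the visits per board and sort.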
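The formulas above can be sketched in plain Java before moving to Hive. This is only an illustration under stated assumptions: the class name KpiSketch, its method names, and the sample IPs and URLs are hypothetical, and each hit is reduced to an {ip, url} pair.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the original project.
class KpiSketch {
    /** Number of records per visitor IP; each hit is {ip, url}. */
    static Map<String, Integer> hitsPerIp(List<String[]> hits) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] hit : hits) {
            counts.merge(hit[0], 1, Integer::sum);
        }
        return counts;
    }

    /** PV: every record counts once. */
    static long pageViews(List<String[]> hits) {
        return hits.size();
    }

    /** Registrations: requests whose URL contains member.php?mod=register. */
    static long registrations(List<String[]> hits) {
        return hits.stream()
                .filter(h -> h[1].contains("member.php?mod=register"))
                .count();
    }

    /** IP count: number of distinct visitor IPs. */
    static long uniqueIps(List<String[]> hits) {
        return hitsPerIp(hits).size();
    }

    /** Bounce count: IPs that appear in exactly one record. */
    static long bounces(List<String[]> hits) {
        return hitsPerIp(hits).values().stream().filter(c -> c == 1).count();
    }

    /** Bounce rate as defined in the text: bounce count / PV. */
    static double bounceRate(List<String[]> hits) {
        return hits.isEmpty() ? 0.0 : (double) bounces(hits) / pageViews(hits);
    }

    public static void main(String[] args) {
        // Made-up sample data for illustration only.
        List<String[]> hits = Arrays.asList(
                new String[] {"10.0.0.1", "forum.php"},
                new String[] {"10.0.0.1", "thread-1.html"},
                new String[] {"10.0.0.2", "member.php?mod=register"},
                new String[] {"10.0.0.3", "forum.php"});
        System.out.printf("pv=%d reguser=%d ip=%d jumper=%d rate=%.2f%n",
                pageViews(hits), registrations(hits), uniqueIps(hits),
                bounces(hits), bounceRate(hits));
    }
}
```

Grouping by IP once and deriving both the IP count and the bounce count from the same map mirrors the GROUP BY ip that the Hive queries in Part 3 rely on.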

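III. Development Steps

3.0 Technologies Used

  (1) Linux shell scripting

  (2) HDFS, MapReduce

  (3) HBase, Hive, Sqoop

3.1 Upload the Log Files to HDFS

  There are several ways to get the log data into HDFS for processing:

  (1) If the log server holds little data and is under light load, upload the data to HDFS directly with shell commands.

  (2) If the log server holds a lot of data and is under heavy load, use NFS and upload the data from another server.

  (3) If there are many log servers and a large data volume, use Flume.

3.2 Data Cleaning

  Use MapReduce to clean the raw data in HDFS so it is ready for statistical analysis.

3.3 Statistical Analysis

  Use Hive to analyze the cleaned data.

3.4 Export the Results to MySQL

  Use Sqoop to export the statistics produced by Hive to MySQL.

3.5 Provide a Viewing Tool

  Provide a tool for users to view the results: metrics are read from MySQL, detail records from HBase.

IV. Table Design

4.1 MySQL Table Design

  MySQL stores the results of the key-metric analysis.

4.2 HBase Table Design

  HBase stores the detailed log records so they can be queried by IP and time.

The hands-on work starts in the next part; this introduction ends here.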

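Part 2: Data Cleaning

I. Analyzing the Data

1.1 Data Recap

  The forum data has two parts:

  (1) About 56 GB of historical data, collected up to 2012-05-29. This shows that before that date all log entries were appended to a single file.

  (2) Since 2013-05-30, one data file of about 150 MB is generated per day. This shows that from that date on the logs are no longer kept in a single file.

  Figure 1 shows the record format. Each line has five parts: visitor IP, access time, requested resource, access status (HTTP status code), and the traffic of the request.

Figure 1 Log record format

  The data used here comes from two 2013 log files, access_2013_05_30.log and access_2013_05_31.log, available at: http://pan.baidu.com/s/1pJE7XR9

1.2 What to Clean

    (1) None of the key metrics defined in Part 1 involve the access status (HTTP status code) or the traffic of a request, so these two fields can be dropped first.

    (2) The timestamps should be converted from the log format into a plain numeric format (yyyyMMddHHmmss, for example 20130530173820); a small class can do the conversion.

  (3) Requests for static resources carry no meaning for the analysis, so records whose request starts with "GET /static/" or "GET /uc_server" are filtered out. The GET and POST strings are not needed either, so they are stripped as well.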
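The three rules can be tried on a single line before writing the MapReduce job. The sketch below is an assumption-laden illustration, not the project's parser: it uses a regular expression instead of the index-based parsing shown later, and the class name CleanRuleSketch and the second sample line in main are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper that applies the cleaning rules to one log line.
class CleanRuleSketch {
    // ip - - [time zone] "request" status bytes
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\] ]+) [^\\]]*\\] \"([^\"]*)\" (\\S+) (\\S+)$");

    /** Returns "ip<TAB>time<TAB>url", or null when the line is dropped. */
    static String clean(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null; // not in the expected format
        }
        String request = m.group(3);
        // Rule 3: drop static-resource requests.
        if (request.startsWith("GET /static/")
                || request.startsWith("GET /uc_server")) {
            return null;
        }
        // Rule 3, continued: strip the method prefix and the protocol suffix.
        String url = request.replaceFirst("^(GET|POST) /", "")
                .replaceFirst(" HTTP/1\\.1$", "");
        // Rule 2: convert 30/May/2013:17:38:20 to 20130530173820.
        SimpleDateFormat in =
                new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        SimpleDateFormat out = new SimpleDateFormat("yyyyMMddHHmmss");
        String time;
        try {
            time = out.format(in.parse(m.group(2)));
        } catch (ParseException e) {
            return null; // unparseable timestamp
        }
        // Rule 1: status and traffic (groups 4 and 5) are simply not emitted.
        return m.group(1) + "\t" + time + "\t" + url;
    }

    public static void main(String[] args) {
        // Made-up request line for illustration.
        System.out.println(clean("110.52.250.126 - - [30/May/2013:17:38:20 +0800] "
                + "\"GET /member.php?mod=register HTTP/1.1\" 200 100"));
    }
}
```

Creating the SimpleDateFormat instances inside the method keeps the sketch safe to call from several threads, at the cost of some allocation per line.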

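II. The Cleaning Process

2.1 Upload Logs to HDFS on a Schedule

  First, there are several ways to get the log data into HDFS for processing:

  (1) If the log server holds little data and is under light load, upload the data to HDFS directly with shell commands.

  (2) If the log server holds a lot of data and is under heavy load, use NFS and upload the data from another server.

  (3) If there are many log servers and a large data volume, use Flume.

The experimental data files here are small, so we use the first approach, shell commands. Because a log file is produced every day, a scheduled task is needed that uploads the previous day's log file to a given HDFS directory at 1:00 the next morning. We therefore write a shell script, techbbs_core.sh, and schedule it with crontab. Its content is:

#!/bin/sh

#step1.get yesterday format string
yesterday=$(date --date='1 days ago' +%Y_%m_%d)
#step2.upload logs to hdfs
hadoop fs -put /usr/local/files/apache_logs/access_${yesterday}.log /project/techbbs/data

  To run it every day at 1:00, run crontab -e and add the line below. The fields 0 1 mean minute 0 of hour 1; a * in the minute field would run the script every minute between 1:00 and 1:59. The script is given by its full path here, assuming it lives in /usr/local/files/apache_logs, because cron does not search the current directory:

0 1 * * * /usr/local/files/apache_logs/techbbs_core.sh

  To verify, crontab -l lists the scheduled tasks.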

2.2 Write a MapReduce Program to Clean the Logs

(1) Write a log parser class that parses each of the five parts of a record separately

static class LogParser {
        public static final SimpleDateFormat FORMAT = new SimpleDateFormat(
                "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);
        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(
                "yyyyMMddHHmmss");

        /**
         * Parses an English-locale timestamp string.
         *
         * @param string the timestamp, e.g. 30/May/2013:17:38:20
         * @return the parsed date, or null if parsing fails
         */
        private Date parseDateFormat(String string) {
            Date parse = null;
            try {
                parse = FORMAT.parse(string);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return parse;
        }

        /**
         * Parses one log line.
         *
         * @param line the raw log line
         * @return an array of 5 elements: ip, time, url, status, traffic
         */
        public String[] parse(String line) {
            String ip = parseIP(line);
            String time = parseTime(line);
            String url = parseURL(line);
            String status = parseStatus(line);
            String traffic = parseTraffic(line);
            return new String[] { ip, time, url, status, traffic };
        }

        // traffic is the second token after the closing quote of the request
        private String parseTraffic(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String traffic = trim.split(" ")[1];
            return traffic;
        }

        // status is the first token after the closing quote of the request
        private String parseStatus(String line) {
            final String trim = line.substring(line.lastIndexOf("\"") + 1)
                    .trim();
            String status = trim.split(" ")[0];
            return status;
        }

        // the request is the text between the first and the last double quote
        private String parseURL(String line) {
            final int first = line.indexOf("\"");
            final int last = line.lastIndexOf("\"");
            String url = line.substring(first + 1, last);
            return url;
        }

        // the timestamp sits between "[" and the "+0800]" zone suffix
        private String parseTime(String line) {
            final int first = line.indexOf("[");
            final int last = line.indexOf("+0800]");
            String time = line.substring(first + 1, last).trim();
            Date date = parseDateFormat(time);
            return dateformat1.format(date);
        }

        // the IP is everything before the "- -" placeholder fields
        private String parseIP(String line) {
            String ip = line.split("- -")[0].trim();
            return ip;
        }
    }

  (2) Write the MapReduce program that filters every record of the given log file

Mapper class:

static class MyMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        LogParser logParser = new LogParser();
        Text outputValue = new Text();

        protected void map(
                LongWritable key,
                Text value,
                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)
                throws java.io.IOException, InterruptedException {
            final String[] parsed = logParser.parse(value.toString());

            // step1. drop requests for static resources
            if (parsed[2].startsWith("GET /static/")
                    || parsed[2].startsWith("GET /uc_server")) {
                return;
            }
            // step2. strip the leading method string
            if (parsed[2].startsWith("GET /")) {
                parsed[2] = parsed[2].substring("GET /".length());
            } else if (parsed[2].startsWith("POST /")) {
                parsed[2] = parsed[2].substring("POST /".length());
            }
            // step3. strip the trailing protocol string
            if (parsed[2].endsWith(" HTTP/1.1")) {
                parsed[2] = parsed[2].substring(0, parsed[2].length()
                        - " HTTP/1.1".length());
            }
            // step4. write only the first three fields, tab-separated
            outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);
            context.write(key, outputValue);
        }
    }

  Reducer class:

    static class MyReducer extends
            Reducer<LongWritable, Text, Text, NullWritable> {
        protected void reduce(
                LongWritable k2,
                java.lang.Iterable<Text> v2s,
                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            // pass every cleaned record through unchanged
            for (Text v2 : v2s) {
                context.write(v2, NullWritable.get());
            }
        }
    }

(3) The complete LogCleanJob.java source is given in the appendix

  (4) Export the jar and upload it to the chosen directory on the Linux server

2.3 Clean the Logs on a Schedule

    Extend the scheduled script so that it also runs the cleaning MapReduce job:

#!/bin/sh

#step1.get yesterday format string
yesterday=$(date --date='1 days ago' +%Y_%m_%d)
#step2.upload logs to hdfs
hadoop fs -put /usr/local/files/apache_logs/access_${yesterday}.log /project/techbbs/data
#step3.clean log data
hadoop jar /usr/local/files/apache_logs/mycleaner.jar /project/techbbs/data/access_${yesterday}.log /project/techbbs/cleaned/${yesterday}

  Every day at 1:00 the script uploads the log file to HDFS, then runs the cleaning job on the uploaded file and stores the filtered data under the cleaned directory.

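2.4 Testing the Scheduled Task

  (1) The two log files are from 2013, so they are renamed to the test day and the day before it (in 2015) so that the test can run.

  (2) Run: techbbs_core.sh 2015_04_26

  The console output is shown below. Far fewer records remain after filtering: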

15/04/26 04:27:20 INFO input.FileInputFormat: Total input paths to process : 1
15/04/26 04:27:20 INFO util.NativeCodeLoader: Loaded the native-hadoop library
15/04/26 04:27:20 WARN snappy.LoadSnappy: Snappy native library not loaded
15/04/26 04:27:22 INFO mapred.JobClient: Running job: job_201504260249_0002
15/04/26 04:27:23 INFO mapred.JobClient: map 0% reduce 0%
15/04/26 04:28:01 INFO mapred.JobClient: map 29% reduce 0%
15/04/26 04:28:07 INFO mapred.JobClient: map 42% reduce 0%
15/04/26 04:28:10 INFO mapred.JobClient: map 57% reduce 0%
15/04/26 04:28:13 INFO mapred.JobClient: map 74% reduce 0%
15/04/26 04:28:16 INFO mapred.JobClient: map 89% reduce 0%
15/04/26 04:28:19 INFO mapred.JobClient: map 100% reduce 0%
15/04/26 04:28:49 INFO mapred.JobClient: map 100% reduce 100%
15/04/26 04:28:50 INFO mapred.JobClient: Job complete: job_201504260249_0002
15/04/26 04:28:50 INFO mapred.JobClient: Counters: 29
15/04/26 04:28:50 INFO mapred.JobClient: Job Counters 
15/04/26 04:28:50 INFO mapred.JobClient: Launched reduce tasks=1
15/04/26 04:28:50 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=58296
15/04/26 04:28:50 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
15/04/26 04:28:50 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
15/04/26 04:28:50 INFO mapred.JobClient: Launched map tasks=1
15/04/26 04:28:50 INFO mapred.JobClient: Data-local map tasks=1
15/04/26 04:28:50 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=25238
15/04/26 04:28:50 INFO mapred.JobClient: File Output Format Counters 
15/04/26 04:28:50 INFO mapred.JobClient: Bytes Written=12794925
15/04/26 04:28:50 INFO mapred.JobClient: FileSystemCounters
15/04/26 04:28:50 INFO mapred.JobClient: FILE_BYTES_READ=14503530
15/04/26 04:28:50 INFO mapred.JobClient: HDFS_BYTES_READ=61084325
15/04/26 04:28:50 INFO mapred.JobClient: FILE_BYTES_WRITTEN=29111500
15/04/26 04:28:50 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=12794925
15/04/26 04:28:50 INFO mapred.JobClient: File Input Format Counters 
15/04/26 04:28:50 INFO mapred.JobClient: Bytes Read=61084192
15/04/26 04:28:50 INFO mapred.JobClient: Map-Reduce Framework
15/04/26 04:28:50 INFO mapred.JobClient: Map output materialized bytes=14503530
15/04/26 04:28:50 INFO mapred.JobClient: Map input records=548160
15/04/26 04:28:50 INFO mapred.JobClient: Reduce shuffle bytes=14503530
15/04/26 04:28:50 INFO mapred.JobClient: Spilled Records=339714
15/04/26 04:28:50 INFO mapred.JobClient: Map output bytes=14158741
15/04/26 04:28:50 INFO mapred.JobClient: CPU time spent (ms)=21200
15/04/26 04:28:50 INFO mapred.JobClient: Total committed heap usage (bytes)=229003264
15/04/26 04:28:50 INFO mapred.JobClient: Combine input records=0
15/04/26 04:28:50 INFO mapred.JobClient: SPLIT_RAW_BYTES=133
15/04/26 04:28:50 INFO mapred.JobClient: Reduce input records=169857
15/04/26 04:28:50 INFO mapred.JobClient: Reduce input groups=169857
15/04/26 04:28:50 INFO mapred.JobClient: Combine output records=0
15/04/26 04:28:50 INFO mapred.JobClient: Physical memory (bytes) snapshot=154001408
15/04/26 04:28:50 INFO mapred.JobClient: Reduce output records=169857
15/04/26 04:28:50 INFO mapred.JobClient: Virtual memory (bytes) snapshot=689442816
15/04/26 04:28:50 INFO mapred.JobClient: Map output records=169857
Clean process success!

  (3) Inspect the log data in HDFS through the web interface:

  Unfiltered log data: /project/techbbs/data/

  Filtered log data: /project/techbbs/cleaned/

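Part 3: Statistical Analysis

I. Statistics with Hive

1.1 Preparation: Create a Partitioned Table

  To analyze the data with Hive, the cleaned data first has to be made available in Hive, which requires a table. We use a partitioned table with the date as the partition key. The key point is the HDFS location the table maps to, here /project/techbbs/cleaned, where the cleaned data is stored. The field delimiter must match the tab that the cleaning job writes between fields:

hive>CREATE EXTERNAL TABLE techbbs(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/project/techbbs/cleaned';

  With the table created, add a partition. The statement below adds the partition for the logs of 2015-04-25:

hive>ALTER TABLE techbbs ADD PARTITION(logdate='2015_04_25') LOCATION '/project/techbbs/cleaned/2015_04_25';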

1.2 Compute the Key Metrics with HQL

  (1) Metric 1: PV

  PV (Page View) is the total number of pages viewed by all users; each page a user opens is counted once. Counting the records in the log is enough:

hive>CREATE TABLE techbbs_pv_2015_04_25 AS SELECT COUNT(1) AS PV FROM techbbs WHERE logdate='2015_04_25';

  (2) Metric 2: registered users

  The forum's registration page is member.php, and when a user clicks Register the request goes to a URL containing member.php?mod=register. So it is enough to count the log records whose URL contains member.php?mod=register:

hive>CREATE TABLE techbbs_reguser_2015_04_25 AS SELECT COUNT(1) AS REGUSER FROM techbbs WHERE logdate='2015_04_25' AND INSTR(url,'member.php?mod=register')>0;

  (3) Metric 3: distinct IPs

  This is the number of distinct IPs that visit the site within one day; an IP counts once no matter how many pages it visits. As in SQL, HQL uses the DISTINCT keyword for this:

hive>CREATE TABLE techbbs_ip_2015_04_25 AS SELECT COUNT(DISTINCT ip) AS IP FROM techbbs WHERE logdate='2015_04_25';

  (4) Metric 4: bounce count

  A bounce is a visit that leaves the site after viewing only one page. Group the records by IP; an IP whose group contains a single record is a bounced visitor. Counting these IPs gives the bounce count:

hive>CREATE TABLE techbbs_jumper_2015_04_25 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM techbbs WHERE logdate='2015_04_25' GROUP BY ip HAVING times=1)e;

PS: Bounce rate is the percentage of visits that leave after viewing only one page, i.e. single-page visits / total visits. Dividing the bounce count obtained here by the PV count gives the bounce rate.

  (5) Put all metrics into one summary table for export to MySQL with Sqoop

  To export everything to MySQL in one go, the results computed above are combined into a single summary table with joins:

hive>CREATE TABLE techbbs_2015_04_25 AS SELECT '2015_04_25', a.pv, b.reguser, c.ip, d.jumper FROM techbbs_pv_2015_04_25 a JOIN techbbs_reguser_2015_04_25 b ON 1=1 JOIN techbbs_ip_2015_04_25 c ON 1=1 JOIN techbbs_jumper_2015_04_25 d ON 1=1;

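II. Exporting to MySQL with Sqoop

2.1 Preparation: Create the Summary Table in MySQL

  (1) Step 1: create a new database, techbbs

mysql> create database techbbs;
Query OK, 1 row affected (0.00 sec)

  (2) Step 2: create a new table, techbbs_logs_stat

mysql> create table techbbs_logs_stat(
-> logdate varchar(10) primary key,
-> pv int,
-> reguser int,
-> ip int,
-> jumper int);
Query OK, 0 rows affected (0.01 sec)

2.2 Export with the export Command

  (1) Step 1: write the export command

sqoop export --connect jdbc:mysql://hadoop-master:3306/techbbs --username root --password root --table techbbs_logs_stat --fields-terminated-by '\001' --export-dir '/hive/techbbs_2015_04_25'

  Here --export-dir is the location of the summary table under the Hive directory, in this setup /hive/techbbs_2015_04_25. The delimiter '\001' is the Ctrl-A character that Hive uses by default between the fields of a table created without a ROW FORMAT clause, which is the case for the summary table.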
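To see what Sqoop reads from the export directory, the sketch below splits one row of a Hive table that uses the default Ctrl-A field delimiter. The class name HiveRowSketch is hypothetical and the numbers in the sample row are invented for illustration; they are not results from this project.

```java
import java.util.Arrays;

// Hypothetical helper showing how a default-delimited Hive row is laid out.
class HiveRowSketch {
    /** Hive's default field delimiter: the Ctrl-A control character (octal 001). */
    static final String HIVE_DELIMITER = "\u0001";

    /** Splits one line of a Hive text file into its fields, keeping empty ones. */
    static String[] splitRow(String line) {
        return line.split(HIVE_DELIMITER, -1);
    }

    public static void main(String[] args) {
        // Column order follows the summary table: logdate, pv, reguser, ip, jumper.
        // The values are made up.
        String row = String.join(HIVE_DELIMITER, "2015_04_25", "100", "5", "40", "10");
        System.out.println(Arrays.toString(splitRow(row)));
    }
}
```

If the delimiter given to Sqoop does not match the one in the files, each line arrives as a single field and the export into the five-column MySQL table fails.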

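  (2) Step 2: check the exported result

III. Rewriting the Linux Scheduled Task

  So far the key metrics have been computed with Hive and exported to MySQL with Sqoop; a metrics page for decision makers could later be built on top of that with JSP or ASP.NET. All of these steps were run by hand, though. To automate the analysis and the export, we rewrite the scheduled script from Part 2.

3.1 Add the Partition, Statistics, and Export Steps

Rewrite techbbs_core.sh as follows; step4 to step8 are new: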

#!/bin/sh

......

#step4.alter hive table and then add partition
hive -e "ALTER TABLE techbbs ADD PARTITION(logdate='${yesterday}') LOCATION '/project/techbbs/cleaned/${yesterday}';"

#step5.create hive table everyday
hive -e "CREATE TABLE techbbs_pv_${yesterday} AS SELECT COUNT(1) AS PV FROM techbbs WHERE logdate='${yesterday}';"
hive -e "CREATE TABLE techbbs_reguser_${yesterday} AS SELECT COUNT(1) AS REGUSER FROM techbbs WHERE logdate='${yesterday}' AND INSTR(url,'member.php?mod=register')>0;"
hive -e "CREATE TABLE techbbs_ip_${yesterday} AS SELECT COUNT(DISTINCT ip) AS IP FROM techbbs WHERE logdate='${yesterday}';"
hive -e "CREATE TABLE techbbs_jumper_${yesterday} AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM techbbs WHERE logdate='${yesterday}' GROUP BY ip HAVING times=1) e;"
hive -e "CREATE TABLE techbbs_${yesterday} AS SELECT '${yesterday}', a.pv, b.reguser, c.ip, d.jumper FROM techbbs_pv_${yesterday} a JOIN techbbs_reguser_${yesterday} b ON 1=1 JOIN techbbs_ip_${yesterday} c ON 1=1 JOIN techbbs_jumper_${yesterday} d ON 1=1;"

#step6.delete hive tables
hive -e "drop table techbbs_pv_${yesterday};"
hive -e "drop table techbbs_reguser_${yesterday};"
hive -e "drop table techbbs_ip_${yesterday};"
hive -e "drop table techbbs_jumper_${yesterday};"

#step7.export to mysql (double quotes so that ${yesterday} is expanded)
sqoop export --connect jdbc:mysql://hadoop-master:3306/techbbs --username root --password admin --table techbbs_logs_stat --fields-terminated-by '\001' --export-dir "/hive/techbbs_${yesterday}"

#step8.delete hive table
hive -e "drop table techbbs_${yesterday};"

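3.2 Move the Date Computation Out of the Script

  (1) Rewrite techbbs_core.sh:

#!/bin/sh

#step1.get yesterday format string
#yesterday=`date --date='1 days ago' +%Y_%m_%d`
yesterday=$1

  The date string is now passed in as an argument; computing it moves to another script.

  (2) Add a new script, techbbs_daily.sh:

#!/bin/sh

yesterday=`date --date='1 days ago' +%Y_%m_%d`
/usr/local/files/apache_logs/techbbs_core.sh $yesterday

  This script computes the date and passes it to techbbs_core.sh, which is assumed here to live in the same directory as techbbs_daily.sh.

  (3) Update the crontab entry: crontab -e

0 1 * * * /usr/local/files/apache_logs/techbbs_daily.sh

  From now on techbbs_daily.sh is what runs automatically at 1:00 every morning, and all that remains is to look at the summary table in the MySQL database from time to time.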

3.3 Initialization Task

  A site may have accumulated many days of logs before the log analysis system goes live. In that case an initialization script is needed to analyze and export the logs of each earlier day. We add a new script, techbbs_init.sh, which takes the start date as its argument. It uses the C-style for loop, so it runs under bash:

#!/bin/bash

#step1.create external table in hive
hive -e "CREATE EXTERNAL TABLE techbbs(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/project/techbbs/cleaned';"

#step2.compute the days between start date and end date
s1=`date --date="$1" +%s`
s2=`date +%s`
s3=$((($s2-$s1)/3600/24))

#step3.execute techbbs_core.sh $s3 times
for ((i=$s3; i>0; i--))
do
  logdate=`date --date="$i days ago" +%Y_%m_%d`
  techbbs_core.sh $logdate
done
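The date arithmetic of the initialization script can be checked in isolation. The following is a minimal sketch with java.time, assuming the start date is given as a calendar day; the class name BackfillDatesSketch and its method are hypothetical and only reproduce the loop, not the Hive or Sqoop calls.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the loop in techbbs_init.sh.
class BackfillDatesSketch {
    // Same layout as the shell format string +%Y_%m_%d.
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy_MM_dd");

    /** Dates from start (inclusive) up to, but not including, today; oldest first. */
    static List<String> datesToProcess(LocalDate start, LocalDate today) {
        long days = ChronoUnit.DAYS.between(start, today);
        List<String> result = new ArrayList<>();
        // Counts down like the shell loop: "days ago", ..., "1 day ago".
        for (long i = days; i > 0; i--) {
            result.add(today.minusDays(i).format(FMT));
        }
        return result;
    }

    public static void main(String[] args) {
        LocalDate start = LocalDate.of(2015, 4, 23);
        LocalDate today = LocalDate.of(2015, 4, 26);
        // Each printed value is what the script would pass to techbbs_core.sh.
        datesToProcess(start, today).forEach(System.out::println);
    }
}
```

Today's date is excluded on purpose: its log file is still being written and is picked up by the regular daily run.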

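IV. Summary

With these three parts the log analysis for the site is essentially complete. Plenty is left undone, but the overall approach is clear and later work can build on it. For example, a key-metrics query system could be developed with JSP or ASP.NET on top of the result tables in MySQL or HBase, for the site's operators to view and analyze.

Appendix: complete source of LogCleanJob.java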
import java.net.URI;

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.Locale;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

public class LogCleanJob extends Configured implements Tool {

 

    public static void main(String[] args) {

        Configuration conf = new Configuration();

        try {

            int res = ToolRunner.run(conf, new LogCleanJob(), args);

            System.exit(res);

        } catch (Exception e) {

            e.printStackTrace();

        }

    }

 

    @Override

    public int run(String[] args) throws Exception {

        final Job job = new Job(new Configuration(),

                LogCleanJob.class.getSimpleName());

        // required so that the job can run from a packaged jar

        job.setJarByClass(LogCleanJob.class);

        FileInputFormat.setInputPaths(job, args[0]);

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(LongWritable.class);

        job.setMapOutputValueClass(Text.class);

        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(NullWritable.class);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // remove the output directory if it already exists

        FileSystem fs = FileSystem.get(new URI(args[0]), getConf());

        Path outPath = new Path(args[1]);

        if (fs.exists(outPath)) {

            fs.delete(outPath, true);

        }

        

        boolean success = job.waitForCompletion(true);

        if(success){

            System.out.println("Clean process success!");

        }

        else{

            System.out.println("Clean process failed!");

        }

        // report failure to the caller through the exit code
        return success ? 0 : 1;

    }

 

    static class MyMapper extends

            Mapper<LongWritable, Text, LongWritable, Text> {

        LogParser logParser = new LogParser();

        Text outputValue = new Text();

 

        protected void map(

                LongWritable key,

                Text value,

                org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context)

                throws java.io.IOException, InterruptedException {

            final String[] parsed = logParser.parse(value.toString());

 

            // step1. drop requests for static resources

            if (parsed[2].startsWith("GET /static/")

                    || parsed[2].startsWith("GET /uc_server")) {

                return;

            }

            // step2. strip the leading method string

            if (parsed[2].startsWith("GET /")) {

                parsed[2] = parsed[2].substring("GET /".length());

            } else if (parsed[2].startsWith("POST /")) {

                parsed[2] = parsed[2].substring("POST /".length());

            }

            // step3. strip the trailing protocol string

            if (parsed[2].endsWith(" HTTP/1.1")) {

                parsed[2] = parsed[2].substring(0, parsed[2].length()

                        - " HTTP/1.1".length());

            }

            // step4. write only the first three fields, tab-separated

            outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);

            context.write(key, outputValue);

        }

    }

 

    static class MyReducer extends

            Reducer<LongWritable, Text, Text, NullWritable> {

        protected void reduce(

                LongWritable k2,

                java.lang.Iterable<Text> v2s,

                org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context)

                throws java.io.IOException, InterruptedException {

            for (Text v2 : v2s) {

                context.write(v2, NullWritable.get());

            }

        };

    }

 

    /*

     * Log parser class

     */

    static class LogParser {

        public static final SimpleDateFormat FORMAT = new SimpleDateFormat(

                "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);

        public static final SimpleDateFormat dateformat1 = new SimpleDateFormat(

                "yyyyMMddHHmmss");

 

        public static void main(String[] args) throws ParseException {

            final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";

            LogParser parser = new LogParser();

            final String[] array = parser.parse(S1);

            System.out.println("Sample line: " + S1);

            System.out.format(

                    "Parsed result:  ip=%s, time=%s, url=%s, status=%s, traffic=%s",

                    array[0], array[1], array[2], array[3], array[4]);

        }

 

        /**

         * Parses an English-locale timestamp string.

         *

         * @param string the timestamp, e.g. 30/May/2013:17:38:20

         * @return the parsed date, or null if parsing fails

         */

        private Date parseDateFormat(String string) {

            Date parse = null;

            try {

                parse = FORMAT.parse(string);

            } catch (ParseException e) {

                e.printStackTrace();

            }

            return parse;

        }

 

        /**

         * Parses one log line.

         *

         * @param line the raw log line

         * @return an array of 5 elements: ip, time, url, status, traffic

         */

        public String[] parse(String line) {

            String ip = parseIP(line);

            String time = parseTime(line);

            String url = parseURL(line);

            String status = parseStatus(line);

            String traffic = parseTraffic(line);

 

            return new String[] { ip, time, url, status, traffic };

        }

 

        private String parseTraffic(String line) {

            final String trim = line.substring(line.lastIndexOf("\"") + 1)

                    .trim();

            String traffic = trim.split(" ")[1];

            return traffic;

        }

 

        private String parseStatus(String line) {

            final String trim = line.substring(line.lastIndexOf("\"") + 1)

                    .trim();

            String status = trim.split(" ")[0];

            return status;

        }

 

        private String parseURL(String line) {

            final int first = line.indexOf("\"");

            final int last = line.lastIndexOf("\"");

            String url = line.substring(first + 1, last);

            return url;

        }

 

        private String parseTime(String line) {

            final int first = line.indexOf("[");

            final int last = line.indexOf("+0800]");

            String time = line.substring(first + 1, last).trim();

            Date date = parseDateFormat(time);

            return dateformat1.format(date);

        }

 

        private String parseIP(String line) {

            String ip = line.split("- -")[0].trim();

            return ip;

        }

    }

}

 Complete document:

https://pan.baidu.com/s/1PymOnCZ1Ytv9BYKjZwNVfg

Document link:

https://pan.baidu.com/s/1Eq85aWfSUXTCqk5EKo8zPQ

Data link:

https://pan.baidu.com/s/1Y7qQPjBaAvLnnCQPFVvR4Q

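1. Data Processing

Extension script (appends year, month, day, and hour)

vim log-extend.sh

#!/bin/bash

#infile=/home/sogou.500w.utf8
infile=$1

#outfile=/home/sogou_log.txt
outfile=$2

# Assumes fields are tab-separated and field 1 is a yyyyMMddHHmmss timestamp.
# awk's substr is 1-indexed, so year=1-4, month=5-6, day=7-8, hour=9-10.
awk -F '\t' '{print $0"\t"substr($1,1,4)"\t"substr($1,5,2)"\t"substr($1,7,2)"\t"substr($1,9,2)}' $infile > $outfile

[root@master ~]# bash log-extend.sh sogou.500w.utf8 sogou_log.txt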
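The field extension done by the awk script can be sketched in Java to make the offsets explicit. This assumes, as the awk line does, that the first tab-separated field is a 14-digit yyyyMMddHHmmss timestamp; the class name TimestampFieldsSketch and the sample record are hypothetical.

```java
// Hypothetical helper that mirrors log-extend.sh for a single record.
class TimestampFieldsSketch {
    /** Splits a yyyyMMddHHmmss timestamp into {year, month, day, hour}. */
    static String[] dateParts(String ts) {
        // Java's substring is 0-indexed with an exclusive end index.
        return new String[] {
            ts.substring(0, 4),   // year
            ts.substring(4, 6),   // month
            ts.substring(6, 8),   // day
            ts.substring(8, 10)   // hour
        };
    }

    /** Appends the four date fields to a tab-separated record. */
    static String extend(String line) {
        String ts = line.split("\t", 2)[0];
        return line + "\t" + String.join("\t", dateParts(ts));
    }

    public static void main(String[] args) {
        // Made-up record: ts, uid, keyword, rank, sorder, url.
        String line = "20111230000005\tuid-1\tsome keyword\t1\t1\thttp://example.com/";
        System.out.println(extend(line));
    }
}
```

The off-by-one difference between awk (1-indexed start, length) and Java (0-indexed start, exclusive end) is the usual source of errors when porting this kind of slicing.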

Filter script (drops records whose uid or search keyword is empty)

vim log-filter.sh

#!/bin/bash

#infile=/home/sogou_log.txt
infile=$1

#outfile=/home/sogou_log.txt.flt
outfile=$2

# keep a record only if field 2 (uid) and field 3 (keyword) are neither empty nor a single space
awk -F "\t" '{if($2 != "" && $3 != "" && $2 != " " && $3 != " ") print $0}' $infile > $outfile

[root@master ~]# bash log-filter.sh sogou_log.txt  sogou_log.txt.flt

       

              

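  1. Upload the file to HDFS

Building a data warehouse for the log data with Hive

Create the database

hive> create database sogou;

Use the database

hive> use sogou;

Create an external table for the data extended with 4 fields (year, month, day, hour):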

hive> CREATE EXTERNAL TABLE sogou_data(

ts string,

uid string,

keyword string,

rank int,

sorder int,

url string,

year int,

month int,

day int,

hour int)

    > ROW FORMAT DELIMITED

    > FIELDS TERMINATED BY '\t'

    > STORED AS TEXTFILE;

OK

Time taken: 0.412 seconds

 

 

Load the data into the Hive table

load data inpath '/home/sogou_log.txt.flt' into table sogou_data;

Create a partitioned table:

hive> CREATE EXTERNAL TABLE sogou_partitioned_data(

ts string,

uid string,

keyword string,

rank int,

sorder int,

url string)

    > PARTITIONED BY(year int,month int,day int,hour int)

    > ROW FORMAT DELIMITED

    > FIELDS TERMINATED BY '\t'

    > STORED AS TEXTFILE;

 

 

 

 

 

Enable dynamic partitioning

hive> set hive.exec.dynamic.partition.mode=nonstrict;

hive> INSERT OVERWRITE TABLE sogou_partitioned_data partition(year,month,day,hour) SELECT * FROM sogou_data;

Test queries

hive> select * from sogou_data limit 10;

hive> select * from sogou_data where uid='6961d0c97fe93701fc9c0d861d096cd9';

(1) Total number of records

hive> select count(*) from sogou_partitioned_data;

OK

5000000

(2) Number of records with a non-empty keyword

hive> select count(*) from sogou_partitioned_data where keyword is not null and keyword!='';

5000000

Time taken: 28.606 seconds, Fetched: 1 row(s)

(3) Number of records that are not duplicated

hive> select count(*) from(select count(*) as no_repeat_count from sogou_partitioned_data group by ts,uid,keyword,url having no_repeat_count=1) a;

OK

4999272

Time taken: 101.228 seconds, Fetched: 1 row(s)

(4) Number of distinct UIDs

hive> select count(distinct(uid)) from sogou_partitioned_data;

OK

1352664

Time taken: 44.639 seconds, Fetched: 1 row(s)

Analysis requirement 2: keyword analysis

(1) Query frequency ranking (the 50 most frequent keywords)

hive> select keyword,count(*)query_count from sogou_partitioned_data group by keyword order by query_count desc limit 50;

Total MapReduce CPU Time Spent: 1 minutes 4 seconds 510 msec

OK

百度 38441

baidu 18312

人体艺术 14475

4399小游戏 11438

qq空间 10317

优酷 10158

新亮剑 9654

馆陶县县长闫宁的父亲 9127

公安卖萌 8192

百度一下 你就知道 7505

百度一下 7104

4399 7041

魏特琳 6665

qq网名 6149

7k7k小游戏 5985

黑狐 5610

儿子与母亲不正当关系 5496

新浪微博 5369

李宇春体 5310

新疆暴徒被击毙图片 4997

hao123 4834

123 4829

4399洛克王国 4112

qq头像 4085

nba 4027

龙门飞甲 3917

qq个性签名 3880

张去死 3848

cf官网 3729

凰图腾 3632

快播 3423

金陵十三钗 3349

吞噬星空 3330

dnf官网 3303

武动乾坤 3232

新亮剑全集 3210

电影 3155

优酷网 3115

两次才处决美女罪犯 3106

电影天堂 3028

土豆网 2969

qq分组 2940

全国各省最低工资标准 2872

清代姚明 2784

youku 2783

争产案 2755

dnf 2686

12306 2682

身份证号码大全 2680

火影忍者 2604

Time taken: 119.195 seconds, Fetched: 50 row(s)

Analysis requirement 3: UID analysis

(1) Number of users with more than 2 queries

hive> select count(*) from(select count(*) as query_count  from sogou_partitioned_data group by uid having query_count > 2) a;

OK

546353

Time taken: 69.837 seconds, Fetched: 1 row(s)

(2) Share of users with more than 2 queries

A:

hive> select count(*) from(select count(*) as query_count  from sogou_partitioned_data group by uid having query_count > 2) a;

OK

546353

Time taken: 69.837 seconds, Fetched: 1 row(s)

B:

hive> select count(distinct(uid)) from sogou_partitioned_data;

OK

1352664

A/B

hive> select 546353/1352664;

OK

0.40390887907122536

Time taken: 0.255 seconds, Fetched: 1 row(s)
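The A/B computation above, users with more than 2 queries divided by all distinct users, can also be expressed in one pass. The sketch below is illustrative only; the class name UidShareSketch, the method name, and the sample UIDs are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper that reproduces the A/B ratio on an in-memory list.
class UidShareSketch {
    /** Share of distinct UIDs whose number of queries is greater than threshold. */
    static double shareOfUsersAbove(List<String> uids, int threshold) {
        // Equivalent of GROUP BY uid with COUNT(*).
        Map<String, Long> counts = new HashMap<>();
        for (String uid : uids) {
            counts.merge(uid, 1L, Long::sum);
        }
        if (counts.isEmpty()) {
            return 0.0;
        }
        // A: users above the threshold; B: all distinct users.
        long above = counts.values().stream().filter(c -> c > threshold).count();
        return (double) above / counts.size();
    }

    public static void main(String[] args) {
        // One entry per query record; made-up UIDs.
        List<String> uids = Arrays.asList("a", "a", "a", "b", "c", "c");
        System.out.println(shareOfUsersAbove(uids, 2));
    }
}
```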

(3) Share of clicks whose rank is within 10 (rank is the fourth column)

A:

hive> select count(*) from sogou_partitioned_data where rank < 11;

4999869

Time taken: 29.653 seconds, Fetched: 1 row(s)

B:

hive> select count(*) from sogou_partitioned_data;

5000000

A/B

hive> select 4999869/5000000;

OK

0.9999738

(4) Share of queries that enter a URL directly

A:

hive> select count(*) from sogou_partitioned_data where keyword like '%www%';

OK

73979

B:

hive> select count(*) from sogou_partitioned_data;

OK

5000000

A/B

hive> select 73979/5000000;

OK

0.0147958

Analysis requirement 4: individual user behavior

(1) UIDs that searched for "仙剑奇侠传" more than 3 times

hive> select uid,count(*) as cnt from sogou_partitioned_data where keyword='仙剑奇侠传' group by uid having cnt > 3;

653d48aa356d5111ac0e59f9fe736429 6

e11c6273e337c1d1032229f1b2321a75 5

Time taken: 30.732 seconds, Fetched: 2 row(s)

5.1 Total number of records

QueryTotalNumber.java

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class QueryTotalNumber extends Configured implements Tool {

public static class QueryTotalNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("QueryTotalNumber");

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

if(!"".equals(line)) {

context.write(okey, ovalue);

}

}

}

public static class QueryTotalNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context)

throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

        // required when submitting the job from a remote client for debugging

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job=Job.getInstance(conf,"SogouLogCount");

job.setJarByClass(QueryTotalNumber.class);

FileInputFormat.addInputPath(job, new Path("/sougou/sogou_log.txt.flt"));

job.setMapperClass(QueryTotalNumberMapper.class);

job.setReducerClass(QueryTotalNumberReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path("/output/1_QueryTotalNumber"));

return job.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new QueryTotalNumber(), args);

System.exit(res);

}

}

5.2 Number of non-empty queries

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class NotNullQueryTotalNumber extends Configured implements Tool {

public static class NotNullQueryTotalNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("NotNullQueryTotalNumber");

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

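// Treat lines without a third column as having no keyword instead of failing on lineSplited[2].

String keyword=lineSplited.length > 2 ? lineSplited[2] : "";

// Count only records whose keyword is non-empty.

if(!"".equals(keyword)) {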

context.write(okey, ovalue);

}

}

}

public static class NotNullQueryTotalNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job=Job.getInstance(conf);

job.setJarByClass(NotNullQueryTotalNumber.class);

FileInputFormat.addInputPath(job, new Path("/sougou/sogou_log.txt.flt"));

job.setMapperClass(NotNullQueryTotalNumberMapper.class);

job.setReducerClass(NotNullQueryTotalNumberReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path("/output/2_NotNullQueryTotalNumber"));

return job.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new NotNullQueryTotalNumber(), args);

System.exit(res);

}

}
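A non-empty check needs its two conditions joined with `&&`. Joined with `||`, a test such as `x != null || !"".equals(x)` passes for every value, so nothing is filtered. The standalone helper below sketches the intended rule: a record counts only if it has a third column and that column is not blank. The class name `KeywordFilter`, the `delimiter` parameter and the sample lines are assumptions made for illustration.

```java
// Illustrative helper, not part of the original job.
class KeywordFilter {

    // True when the line has at least three columns and the third (the keyword) is not blank.
    // The delimiter is passed in because the log may be space- or tab-separated.
    static boolean hasKeyword(String line, String delimiter) {
        if (line == null) {
            return false;
        }
        String[] cols = line.split(delimiter);
        return cols.length > 2 && !cols[2].trim().isEmpty();
    }

    public static void main(String[] args) {
        // Hypothetical sample lines in the order: time, uid, keyword, rank, order, url.
        System.out.println(hasKeyword("20111230000005 uid1 hadoop 1 1 http://example.com", " ")); // true
        System.out.println(hasKeyword("20111230000005 uid1", " "));                               // false
        System.out.println(hasKeyword("20111230000005 uid1  1 1 http://example.com", " "));       // false
    }
}
```

The third sample has two consecutive spaces, so the keyword column is an empty string and the record is rejected.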

5.3 Total number of non-duplicated records

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class NotRepeatQueryTotalNumber extends Configured implements Tool {

public static class NotRepeatQueryTotalNumberMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text();

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

StringBuffer sb=new StringBuffer();

String line=value.toString();

String[] lineSplited=line.split(" ");

sb.append(lineSplited[0]).append("_")

.append(lineSplited[1]).append("_")

.append(lineSplited[2]).append("_")

.append(lineSplited[5]);

okey.set(sb.toString());

context.write(okey, ovalue);

}

}

public static class NotRepeatQueryTotalNumberReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

public static class NotRepeatQueryTotalNumberMapper2 extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("NotRepeatQueryTotalNumber");

private LongWritable ovalue=new LongWritable();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String[] splited=value.toString().split("\t"); // TextOutputFormat writes key<TAB>value by default

long count=Long.valueOf(splited[1]);

if(count==1) {

ovalue.set(count);

context.write(okey, ovalue);

}

}

}

public static class NotRepeatQueryTotalNumberReducer2 extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(NotRepeatQueryTotalNumber.class);

FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

job1.setMapperClass(NotRepeatQueryTotalNumberMapper.class);

job1.setReducerClass(NotRepeatQueryTotalNumberReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_notrepeat"));

if(!job1.waitForCompletion(true)) { return 1; } // stop if job1 failed, since job2 reads its output

Job job2=Job.getInstance(conf);

job2.setJarByClass(NotRepeatQueryTotalNumber.class);

FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_notrepeat"));

job2.setMapperClass(NotRepeatQueryTotalNumberMapper2.class);

job2.setReducerClass(NotRepeatQueryTotalNumberReducer2.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job2, new Path("/output/3_NotRepeatQueryTotalNumber"));

return job2.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new NotRepeatQueryTotalNumber(), args);

System.exit(res);

}

}
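The two jobs above can be checked on a small sample with a plain-Java sketch of the same two stages: first count records per composite key (time, uid, keyword, url), then count the keys that occur exactly once. Note that, as in the MapReduce version, a record that appears several times is excluded entirely rather than counted once. The class name `DedupCount` and the sample lines are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: an in-memory version of the two-stage count.
class DedupCount {

    static long recordsSeenOnce(List<String> lines, String delimiter) {
        // Stage 1: occurrences per composite key, built from columns 0, 1, 2 and 5.
        Map<String, Long> perKey = new HashMap<>();
        for (String line : lines) {
            String[] c = line.split(delimiter);
            if (c.length < 6) {
                continue; // skip malformed lines
            }
            String key = c[0] + "_" + c[1] + "_" + c[2] + "_" + c[5];
            perKey.merge(key, 1L, Long::sum);
        }
        // Stage 2: count the keys that were seen exactly once.
        long once = 0;
        for (long n : perKey.values()) {
            if (n == 1) {
                once++;
            }
        }
        return once;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
                "20111230000005 u1 hadoop 1 1 http://a.example",
                "20111230000005 u1 hadoop 1 1 http://a.example", // exact duplicate
                "20111230000009 u2 hive 2 1 http://b.example");
        System.out.println(recordsSeenOnce(sample, " ")); // prints 1
    }
}
```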

 

5.4 Number of distinct UIDs

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class IndependentUID extends Configured implements Tool {

public static class IndependentUIDMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text();

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

String uid=lineSplited[1];

if(uid != null && !"".equals(uid)) {

okey.set(uid);

context.write(okey, ovalue);

}

}

}

public static class IndependentUIDReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

public static class IndependentUIDMapper2 extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("independentUID");

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split("\t"); // TextOutputFormat writes key<TAB>value by default

long count=Long.valueOf(lineSplited[1]);

if(count >=1) {

context.write(okey, ovalue);

}

}

}

public static class IndependentUIDReducer2 extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(IndependentUID.class);

FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

job1.setMapperClass(IndependentUIDMapper.class);

job1.setReducerClass(IndependentUIDReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_independentUID"));

if(!job1.waitForCompletion(true)) { return 1; } // stop if job1 failed, since job2 reads its output

Job job2=Job.getInstance(conf);

job2.setJarByClass(IndependentUID.class);

FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_independentUID"));

job2.setMapperClass(IndependentUIDMapper2.class);

job2.setReducerClass(IndependentUIDReducer2.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job2, new Path("/output/4_IndependentUID"));

return job2.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new IndependentUID(), args);

System.exit(res);

}

}

 

5.5 Query frequency ranking (the 50 most frequent keywords)

package com.sogou;

import java.io.IOException;

import java.util.Comparator;

import java.util.Map;

import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class QueryFreRankTop50 extends Configured implements Tool {

public static class QueryFreRankMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text();

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

String keyword=lineSplited[2];

if(keyword != null && !"".equals(keyword)) {

okey.set(keyword);

context.write(okey, ovalue);

}

}

}

public static class QueryFreRankReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

public static class Top50Mapper extends Mapper<LongWritable, Text, LongWritable, Text>{

private static final int K=50;

private TreeMap<Long, String> tm=new TreeMap<Long, String>();

private LongWritable okey=new LongWritable();

private Text ovalue=new Text();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split("\t"); // job1 output is keyword<TAB>count; keywords may contain spaces

String keyword=lineSplited[0];

long count=Long.valueOf(lineSplited[1].trim());

tm.put(count, keyword); // keyed by count, so keywords with the same count overwrite each other

if(tm.size() > K) {

tm.remove(tm.firstKey());

}

}

@Override

protected void cleanup(Mapper<LongWritable, Text, LongWritable, Text>.Context context)

throws IOException, InterruptedException {

for(Map.Entry<Long,String> entry:tm.entrySet()) {

long count=entry.getKey();

String keyword=entry.getValue();

okey.set(count);

ovalue.set(keyword);

context.write(okey, ovalue);

}

}

}

public static class Top50Reducer extends Reducer<LongWritable, Text, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

private Text okey=new Text();

private static final int K=50;

private TreeMap<Long, String> tm=new TreeMap<Long, String>(new Comparator<Long>() {

@Override

public int compare(Long o1, Long o2) {

return o2.compareTo(o1);

}

});

@Override

protected void reduce(LongWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

for(Text value:values) {

tm.put(key.get(), value.toString());

if(tm.size() > K) {

tm.remove(tm.lastKey()); // this map is sorted in descending order, so lastKey() is the smallest count

}

}

}

@Override

protected void cleanup(Reducer<LongWritable, Text, Text, LongWritable>.Context context)

throws IOException, InterruptedException {

for(Map.Entry<Long, String> entry:tm.entrySet()) {

String keyword=entry.getValue();

long count=entry.getKey();

okey.set(keyword);

ovalue.set(count);

context.write(okey, ovalue);

}

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(QueryFreRankTop50.class);

FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

job1.setMapperClass(QueryFreRankMapper.class);

job1.setReducerClass(QueryFreRankReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_queryFreRank"));

if(!job1.waitForCompletion(true)) { return 1; } // stop if job1 failed, since job2 reads its output

Job job2=Job.getInstance(conf);

job2.setJarByClass(QueryFreRankTop50.class);

FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_queryFreRank"));

job2.setMapperClass(Top50Mapper.class);

job2.setMapOutputKeyClass(LongWritable.class);

job2.setMapOutputValueClass(Text.class);

job2.setReducerClass(Top50Reducer.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job2, new Path("/output/5_QueryFreRankTop50"));

return job2.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new QueryFreRankTop50(), args);

System.exit(res);

}

}
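A `TreeMap` keyed by count can hold only one keyword per count, so keywords that tie are lost. One common alternative is a bounded min-heap, which keeps the K largest entries and preserves ties. The sketch below shows the idea in plain Java; the class name `TopKeywords`, the nested `Entry` type and the sample data are assumptions made for illustration, and this is a different technique from the `TreeMap` approach used in the listing above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Illustrative only: top-K selection with a bounded min-heap.
class TopKeywords {

    static final class Entry {
        final String keyword;
        final long count;

        Entry(String keyword, long count) {
            this.keyword = keyword;
            this.count = count;
        }
    }

    // Returns at most k entries with the largest counts, in descending order of count.
    static List<Entry> topK(List<Entry> entries, int k) {
        Comparator<Entry> byCount = Comparator.comparingLong(e -> e.count);
        PriorityQueue<Entry> heap = new PriorityQueue<>(byCount); // smallest count at the head
        for (Entry e : entries) {
            heap.offer(e);
            if (heap.size() > k) {
                heap.poll(); // drop the current smallest, keeping the k largest seen so far
            }
        }
        List<Entry> result = new ArrayList<>(heap);
        result.sort(byCount.reversed());
        return result;
    }

    public static void main(String[] args) {
        List<Entry> counts = Arrays.asList(       // hypothetical keyword counts
                new Entry("hadoop", 5), new Entry("hive", 5),
                new Entry("sqoop", 1), new Entry("hbase", 9));
        for (Entry e : topK(counts, 3)) {
            System.out.println(e.keyword + "\t" + e.count);
        }
    }
}
```

With K = 3 the result contains `hbase` followed by both keywords that share the count 5. A map keyed by count would have kept only one of them.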

5.6 Number of users with more than 2 queries

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class QueriesGreaterThan2 extends Configured implements Tool {

public static class NumQueGreTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text();

private LongWritable ovalue=new LongWritable(1);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

String uid=lineSplited[1];

if(uid != null && !"".equals(uid)) {

okey.set(uid);

context.write(okey, ovalue);

}

}

}

public static class NumQueGreTwoReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

if(sum >2) {

ovalue.set(sum);

context.write(key, ovalue);

}

}

}

public static class NumQueGreTwoToOneMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("NumQueGreTwo");

private LongWritable ovalue=new LongWritable(1);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split("\t"); // TextOutputFormat writes key<TAB>value by default

long count=Long.valueOf(lineSplited[1]);

if(count > 2) {

context.write(okey, ovalue);

}

}

}

public static class NumQueGreTwoToOneReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum =0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(QueriesGreaterThan2.class);

FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

job1.setMapperClass(NumQueGreTwoMapper.class);

job1.setReducerClass(NumQueGreTwoReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_numQueGreTwo"));

if(!job1.waitForCompletion(true)) { return 1; } // stop if job1 failed, since job2 reads its output

Job job2=Job.getInstance(conf);

job2.setJarByClass(QueriesGreaterThan2.class);

FileInputFormat.addInputPath(job2, new Path("/outdata/sogou_numQueGreTwo"));

job2.setMapperClass(NumQueGreTwoToOneMapper.class);

job2.setReducerClass(NumQueGreTwoToOneReducer.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job2, new Path("/output/6_QueriesGreaterThan2"));

return job2.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new QueriesGreaterThan2(), args);

System.exit(res);

}

}

5.7 Share of users with more than 2 queries

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

//import org.apache.hadoop.mapreduce.Partitioner;

import org.apache.hadoop.mapreduce.Reducer;

//import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class RatioOfQueriesGreaterThan2 extends Configured implements Tool {

public static class UserDutyThanTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("userDutyThanTwn");

private LongWritable ovalue=new LongWritable();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split("\t"); // both inputs are label<TAB>count lines written by TextOutputFormat

long count=Long.parseLong(lineSplited[1]);

ovalue.set(count);

context.write(okey, ovalue);

}

}

public static class UserDutyThanTwoReducere extends Reducer<Text, LongWritable, Text, DoubleWritable>{

private Text okey=new Text("userDutyThanTwn");

private DoubleWritable percent=new DoubleWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context)

throws IOException, InterruptedException {

StringBuffer buffer=new StringBuffer();

for(LongWritable value:values) {

buffer.append(value).append(",");

}

String[] moleculeOrDenominator=buffer.toString().split(",");

double a=Double.valueOf(moleculeOrDenominator[0]);

double b=Double.valueOf(moleculeOrDenominator[1]);

double per=0.0; // the two counts arrive in no guaranteed order, so the smaller one is used as the numerator

if(a<=b) {

per=a/b;

}else {

per=b/a;

}

percent.set(per);

context.write(okey, percent);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(RatioOfQueriesGreaterThan2.class);

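// The inputs are the final outputs of sections 5.4 and 5.6, so those two programs must have run first.

MultipleInputs.addInputPath(job1, new Path("/output/4_IndependentUID"),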

TextInputFormat.class, UserDutyThanTwoMapper.class);

MultipleInputs.addInputPath(job1, new Path("/output/6_QueriesGreaterThan2"),

TextInputFormat.class, UserDutyThanTwoMapper.class);

job1.setMapOutputKeyClass(Text.class);

job1.setMapOutputValueClass(LongWritable.class);

job1.setReducerClass(UserDutyThanTwoReducere.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(DoubleWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/output/7_RatioOfQueriesGreaterThan2"));

return job1.waitForCompletion(true)? 0 : 1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new RatioOfQueriesGreaterThan2(), args);

System.exit(res);

}

}
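The reducer above decides which value is the numerator by comparing sizes. An alternative, sketched here in plain Java, is to keep the labels and pick the part and the total by name, which stays correct even if the part were ever larger than expected. The labels `independentUID` and `NumQueGreTwo` are the keys written by sections 5.4 and 5.6; the class name `LabelledRatio` and the counts 1000 and 250 are made-up values for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: compute a ratio from labelled "label<TAB>count" lines.
class LabelledRatio {

    // Parses lines in the default TextOutputFormat layout (key, tab, value).
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : lines) {
            String[] kv = line.split("\t");
            if (kv.length == 2) {
                counts.put(kv[0], Long.parseLong(kv[1].trim()));
            }
        }
        return counts;
    }

    // Divides the count stored under partLabel by the count stored under totalLabel.
    static double ratio(Map<String, Long> counts, String partLabel, String totalLabel) {
        long part = counts.get(partLabel);
        long total = counts.get(totalLabel);
        if (total == 0) {
            throw new ArithmeticException("total is zero");
        }
        return (double) part / total;
    }

    public static void main(String[] args) {
        Map<String, Long> counts = parse(Arrays.asList(
                "independentUID\t1000",   // hypothetical total number of distinct users
                "NumQueGreTwo\t250"));    // hypothetical number of users with more than 2 queries
        System.out.println(ratio(counts, "NumQueGreTwo", "independentUID")); // prints 0.25
    }
}
```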

5.8 Share of clicks whose rank is within 10 (rank is the fourth column)

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class RatioOfClickTimesInTen extends Configured implements Tool {

public static class NumberOfLessTenMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("numberOfRankTen");

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

int rank=Integer.parseInt(lineSplited[3]);

if(rank < 11) {

context.write(okey, ovalue);

}

}

}

public static class NumberOfRankTenMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("numberOfRankTen");

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

int rank=Integer.parseInt(lineSplited[3]);

if(rank >= 0) {

context.write(okey, ovalue);

}

}

}

public static class NumberOfRankTenReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

public static class UserDutyThanTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text();

private LongWritable ovalue=new LongWritable();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split("\t"); // both inputs are label<TAB>count lines written by TextOutputFormat

String word=lineSplited[0];

long count=Long.parseLong(lineSplited[1]);

okey.set(word);

ovalue.set(count);

context.write(okey, ovalue);

}

}

public static class UserDutyThanTwoReducere extends Reducer<Text, LongWritable, Text, DoubleWritable>{

private DoubleWritable percent=new DoubleWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context)

throws IOException, InterruptedException {

StringBuffer buffer=new StringBuffer();

for(LongWritable value:values) {

buffer.append(value).append(",");

}

String[] moleculeOrDenominator=buffer.toString().split(",");

double a=Double.valueOf(moleculeOrDenominator[0]);

double b=Double.valueOf(moleculeOrDenominator[1]);

double per=0.0; // the two counts arrive in no guaranteed order, so the smaller one is used as the numerator

if(a<=b) {

per=a/b;

}else {

per=b/a;

}

percent.set(per);

context.write(key, percent);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(RatioOfClickTimesInTen.class);

FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

job1.setMapperClass(NumberOfLessTenMapper.class);

job1.setReducerClass(NumberOfRankTenReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_numberOfLessTen"));

if(!job1.waitForCompletion(true)) { return 1; } // stop if job1 failed, since job3 reads its output

Job job2=Job.getInstance(conf);

job2.setJarByClass(RatioOfClickTimesInTen.class);

FileInputFormat.addInputPath(job2, new Path("/sougou/sogou_log.txt.flt"));

job2.setMapperClass(NumberOfRankTenMapper.class);

job2.setReducerClass(NumberOfRankTenReducer.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job2, new Path("/outdata/sogou_numberOfRankTen"));

if(!job2.waitForCompletion(true)) { return 1; } // stop if job2 failed, since job3 reads its output

Job job3=Job.getInstance(conf);

job3.setJarByClass(RatioOfClickTimesInTen.class);

MultipleInputs.addInputPath(job3, new Path("/outdata/sogou_numberOfLessTen"),

TextInputFormat.class, UserDutyThanTwoMapper.class);

MultipleInputs.addInputPath(job3, new Path("/outdata/sogou_numberOfRankTen"),

TextInputFormat.class, UserDutyThanTwoMapper.class);

job3.setMapOutputKeyClass(Text.class);

job3.setMapOutputValueClass(LongWritable.class);

job3.setReducerClass(UserDutyThanTwoReducere.class);

job3.setOutputKeyClass(Text.class);

job3.setOutputValueClass(DoubleWritable.class);

FileOutputFormat.setOutputPath(job3, new Path("/output/8_RatioOfClickTimesInTen"));

return job3.waitForCompletion(true)? 0 : 1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new RatioOfClickTimesInTen(), args);

System.exit(res);

}

}
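The program above scans the log twice (once for ranks within 10, once for all ranks) and then divides in a third job. On a small sample the same ratio can be computed in one pass, which is handy for sanity-checking the cluster result. This is a plain-Java sketch: the class name `RankShare` and the sample ranks are assumptions, and negative ranks are skipped for both counts, which is a simplification of the two mappers above.

```java
// Illustrative only: single-pass share of records whose rank is within 10.
class RankShare {

    // ranks holds the fourth column of each record, already parsed to int.
    static double shareWithinTen(int[] ranks) {
        long within = 0;
        long total = 0;
        for (int rank : ranks) {
            if (rank < 0) {
                continue;      // ignore invalid ranks in both counts
            }
            total++;
            if (rank < 11) {
                within++;      // same threshold as NumberOfLessTenMapper
            }
        }
        return total == 0 ? 0.0 : (double) within / total;
    }

    public static void main(String[] args) {
        int[] sample = {1, 3, 10, 11, 25};          // hypothetical rank values
        System.out.println(shareWithinTen(sample)); // prints 0.6
    }
}
```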

5.9 Share of queries entered directly as a URL

package com.sogou;

import java.io.IOException;

import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.DoubleWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class RatioOfDirectInputURL extends Configured implements Tool {

public static class RatioOfDirectInputURLMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("SubInputURLPerMapper");

private LongWritable ovalue=new LongWritable(1L);

String pattern=".*www.*";

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

String keyword=lineSplited[2];

if(Pattern.matches(pattern, keyword)) {

context.write(okey, ovalue);

}

}

}

public static class RatioOfDirectInputURLReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum=0;

for(LongWritable value:values) {

sum +=value.get();

}

ovalue.set(sum);

context.write(key, ovalue);

}

}

public static class UserDutyThanTwoMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text("subInputURLPer");

private LongWritable ovalue=new LongWritable();

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split("\t"); // both inputs are label<TAB>count lines written by TextOutputFormat

@SuppressWarnings("unused")

String word=lineSplited[0];

long count=Long.parseLong(lineSplited[1]);

ovalue.set(count);

context.write(okey, ovalue);

}

}

public static class UserDutyThanTwoReducere extends Reducer<Text, LongWritable, Text, DoubleWritable>{

private DoubleWritable percent=new DoubleWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context)

throws IOException, InterruptedException {

StringBuffer buffer=new StringBuffer();

for(LongWritable value:values) {

buffer.append(value).append(",");

}

String[] moleculeOrDenominator=buffer.toString().split(",");

double a=Double.valueOf(moleculeOrDenominator[0]);

double b=Double.valueOf(moleculeOrDenominator[1]);

double per=0.0; // the two counts arrive in no guaranteed order, so the smaller one is used as the numerator

if(a<=b) {

per=a/b;

}else {

per=b/a;

}

percent.set(per);

context.write(key, percent);

}

}

@Override

public int run(String[] args) throws Exception {

Configuration conf=new Configuration();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job1=Job.getInstance(conf);

job1.setJarByClass(RatioOfDirectInputURL.class);

FileInputFormat.addInputPath(job1, new Path("/sougou/sogou_log.txt.flt"));

job1.setMapperClass(RatioOfDirectInputURLMapper.class);

job1.setReducerClass(RatioOfDirectInputURLReducer.class);

job1.setOutputKeyClass(Text.class);

job1.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job1, new Path("/outdata/sogou_subInputURLPer"));

if(!job1.waitForCompletion(true)) { return 1; } // stop if job1 failed, since job2 reads its output

Job job2=Job.getInstance(conf);

job2.setJarByClass(RatioOfDirectInputURL.class);

MultipleInputs.addInputPath(job2, new Path("/outdata/sogou_subInputURLPer"),

TextInputFormat.class, UserDutyThanTwoMapper.class);

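// Total record count written by job2 of section 5.8, so RatioOfClickTimesInTen must have run first.

MultipleInputs.addInputPath(job2, new Path("/outdata/sogou_numberOfRankTen"),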

TextInputFormat.class, UserDutyThanTwoMapper.class);

job2.setMapOutputKeyClass(Text.class);

job2.setMapOutputValueClass(LongWritable.class);

job2.setReducerClass(UserDutyThanTwoReducere.class);

job2.setOutputKeyClass(Text.class);

job2.setOutputValueClass(DoubleWritable.class);

FileOutputFormat.setOutputPath(job2, new Path("/output/9_RatioOfDirectInputURL"));

return job2.waitForCompletion(true)? 0 : 1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new RatioOfDirectInputURL(), args);

System.exit(res);

}

}
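The second job above reads plain text results back in and divides the smaller count by the larger. The plain-Java sketch below mirrors that logic without Hadoop so it can be checked locally. It assumes the input lines have the form key, tab, count (the default layout written by TextOutputFormat); the class name `RatioSketch` and the sample lines are hypothetical and not part of the original project.

```java
public class RatioSketch {

    // Extracts the count from one "key<TAB>count" line (assumed layout).
    public static long parseCount(String line) {
        String[] fields = line.trim().split("\\s+");
        return Long.parseLong(fields[fields.length - 1]);
    }

    // Divides the smaller count by the larger one; returns 0.0 when both are zero.
    public static double ratio(long a, long b) {
        long larger = Math.max(a, b);
        return larger == 0 ? 0.0 : (double) Math.min(a, b) / larger;
    }

    public static void main(String[] args) {
        long first = parseCount("url\t25");    // hypothetical line from the first input
        long second = parseCount("url\t100");  // hypothetical line from the second input
        System.out.println(ratio(first, second)); // prints 0.25
    }
}
```

Because the result does not depend on argument order, it does not matter which of the two values the reducer happens to receive first.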

5.10 Find the UIDs that searched for "仙剑奇侠传" more than 3 times

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class QuerySearch extends Configured implements Tool {

public static class QuerySearchMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

private Text okey=new Text();

private LongWritable ovalue=new LongWritable(1L);

@Override

protected void map(LongWritable key, Text value, Context context)

throws IOException, InterruptedException {

String line=value.toString();

String[] lineSplited=line.split(" ");

// Skip malformed records that lack the uid or keyword field.
if(lineSplited.length<3) {
    return;
}

String uid=lineSplited[1];

String keyword=lineSplited[2];

if(keyword.equals("仙剑奇侠传")) {

String uid_keyword=uid+"_"+keyword;

okey.set(uid_keyword);

context.write(okey, ovalue);

}

}

}

public static class QuerySearchReducer extends Reducer<Text, LongWritable, Text, LongWritable>{

private LongWritable ovalue=new LongWritable();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum =0;

for(LongWritable value:values) {

sum +=value.get();

}

if(sum > 3) {

ovalue.set(sum);

context.write(key, ovalue);

}

}

}

@Override

public int run(String[] args) throws Exception {

// Reuse the Configuration that ToolRunner injects, so generic options such as -D are honored.
Configuration conf=getConf();

conf.set("fs.defaultFS", "hdfs://10.49.23.127:9000");

Job job=Job.getInstance(conf);

job.setJarByClass(QuerySearch.class);

FileInputFormat.addInputPath(job, new Path("/sougou/sogou_log.txt.flt"));

job.setMapperClass(QuerySearchMapper.class);

job.setReducerClass(QuerySearchReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(LongWritable.class);

FileOutputFormat.setOutputPath(job, new Path("/output/10_QuerySearch"));

return job.waitForCompletion(true)? 0:1;

}

public static void main(String[] args) throws Exception {

int res=ToolRunner.run(new QuerySearch(), args);

System.exit(res);

}

}
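To make the intent of the job above easier to verify, here is a minimal plain-Java sketch of the same count-then-filter logic, runnable without a cluster. It assumes space-separated records with the uid in the second field and the keyword in the third, as the mapper does; the class name `QuerySearchSketch`, the method name, and the sample records are hypothetical.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class QuerySearchSketch {

    // Returns uid -> count for every uid that searched `keyword` more than `threshold` times.
    public static Map<String, Long> frequentSearchers(List<String> lines, String keyword, long threshold) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.split(" ");
            // Map phase: keep only well-formed records whose keyword matches.
            if (fields.length < 3 || !keyword.equals(fields[2])) {
                continue;
            }
            // Reduce phase, part 1: sum the occurrences per uid.
            counts.merge(fields[1], 1L, Long::sum);
        }
        // Reduce phase, part 2: drop uids at or below the threshold.
        counts.values().removeIf(c -> c <= threshold);
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical records: time, uid, keyword.
        List<String> lines = Arrays.asList(
            "t1 u1 hadoop", "t2 u1 hadoop", "t3 u1 hadoop", "t4 u1 hadoop",
            "t5 u2 hadoop", "t6 u1 hive");
        System.out.println(frequentSearchers(lines, "hadoop", 3));
    }
}
```

In the real job the keyword would be "仙剑奇侠传" and the threshold 3; the sample uses an ASCII keyword only to sidestep source-encoding issues.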

6. Import the generated files into HBase (a single table) via the Java API.

package com.sogou;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.Mutation;

import org.apache.hadoop.hbase.client.Put;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;

import org.apache.hadoop.hbase.mapreduce.TableReducer;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class Hbase_Import {

// Name of the HBase table the reducer writes to

    // private static String tableName = "sogou_data_analysis_results_table";

private static String tableName = "sogou_data";

// Initialize the HBase connection settings

static Configuration conf = null;

static {

conf = HBaseConfiguration.create();

conf.set("hbase.rootdir", "hdfs://10.49.23.127:9000/hbase");

// The master address is a plain host:port pair, not an hdfs:// URI.
conf.set("hbase.master", "10.49.23.127:60000");

conf.set("hbase.zookeeper.property.clientPort", "2181");

conf.set("hbase.zookeeper.quorum", "10.49.23.127,10.49.23.134,10.49.23.129");

conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);

// conf.set("dfs.socket.timeout", "180000");

}

public static class BatchMapper extends

Mapper<LongWritable, Text, LongWritable, Text> {

protected void map(LongWritable key, Text value,

Mapper<LongWritable, Text, LongWritable, Text>.Context context)

throws IOException, InterruptedException {

// Pass each line through unchanged; the reducer turns it into a Put.
context.write(key, value);

}

}

public static class BatchReducer extends

TableReducer<LongWritable, Text, NullWritable> {

private String family = "info";

@Override

protected void reduce(

LongWritable arg0,

Iterable<Text> v2s,

Reducer<LongWritable, Text, NullWritable, Mutation>.Context context)

throws IOException, InterruptedException {

for (Text v2 : v2s) {

String[] splited = v2.toString().split(" ");

String rowKey = splited[0];

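// Bytes.toBytes always encodes as UTF-8, whereas String.getBytes() depends on the platform default charset.
Put put = new Put(Bytes.toBytes(rowKey));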

// put.addColumn(family.getBytes(), "raw".getBytes(), v2.toString().getBytes());

put.addColumn(Bytes.toBytes(family), Bytes.toBytes("raw"), Bytes.toBytes(v2.toString()));

context.write(NullWritable.get(), put);

}

}

}

// Runs one import job for the given HDFS input path.
public static void imputil(String str) throws IOException, ClassNotFoundException,
InterruptedException {

Job job = Job.getInstance(conf, Hbase_Import.class.getSimpleName());

TableMapReduceUtil.addDependencyJars(job);

job.setJarByClass(Hbase_Import.class);

FileInputFormat.setInputPaths(job,str);

job.setInputFormatClass(TextInputFormat.class);

job.setMapperClass(BatchMapper.class);

job.setMapOutputKeyClass(LongWritable.class);

job.setMapOutputValueClass(Text.class);

job.setReducerClass(BatchReducer.class);

job.setOutputFormatClass(TableOutputFormat.class);

// Fail loudly instead of silently moving on to the next directory.
if (!job.waitForCompletion(true)) {
    throw new IOException("HBase import failed for " + str);
}

}

public static void main(String[] args) throws ClassNotFoundException, IOException, InterruptedException {

// Output directories of the ten analysis jobs above.
String[] str={

"hdfs://10.49.23.127:9000/output/1_QueryTotalNumber",

"hdfs://10.49.23.127:9000/output/2_NotNullQueryTotalNumber",

"hdfs://10.49.23.127:9000/output/3_NotRepeatQueryTotalNumber",

"hdfs://10.49.23.127:9000/output/4_IndependentUID",

"hdfs://10.49.23.127:9000/output/5_QueryFreRankTop50",

"hdfs://10.49.23.127:9000/output/6_QueriesGreaterThan2",

"hdfs://10.49.23.127:9000/output/7_RatioOfQueriesGreaterThan2",

"hdfs://10.49.23.127:9000/output/8_RatioOfClickTimesInTen",

"hdfs://10.49.23.127:9000/output/9_RatioOfDirectInputURL",

"hdfs://10.49.23.127:9000/output/10_QuerySearch"};

for (String stri:str){

imputil(stri);

}

}

}
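One thing to watch with the import above: the first token of each line becomes the row key, and every line is written to the same `info:raw` column. If two lines ever share that first token, they land on the same row and a default read returns only the most recent one. The sketch below shows one possible way to keep results from different directories apart by prefixing the key with the source name. The class `RowKeySketch`, the `source:token` key format, and the sample line are assumptions for illustration, not part of the original code.

```java
public class RowKeySketch {

    // Builds a row key of the form "<source>:<first token>" so that lines coming
    // from different result directories cannot collide on the same row.
    public static String rowKey(String source, String line) {
        String firstToken = line.trim().split("\\s+")[0];
        return source + ":" + firstToken;
    }

    public static void main(String[] args) {
        // Hypothetical result line from the 10_QuerySearch output directory.
        System.out.println(rowKey("10_QuerySearch", "u1_keyword\t4"));
    }
}
```

Wiring this into the job would require the reducer to know which directory a line came from, for example by passing the directory name through the job configuration; that part is left out here.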

MultipleInputs: old vs. new jar issue

https://stackoverflow.com/questions/26434790/multipleinputs-not-working-hadoop-2-5-0

Java encoding issue

https://blog.csdn.net/u011597415/article/details/53506574

Reposted from

https://www.cnblogs.com/mzc1997/p/9200486.html

Original article: https://www.cnblogs.com/aibabel/p/11572782.html