网站流量日志数据分析系统1

1点击流数据模型 

1.1点击流概念 

点击流(Click Stream)是指用户在网站上持续访问的轨迹。这个概念更注重用户浏览网站的整个流程。用户对网站的每次访问包含了一系列的点击动作行为,这些点击行为数据就构成了点击流数据(Click Stream Data),它代表了用户浏览网站的整个流程。

点击流和网站日志是两个不同的概念。

点击流是从用户的角度出发,注重用户浏览网站的整个流程;

网站日志是面向整个站点,它包含了用户行为数据、服务器响应数据等众多日志信息,我们通过对网站日志的分析可以获得用户的点击流数据。

网站是由多个网页(Page)构成,当用户在访问多个网页时,网页与网页之间是靠Referrers参数来标识上级网页来源。由此,可以确定网页被依次访问的顺序,当然也可以通过时间来标识访问的次序。其次,用户对网站的每次访问,可视作是一次会话(Session),在网站日志中将会用不同的Sessionid来唯一标识每次会话。如果把 Page 视为“点”的话,那么我们可以很容易的把 Session 描绘成一条“线”,也就是用户的点击流数据轨迹曲线。

图:点击流概念模型 

.2点击流模型生成 

点击流数据在具体操作上是由散点状的点击日志数据梳理所得。点击数据在数据建模时存在两张模型表Pageviews和visits,例如: 

 

页面点击流模型 Pageviews 表 

Session

时间 

访问页面 URL

停留时长 

第几步 

S001 

2012-01-01 12:

31:12 

/a/....

30

1

S002

2012-01-01 12:

31:16 

/a/....

10

1

S002

2012-01-01 12:

31:26 

/b/....

10

2

S002

2012-01-01 12:

31:36

/e/....

30

3

S003

2012-01-01 15:

35:06

/a/....

30

点击流模型 Visits 表( session 聚集的页面访问信息)

Session

起始时间 

结束时间 

进 入页面 

离 开页面 

访问页面数 

IP

referal

S001

2012-01-01

12:1:12

2012-01-01

12:1:12

/a/...

/a/...

1

101.0.0.1

somesite.com

S002

2012-01-01

12:31:16

2012-01-01

12:35:06

/a/...

/e/...

3

201.0.0.2

-

S003

2012-01-01

12:35:42

2012-01-01

12:35:42

/c/...

/c/...

1

234.0.0.3

baidu.com

S004

2012-01-01

15:16:39

2012-01-01

15:19:23

/c/...

/e/...

3

101.0.0.1

google.com

…… 

…… 

…… 

…… 

…… 

…… 

…… 

…… 

2如何进行网站流量分析 

流量分析整体来说是一个内涵非常丰富的体系,整体过程是一个金字塔结构:

金字塔的顶部是网站的目标:投资回报率(ROI)。

2.1网站流量分析模型举例 

2.1.1、网站流量质量分析(流量分析)

流量对于每个网站来说都是很重要,但流量并不是越多越好,应该更加看重流量的质量,换句话来说就是流量可以为我们带来多少收入。

X 轴代表量,指网站获得的访问量。Y 轴代表质,指可以促进网站目标的事件次数(比如商品浏览、注册、购买等行为)。圆圈大小表示获得流量的成本。

BD 流量是指商务拓展流量。一般指的是互联网经过运营或者竞价排名等方式,从外部拉来的流量。比如电商网站在百度上花钱来竞价排名,产生的流量就是 BD 流量的一部分。

2.1.2、网站流量多维度细分(流量分析)

细分是指通过不同维度对指标进行分割,查看同一个指标在不同维度下的表现,进而找出有问题的那部分指标,对这部分指标进行优化。

2.1.3、网站内容及导航分析(内容分析)

对于所有网站来说,页面都可以被划分为三个类别:导航页、功能页、内容页

导航页的目的是引导访问者找到信息,功能页的目的是帮助访问者完成特定任务,内容页的目的是向访问者展示信息并帮助访问者进行决策。

首页和列表页都是典型的导航页,站内搜索页面、注册表单页面和购物车页面都是典型的功能页,而产品详情页、新闻和文章页都是典型的内容页。

比如从内容导航分析中,以下两类行为就是网站运营者不希望看到的行为:

第一个问题:访问者从导航页(首页)还没有看到内容页面之前就从导航页离开网站,需要分析导航页造成访问者中途离开的原因。

第二个问题:访问者从导航页进入内容页后,又返回到导航页,说明需要分

析内容页的最初设计,并考虑中内容页提供交叉的信息推荐。

2.1.4、网站转化以及漏斗分析(转化分析)

所谓转化,即网站业务流程中的一个封闭渠道,引导用户按照流程最终实现业务目标(比如商品成交);而漏斗模型则是指进入渠道的用户在各环节递进过程中逐渐流失的形象描述;

对于转化渠道,主要进行两部分的分析:

访问者的流失和迷失

阻力的流失

造成流失的原因很多,如:不恰当的商品或活动推荐对支付环节中专业名词的解释、帮助信息等内容不当

迷失

造成迷失的主要原因是转化流量设计不合理,访问者在特定阶段得不到需要的信息,并且不能根据现有的信息作出决策,比如在线购买演唱会门票,直到支付也没看到在线选座的提示,这时候就很可能会产生迷失,返回查看。

总之,网站数据分析是一门内容非常丰富的学科,本课程中主要关注网站流量分析过程中的技术运用,更多关于网站数据分析的业务知识可学习文档首页推荐的资料。

2.2流量分析常见分类 

指标是网站分析的基础,用来记录和衡量访问者在网站自的各种行为。比如我们经常说的流量就是一个网站指标,它是用来衡量网站获得的访问量。在进行流量分析之前,我们先来了解一些常见的指标。

2.2.1、骨灰级指标 

IP:1 天之内,访问网站的不重复 IP 数。一天内相同 IP 地址多次访问网站只被计算 1 次。曾经 IP 指标可以用来表示用户访问身份,目前则更多的用来获取访问者的地理位置信息。

PageView 浏览量: 即通常说的 PV 值,用户每打开 1 个网站页面,记录 1 个

PV。用户多次打开同一页面 PV 累计多次。通俗解释就是页面被加载的总次数。

Unique PageView: 1 天之内,访问网站的不重复用户数(以浏览器 cookie 为依据),一天内同一访客多次访问网站只被计算 1 次。

2.2.2、基础级指标 

访问次数:访客从进入网站到离开网站的一系列活动记为一次访问,也称会话(session),1 次访问(会话)可能包含多个 PV。

网站停留时间:访问者在网站上花费的时间。

页面停留时间:访问者在某个特定页面或某组网页上所花费的时间。

2.2.3、复合级指标 

人均浏览页数:平均每个独立访客产生的 PV。人均浏览页数=浏览次数/独立访客。体现网站对访客的吸引程度。

跳出率:指某一范围内单页访问次数或访问者与总访问次数的百分比。其中跳出指单页访问或访问者的次数,即在一次访问中访问者进入网站后只访问了一个页面就离开的数量。

退出率:指某一范围内退出的访问者与综合访问量的百分比。其中退出指访问者离开网站的次数,通常是基于某个范围的。

有了上述这些指标之后,就能结合业务进行各种不同角度的分类分析,主要是以下几大方面:

 

2.2.4基础分析(PV,IP,UV) 

趋势分析:根据选定的时段,提供网站流量数据,通过流量趋势变化形态,分析网站访客的访问规律、网站发展状况提供参考。

对比分析:根据选定的两个对比时段,提供网站流量在时间上的纵向对比报表,帮您发现网站发展状况、发展规律、流量变化率等。

当前在线:提供当前时刻站点上的访客量,以及最近 15 分钟流量、来源、受访、访客变化情况等,方便用户及时了解当前网站流量状况。

访问明细:提供最近 7 日的访客访问记录,可按每个 PV 或每次访问行为(访客的每次会话)显示,并可按照来源、搜索词等条件进行筛选。 通过访问明细,用户可以详细了解网站流量的累计过程,从而为用户快速找出流量变动原因提供最原始、最准确的依据。 

2.2.5来源分析

来源分类:提供不同来源形式(直接输入、搜索引擎、其他外部链接、站内来源)、不同来源项引入流量的比例情况。通过精确的量化数据,帮助用户分析什么类型的来路产生的流量多、效果好,进而合理优化推广方案。

搜索引擎:提供各搜索引擎以及搜索引擎子产品引入流量的比例情况。

搜索词:提供访客通过搜索引擎进入网站所使用的搜索词,以及各搜索词引入流量的特征和分布。帮助用户了解各搜索词引入流量的质量,进而了解访客的兴趣关注点、网站与访客兴趣点的匹配度,为优化 SEO(搜索引擎优化)方案及 SEM(搜索引擎营销)提词方案提供详细依据。

最近 7 日的访客搜索记录:可按每个 PV 或每次访问行为(访客的每次会话)显示,并可按照访客类型、地区等条件进行筛选。为您搜索引擎优化提供最详细的原始数据。

来路域名:提供具体来路域名引入流量的分布情况,并可按“社会化媒体”、“搜索引擎”、“邮箱”等网站类型对来源域名进行分类。 帮助用户了解哪类推广渠道产生的流量多、效果好,进而合理优化网站推广方案。

来路页面:提供具体来路页面引入流量的分布情况。 尤其对于通过流量置换、包广告位等方式从其他网站引入流量的用户,该功能可以方便、清晰地展现广告引入的流量及效果,为优化推广方案提供依据。

来源升降榜:提供开通统计后任意两日的 TOP10000 搜索词、来路域名引入流量的对比情况,并按照变化的剧烈程度提供排行榜。 用户可通过此功能快速找到哪些来路对网站流量的影响比较大,从而及时排查相应来路问题。 

2.2.6受访分析

受访域名:提供访客对网站中各个域名的访问情况。 一般情况下,网站不同域名提供的产品、内容各有差异,通过此功能用户可以了解不同内容的受欢迎程度以及网站运营成效。

受访页面:提供访客对网站中各个页面的访问情况。 站内入口页面为访客进入网站时浏览的第一个页面,如果入口页面的跳出率较高则需要关注并优化;站内出口页面为访客访问网站的最后一个页面,对于离开率较高的页面需要关注并优化。

受访升降榜:提供开通统计后任意两日的 TOP10000 受访页面的浏览情况对比,并按照变化的剧烈程度提供排行榜。 可通过此功能验证经过改版的页面是否有流量提升或哪些页面有巨大流量波动,从而及时排查相应问题。

热点图:记录访客在页面上的鼠标点击行为,通过颜色区分不同区域的点击热度;支持将一组页面设置为"关注范围",并可按来路细分点击热度。 通过访客在页面上的点击量统计,可以了解页面设计是否合理、广告位的安排能否获取更多佣金等。

用户视点:提供受访页面对页面上链接的其他站内页面的输出流量,并通过输出流量的高低绘制热度图,与热点图不同的是,所有记录都是实际打开了下一页面产生了浏览次数(PV)的数据,而不仅仅是拥有鼠标点击行为。

访问轨迹:提供观察焦点页面的上下游页面,了解访客从哪些途径进入页面,又流向了哪里。 通过上游页面列表比较出不同流量引入渠道的效果;通过下游页面列表了解用户的浏览习惯,哪些页面元素、内容更吸引访客点击。

2.2.7访客分析

地区运营商:提供各地区访客、各网络运营商访客的访问情况分布。 地方网站、下载站等与地域性、网络链路等结合较为紧密的网站,可以参考此功能数据,合理优化推广运营方案。

终端详情:提供网站访客所使用的浏览终端的配置情况。 参考此数据进行网页设计、开发,可更好地提高网站兼容性,以达到良好的用户交互体验。

新老访客:当日访客中,历史上第一次访问该网站的访客记为当日新访客;历史上已经访问过该网站的访客记为老访客。 新访客与老访客进入网站的途径和浏览行为往往存在差异。该功能可以辅助分析不同访客的行为习惯,针对不同访客优化网站,例如为制作新手导航提供数据支持等。

忠诚度:从访客一天内回访网站的次数(日访问频度)与访客上次访问网站的时间两个角度,分析访客对网站的访问粘性、忠诚度、吸引程度。 由于提升网站内容的更新频率、增强用户体验与用户价值可以有更高的忠诚度,因此该功能在网站内容更新及用户体验方面提供了重要参考。

活跃度:从访客单次访问浏览网站的时间与网页数两个角度,分析访客在网站上的活跃程度。 由于提升网站内容的质量与数量可以获得更高的活跃度,因此该功能是网站内容分析的关键指标之一。

2.2.8转化路径分析

转化定义:

访客在您的网站完成了某项您期望的活动,记为一次转化,如注册、下载、购买。

目标示例:

·获得用户目标:在线注册、创建账号等。

·咨询目标:咨询、留言、电话等。

·互动目标:视频播放、加入购物车、分享等。

·收入目标:在线订单、付款等。

路径分析:

根据设置的特定路线,监测某一流程的完成转化情况,算出每步的转换率和流失率数据,

如注册流程,购买流程等。

转化类型:

l 页面 

l 事件 

三、 整体技术流程及架构 

1数据处理流程 

网站流量日志数据分析是一个纯粹的数据分析项目,其整体流程基本上就是

依据数据的处理流程进行。有以下几个大的步骤:

数据采集

数据采集概念,目前行业会有两种解释:一是数据从无到有的过程(web服务器打印的日志、自定义采集的日志等)叫做数据采集;另一方面也有把通过使用Flume等工具把数据采集到指定位置的这个过程叫做数据采集。

关于具体含义要结合语境具体分析,明白语境中具体含义即可。

数据预处理

通过mapreduce程序对采集到的原始日志数据进行预处理,比如清洗,格式

整理,滤除脏数据等,并且梳理成点击流模型数据。

数据入库

将预处理之后的数据导入到HIVE仓库中相应的库和表中。

数据分析

项目的核心内容,即根据需求开发ETL分析语句,得出各种统计结果。

数据展现

将分析所得数据进行数据可视化,一般通过图表进行展示。

2、系统的架构 

相对于传统的BI数据处理,流程几乎差不多,但是因为是处理大数据,所以流程中各环节所使用的技术则跟传统BI完全不同:  

数据采集:定制开发采集程序,或使用开源框架Flume

数据预处理:定制开发mapreduce程序运行于hadoop集群数据仓库技术:基于hadoop之上的Hive

数据导出:基于hadoop的sqoop数据导入导出工具数据可视化:定制开发web程序(echarts)  

整个过程的流程调度:hadoop生态圈中的azkaban工具

其中,需要强调的是:系统的数据分析不是一次性的,而是按照一定的时间频率反复计算,因而整个处理链条中的各个环节需要按照一定的先后依赖关系紧密衔接,即涉及到大量任务单元的管理调度,所以,项目中需要添加一个任务调度模块。

3、数据展现 

数据展现的目的是将分析所得的数据进行可视化,以便运营决策人员能更方便地获取数据,更快更简单地理解数据。

市面上有许多开源的数据可视化软件、工具。比如Echarts.

四、 模块开发----数据采集 

1、需求 

在网站web流量日志分析这种场景中,对数据采集部分的可靠性、容错能力要求通常不会非常严苛,因此使用通用的 flume 日志采集框架完全可以满足需

求。

2Flume 日志采集系统 

2.1Flume 采集 

Flume 采集系统的搭建相对简单:

1、在服务器上部署 agent 节点,修改配置文件

2、启动 agent 节点,将采集到的数据汇聚到指定的 HDFS 目录中

3、针对nginx日志生成场景,如果通过flume(1.6)收集,无论是Spooling Directory Source和Exec Source均不能满足动态实时收集的需求,在当前flume1.7稳定版本中,提供了一个非常好用的TaildirSource,使用这个source,可以监控一个目录,并且使用正则表达式匹配该目录中的文件名进行实时收集。

核心配置如下: 

# Name the components on this agent 
a1.sources = r1 
a1.sinks = k1 
a1.channels = c1 

a1.sources = r1 
a1.sources.r1.type = TAILDIR 
a1.sources.r1.channels = c1 
a1.sources.r1.positionFile = /root/logs/taildir_position.json 
a1.sources.r1.filegroups = f1 f2 
a1.sources.r1.filegroups.f1 = /root/logs/example.log 
a1.sources.r1.filegroups.f2 = /root/logs/toupload/.*log.* 


# Describe the sink 
a1.sinks.k1.type = hdfs 
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/ 
a1.sinks.k1.hdfs.filePrefix = events- 
a1.sinks.k1.hdfs.round = true 
a1.sinks.k1.hdfs.roundValue = 10 
a1.sinks.k1.hdfs.roundUnit = minute 
a1.sinks.k1.hdfs.rollInterval = 3 
a1.sinks.k1.hdfs.rollSize = 20 
a1.sinks.k1.hdfs.rollCount = 5 
a1.sinks.k1.hdfs.batchSize = 1 
a1.sinks.k1.hdfs.useLocalTimeStamp = true 
#生成的文件类型,默认是 Sequencefile,可用 DataStream,则为普通文本 
a1.sinks.k1.hdfs.fileType = DataStream 
 
# Use a channel which buffers events in memory 
a1.channels.c1.type = memory 
a1.channels.c1.capacity = 1000 
a1.channels.c1.transactionCapacity = 100 
 
# Bind the source and sink to the channel 
a1.sources.r1.channels = c1 
a1.sinks.k1.channel = c1  

制作log命令:
## while true; do echo example... >> /root/logs/example.log; echo access... >> /root/logs/toupload/access.log.1;sleep 0.3;done

启动命令:
bin/flume-ng agent -c conf/ -f conf/kkkk.conf -n a1  -Dflume.root.logger=INFO,console 

filegroups:指定filegroups,可以有多个,以空格分隔;(TailSource可以同时监控 tail多个目录中的文件)

positionFile:配置检查点文件的路径,检查点文件会以json格式保存已经tail文件的位置,解决了断点不能续传的缺陷。

filegroups.<filegroupName>:配置每个filegroup的文件绝对路径,文件名可以用正则表达式匹配。

通过以上配置,就可以监控文件内容的增加和文件的增加。产生和所配置的文件名正则表达式不匹配的文件,则不会被tail。

2.2数据内容样例 

58.215.204.118 - - [18/Sep/2013:06:51:35 +0000] "GET /wp-includes/js/jquery/jquery.js?ver=1.10.2 HTTP/1.1" 304 0 "http://blog.fens.me/nodejs-socketio-chat/" "Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101 
Firefox/23.0" 

字段解析:

访客ip地址:   58.215.204.118

访客用户信息:  - -

请求时间:[18/Sep/2013:06:51:35 +0000]

请求方式:GET

请求的url:/wp-includes/js/jquery/jquery.js?ver=1.10.2

请求所用协议:HTTP/1.1

响应码:304

返回的数据流量:0

访客的来源urlhttp://blog.fens.me/nodejs-socketio-chat/ 

访客所用浏览器:Mozilla/5.0 (Windows NT 5.1; rv:23.0) Gecko/20100101

Firefox/23.0 

五、 模块开发----数据预处理 

1主要目的 

过滤“不合规”数据,清洗无意义的数据格式转换和规整根据后续的统计需求,过滤分离出各种不同主题(不同栏目 path)的基础数据。

2、实现方式 

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * 处理原始日志,过滤出真实pv请求 转换时间格式 对缺失字段填充默认值 对记录标记valid和invalid
 * 
 */

public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        // 用来存储网站url分类数据
        Set<String> pages = new HashSet<String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        /**
         * 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤
         */
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            pages.add("/about");
            pages.add("/black-ip-list/");
            pages.add("/cassandra-clustor/");
            pages.add("/finance-rhive-repurchase/");
            pages.add("/hadoop-family-roadmap/");
            pages.add("/hadoop-hive-intro/");
            pages.add("/hadoop-zookeeper-intro/");
            pages.add("/hadoop-mahout-roadmap/");

        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            if (webLogBean != null) {
                // 过滤
                WebLogParser.filtStaticResource(webLogBean, pages);
                /* if (!webLogBean.isValid()) return; */
                k.set(webLogBean.toString());
                context.write(k, v);
            }
        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(WeblogPreProcess.class);

        job.setMapperClass(WeblogPreProcessMapper.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

//         FileInputFormat.setInputPaths(job, new Path(args[0]));
//         FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("d:/weblog/input"));
        FileOutputFormat.setOutputPath(job, new Path("d:/weblog/output"));

        job.setNumReduceTasks(0);

        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);

    }

}
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;



public class WebLogParser {

//194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            
            String time_local = formatDate(arr[3].substring(1));
            if(null==time_local || "".equals(time_local)) time_local="-invalid_time-";
            webLogBean.setTime_local(time_local);
            
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);

            //如果useragent元素较多,拼接useragent
            if (arr.length > 12) {
                StringBuilder sb = new StringBuilder();
                for(int i=11;i<arr.length;i++){
                    sb.append(arr[i]);
                }
                webLogBean.setHttp_user_agent(sb.toString());
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }

            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误
                webLogBean.setValid(false);
            }
            
            if("-invalid_time-".equals(webLogBean.getTime_local())){
                webLogBean.setValid(false);
            }
        } else {
            webLogBean=null;
        }

        return webLogBean;
    }

    public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        }
    }
        //格式化时间方法
    public static String formatDate(String time_local) { // 18/Sep/2013:06:49:18
          SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
          SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        try {
            return df2.format(df1.parse(time_local));//dfs2=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        } catch (ParseException e) {
            return null;
        }

    }

}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class WebLogBean implements Writable {

    private boolean valid = true;// 判断数据是否合法
    private String remote_addr;// 记录客户端的ip地址
    private String remote_user;// 记录客户端用户名称,忽略属性"-"
    private String time_local;// 记录访问时间与时区
    private String request;// 记录请求的url与http协议
    private String status;// 记录请求状态;成功是200
    private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
    private String http_referer;// 用来记录从那个页面链接访问过来的
    private String http_user_agent;// 记录客户浏览器的相关信息

    
    public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return this.time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("01").append(this.getRemote_addr());
        sb.append("01").append(this.getRemote_user());
        sb.append("01").append(this.getTime_local());
        sb.append("01").append(this.getRequest());
        sb.append("01").append(this.getStatus());
        sb.append("01").append(this.getBody_bytes_sent());
        sb.append("01").append(this.getHttp_referer());
        sb.append("01").append(this.getHttp_user_agent());
        return sb.toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.valid = in.readBoolean();
        this.remote_addr = in.readUTF();
        this.remote_user = in.readUTF();
        this.time_local = in.readUTF();
        this.request = in.readUTF();
        this.status = in.readUTF();
        this.body_bytes_sent = in.readUTF();
        this.http_referer = in.readUTF();
        this.http_user_agent = in.readUTF();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(this.valid);
        out.writeUTF(null==remote_addr?"":remote_addr);
        out.writeUTF(null==remote_user?"":remote_user);
        out.writeUTF(null==time_local?"":time_local);
        out.writeUTF(null==request?"":request);
        out.writeUTF(null==status?"":status);
        out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);
        out.writeUTF(null==http_referer?"":http_referer);
        out.writeUTF(null==http_user_agent?"":http_user_agent);

    }

}

3点击流模型数据梳理 

由于大量的指标统计从点击流模型中更容易得出,所以在预处理阶段,可以使用mr程序来生成点击流模型的数据。

3.1点击流模型 pageviews  

Pageviews 表模型数据生成, 详细见:ClickStreamPageView.java  

/**
 * 
 * 将清洗之后的日志梳理出点击流pageviews模型数据
 * 
 * 输入数据是清洗过后的结果数据
 * 
 * 区分出每一次会话,给每一次visit(session)增加了session-id(随机uuid)
 * 梳理出每一次会话中所访问的每个页面(请求时间,url,停留时长,以及该页面在这次session中的序号)
 * 保留referral_url,body_bytes_send,useragent
 * 
 * 
 * @author
 * 
 */
public class ClickStreamPageView {

    static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {

        Text k = new Text();
        WebLogBean v = new WebLogBean();
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();

            String[] fields = line.split("01");
            if (fields.length < 9) return;
            //将切分出来的各字段set到weblogbean中
            v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
            //只有有效记录才进入后续处理
            if (v.isValid()) {
                    //此处用ip地址来标识用户
                k.set(v.getRemote_addr());
                context.write(k, v);
            }
        }
    }

    static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {

        Text v = new Text();

        /*

输入:<ip,[weblogbean,weblogbean]
同一个ip的所有请求,按照时间先后顺序排序了

         */
        protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
            ArrayList<WebLogBean> requestList = new ArrayList<WebLogBean>();

            // 先将一个用户的所有访问记录中的时间拿出来排序
            try {
                for (WebLogBean bean : values) {
                    WebLogBean webLogBean = new WebLogBean();
                    try {
                        BeanUtils.copyProperties(webLogBean, bean);
                    } catch(Exception e) {
                        e.printStackTrace();
                    }
                    requestList.add(webLogBean);
                }
                //将bean按时间先后顺序排序 Arrays.sort()
                
                Collections.sort(requestList, new Comparator<WebLogBean>() { //[b,a,]    , c

                    @Override
                    public int compare(WebLogBean o1, WebLogBean o2) {
                        try {
                            Date d1 = toDate(o1.getTime_local());
                            Date d2 = toDate(o2.getTime_local());
                            
                            if (d1 == null || d2 == null)
                                return 0;
                            
                            return d1.compareTo(d2);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return 0;
                        }
                    }

                });

                /**
                 * 以下逻辑为:从有序bean中分辨出各次visit,并对一次visit中所访问的page按顺序标号step
                 * 核心思想:
                 * 就是比较相邻两条记录中的时间差,如果时间差<30分钟,则该两条记录属于同一个session
                 * 否则,就属于不同的session
                 * 
                 */
                
                

                
                int step = 1;
                String session = UUID.randomUUID().toString();
                
                // 如果仅有1条数据,则直接输出
                if (1 == requestList.size()) {
                    WebLogBean bean = requestList.get(0);
                    // 设置默认停留时长为60s
                    v.set(session+"01"+key.toString()+"01"+bean.getRemote_user() + "01" + bean.getTime_local() + "01" + bean.getRequest() + "01" + step + "01" + (60) + "01" + bean.getHttp_referer() + "01" + bean.getHttp_user_agent() + "01" + bean.getBody_bytes_sent() + "01"
                            + bean.getStatus());
                    context.write(NullWritable.get(), v);
                    return;
                }
                
                
                
                for (int i = 0; i < requestList.size(); i++) {
                    
                    // 如果不止1条数据,则将第一条跳过不输出,遍历第二条时再输出
                    if (i == 0) {
                        continue;
                    }
                    
                    /*
                    beans集合
                    s1    false58.215.204.118-2013-09-18 06:51:35   0  
        ip1            s1    false58.215.204.118-2013-09-18 06:51:36   1
        
                    s2    false58.215.204.118-2013-09-18 07:51:36   2

                        */
                    
                    WebLogBean bean1 = requestList.get(i - 1);
                    WebLogBean bean2 = requestList.get(i);

                    // 求近两次时间差
                    long timeDiff = timeDiff(toDate(bean2.getTime_local()), toDate(bean1.getTime_local()));
                    // 如果本次-上次时间差<30分钟,则输出前一次的页面访问信息
                    
                    if (timeDiff < 30 * 60 * 1000) {
                        
                        v.set(session+"01"+key.toString()+"01"+bean1.getRemote_user() + "01" + bean1.getTime_local() + "01" + bean1.getRequest() + "01" + step + "01" + (timeDiff / 1000) + "01" + bean1.getHttp_referer() + "01"
                                + bean1.getHttp_user_agent() + "01" + bean1.getBody_bytes_sent() + "01" + bean1.getStatus());
                        context.write(NullWritable.get(), v);
                        step++;
                    } else {
                        
                        // 如果本次-上次时间差>30分钟,则输出前一次的页面访问信息且将step重置,以分隔为新的visit
                        v.set(session+"01"+key.toString()+"01"+bean1.getRemote_user() + "01" + bean1.getTime_local() + "01" + bean1.getRequest() + "01" + (step) + "01" + (60) + "01" + bean1.getHttp_referer() + "01"
                                + bean1.getHttp_user_agent() + "01" + bean1.getBody_bytes_sent() + "01" + bean1.getStatus());
                        context.write(NullWritable.get(), v);
                        // 输出完上一条之后,重置step编号
                        step = 1;
                        session = UUID.randomUUID().toString();
                    }

                    // 如果此次遍历的是最后一条,则将本条直接输出
                    if (i == requestList.size() - 1) {
                        // 设置默认停留市场为60s
                        v.set(session+"01"+key.toString()+"01"+bean2.getRemote_user() + "01" + bean2.getTime_local() + "01" + bean2.getRequest() + "01" + step + "01" + (60) + "01" + bean2.getHttp_referer() + "01" + bean2.getHttp_user_agent() + "01" + bean2.getBody_bytes_sent() + "01" + bean2.getStatus());
                        context.write(NullWritable.get(), v);
                    }
                }

            } catch (ParseException e) {
                e.printStackTrace();

            }

        }

        private String toStr(Date date) {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
            return df.format(date);
        }

        private Date toDate(String timeStr) throws ParseException {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
            return df.parse(timeStr);
        }

        private long timeDiff(String time1, String time2) throws ParseException {
            Date d1 = toDate(time1);
            Date d2 = toDate(time2);
            return d1.getTime() - d2.getTime();

        }

        private long timeDiff(Date time1, Date time2) throws ParseException {

            return time1.getTime() - time2.getTime();

        }

    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClickStreamPageView.class);

        job.setMapperClass(ClickStreamMapper.class);
        job.setReducerClass(ClickStreamReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WebLogBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

//        FileInputFormat.setInputPaths(job, new Path(args[0]));
//        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        FileInputFormat.setInputPaths(job, new Path("d:/weblog/output"));
        FileOutputFormat.setOutputPath(job, new Path("d:/weblog/pageviews"));

        job.waitForCompletion(true);

    }

}
public class PageViewsBean implements Writable {

    private String session;
    private String remote_addr;
    private String timestr;
    private String request;
    private int step;
    private String staylong;
    private String referal;
    private String useragent;
    private String bytes_send;
    private String status;

    public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.useragent = useragent;
        this.timestr = timestr;
        this.request = request;
        this.step = step;
        this.staylong = staylong;
        this.referal = referal;
        this.bytes_send = bytes_send;
        this.status = status;
    }

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getTimestr() {
        return timestr;
    }

    public void setTimestr(String timestr) {
        this.timestr = timestr;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public int getStep() {
        return step;
    }

    public void setStep(int step) {
        this.step = step;
    }

    public String getStaylong() {
        return staylong;
    }

    public void setStaylong(String staylong) {
        this.staylong = staylong;
    }

    public String getReferal() {
        return referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public String getUseragent() {
        return useragent;
    }

    public void setUseragent(String useragent) {
        this.useragent = useragent;
    }

    public String getBytes_send() {
        return bytes_send;
    }

    public void setBytes_send(String bytes_send) {
        this.bytes_send = bytes_send;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.timestr = in.readUTF();
        this.request = in.readUTF();
        this.step = in.readInt();
        this.staylong = in.readUTF();
        this.referal = in.readUTF();
        this.useragent = in.readUTF();
        this.bytes_send = in.readUTF();
        this.status = in.readUTF();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(timestr);
        out.writeUTF(request);
        out.writeInt(step);
        out.writeUTF(staylong);
        out.writeUTF(referal);
        out.writeUTF(useragent);
        out.writeUTF(bytes_send);
        out.writeUTF(status);

    }

}

3.2点击流模型 visit 信息表 

注:“一次访问”=“N 次连续请求”

直接从原始数据中用hql 语法得出每个人的“次”访问信息比较困难,可先用mapreduce 程序分析原始数据得出“次”信息数据,然后再用hql 进行更多维度统计用 MR 程序从 pageviews 数据中,梳理出每一次 visit 的起止时间、页面信息详细代码见工程:ClickStreamVisit.java 

/**
 * 输入数据:pageviews模型结果数据
 * 从pageviews模型结果数据中进一步梳理出visit模型
 * sessionid  start-time   out-time   start-page   out-page   pagecounts  ......
 * 
 * @author
 *
 */
public class ClickStreamVisit {

    // 以session作为key,发送数据到reducer
    static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {

        PageViewsBean pvBean = new PageViewsBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//5abe467e-d500-4889-82c1-36695b7affbf101.226.167.201-2013-09-18 09:30:36/hadoop-mahout-roadmap/160"http://blog.fens.me/hadoop-mahout-roadmap/""Mozilla/4.0(compatible;MSIE8.0;WindowsNT6.1;Trident/4.0;SLCC2;.NETCLR2.0.50727;.NETCLR3.5.30729;.NETCLR3.0.30729;MediaCenterPC6.0;MDDR;.NET4.0C;.NET4.0E;.NETCLR1.1.4322;TabletPC2.0);360Spider"10335200
            String line = value.toString();
            String[] fields = line.split("01");
            int step = Integer.parseInt(fields[5]);
            //(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
            //299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200
            pvBean.set(fields[0], fields[1], fields[2], fields[3],fields[4], step, fields[6], fields[7], fields[8], fields[9]);
            k.set(pvBean.getSession());
            context.write(k, pvBean);

        }

    }

    static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {

/**                    2             1              3
  session001,[PageViewsBean1,PageViewsBean2,PageViewsBean3]
  session001,[PageViewsBean2,PageViewsBean1,PageViewsBean3]
  
 */
        protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {

            // 将pvBeans按照step排序
            ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
            for (PageViewsBean pvBean : pvBeans) {
                PageViewsBean bean = new PageViewsBean();
                try {
                    BeanUtils.copyProperties(bean, pvBean);
                    pvBeansList.add(bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }

            Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {

                @Override
                public int compare(PageViewsBean o1, PageViewsBean o2) {

                    return o1.getStep() > o2.getStep() ? 1 : -1;
                }
            });

            // 取这次visit的首尾pageview记录,将数据放入VisitBean中
            VisitBean visitBean = new VisitBean();
            // 取visit的首记录
            visitBean.setInPage(pvBeansList.get(0).getRequest());
            visitBean.setInTime(pvBeansList.get(0).getTimestr());
            // 取visit的尾记录
            visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
            visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
            // visit访问的页面数
            visitBean.setPageVisits(pvBeansList.size());
            // 来访者的ip
            visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
            // 本次visit的referal
            visitBean.setReferal(pvBeansList.get(0).getReferal());
            visitBean.setSession(session.toString());

            context.write(NullWritable.get(), visitBean);

        }

    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClickStreamVisit.class);

        job.setMapperClass(ClickStreamVisitMapper.class);
        job.setReducerClass(ClickStreamVisitReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageViewsBean.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VisitBean.class);
        
        
//        FileInputFormat.setInputPaths(job, new Path(args[0]));
//        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileInputFormat.setInputPaths(job, new Path("d:/weblog/pageviews"));
        FileOutputFormat.setOutputPath(job, new Path("d:/weblog/visitout"));
        
        boolean res = job.waitForCompletion(true);
        System.exit(res?0:1);

    }

}
public class VisitBean implements Writable {

    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;
    private int pageVisits;

    public void set(String session, String remote_addr, String inTime, String outTime, String inPage, String outPage, String referal, int pageVisits) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.inTime = inTime;
        this.outTime = outTime;
        this.inPage = inPage;
        this.outPage = outPage;
        this.referal = referal;
        this.pageVisits = pageVisits;
    }

    public String getSession() {
        return session;
    }

    public void setSession(String session) {
        this.session = session;
    }

    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getInTime() {
        return inTime;
    }

    public void setInTime(String inTime) {
        this.inTime = inTime;
    }

    public String getOutTime() {
        return outTime;
    }

    public void setOutTime(String outTime) {
        this.outTime = outTime;
    }

    public String getInPage() {
        return inPage;
    }

    public void setInPage(String inPage) {
        this.inPage = inPage;
    }

    public String getOutPage() {
        return outPage;
    }

    public void setOutPage(String outPage) {
        this.outPage = outPage;
    }

    public String getReferal() {
        return referal;
    }

    public void setReferal(String referal) {
        this.referal = referal;
    }

    public int getPageVisits() {
        return pageVisits;
    }

    public void setPageVisits(int pageVisits) {
        this.pageVisits = pageVisits;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.inTime = in.readUTF();
        this.outTime = in.readUTF();
        this.inPage = in.readUTF();
        this.outPage = in.readUTF();
        this.referal = in.readUTF();
        this.pageVisits = in.readInt();

    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(inTime);
        out.writeUTF(outTime);
        out.writeUTF(inPage);
        out.writeUTF(outPage);
        out.writeUTF(referal);
        out.writeInt(pageVisits);

    }

    @Override
    public String toString() {
        return session + "01" + remote_addr + "01" + inTime + "01" + outTime + "01" + inPage + "01" + outPage + "01" + referal + "01" + pageVisits;
    }
}

 pom.xml

<dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>lib/</classpathPrefix>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>utf-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
原文地址:https://www.cnblogs.com/daiwei1981/p/9677454.html