流式分析系统实现 之一

一、实验介绍
     
我们知道网站用户访问流量是不间断的,基于网站的访问日志,即 Web log 分析是典型的流式实时计算应用场景。比如百度统计,它可以做流量分析、来源分析、网站分析、转化分析。另外还有特定场景分析,比如安全分析,用来识别 CC 攻击、 SQL 注入分析、脱库等。这里我们简单实现一个类似于百度分析的系统。
 
1.1 实验知识点
  • Python 模拟生成 Nginx 日志
  • Spark Streaming 编程
  • 服务器访问日志分析方法
 
1.2 实验环境
  • Spark 2.1.1
  • Python 2.7.6
  • Xfce 终端
二、实验原理
百度统计(tongji.baidu.com)是百度推出的一款免费的专业网站流量分析工具,能够告诉用户访客是如何找到并浏览用户的网站的,以及在网站上浏览了哪些页面。这些信息可以帮助用户改善访客在其网站上的使用体验,不断提升网站的投资回报率。
百度统计提供了几十种图形化报告,包括:趋势分析、来源分析、页面分析、访客分析、定制分析等多种统计分析服务。
这里我们参考百度统计的功能,基于 Spark Streaming 简单实现一个分析系统,使之包括以下分析功能。
    • 流量分析。一段时间内用户网站的流量变化趋势,针对不同的 IP 对用户网站的流量进行细分。常见指标是总 PV 和各 IP 的PV。
    • 来源分析。各种搜索引擎来源给用户网站带来的流量情况,需要精确到具体搜索引擎、具体关键词。通过来源分析,用户可以及时了解哪种类型的来源为其带来了更多访客。常见指标是搜索引擎、关键词和终端类型的 PV 。
    • 网站分析。各个页面的访问情况,包括及时了解哪些页面最吸引访客以及哪些页面最容易导致访客流失,从而帮助用户更有针对性地改善网站质量。常见指标是各页面的 PV 。
2.1日志实时采集
     
Web log 一般在 HTTP 服务器收集,比如 Nginx access 日志文件。一个典型的方案是 Nginx 日志文件 + Flume + Kafka + Spark Streaming,如下所述:
      1. 接收服务器用 Nginx ,根据负载可以部署多台,数据落地至本地日志文件;
      2. 每个 Nginx 节点上部署 Flume ,使用 tail -f 实时读取 Nginx 日志,发送至 KafKa 集群;
      3. 专用的 Kafka 集群用户连接实时日志与 Spark 集群,详细配置可以参考 http://spark.apache.org/docs/2.1.1/streaming-kafka-integration.html ;
      4. Spark Streaming 程序实时消费 Kafka 集群上的数据,实时分析,输出;
      5. 结果写入 MySQL 数据库。
当然,还可以进一步优化,比如 CGI 程序直接发日志消息到 Kafka ,节省了写访问日志的磁盘开销。这里主要专注 Spark Streaming 的应用,所以我们不做详细论述。
 
2.1流式分析系统实现
     
我们简单模拟一下数据收集和发送的环节,用一个 Python 脚本随机生成 Nginx 访问日志,并通过脚本的方式自动上传至 HDFS ,然后移动至指定目录。 Spark Streaming 程序监控 HDFS 目录,自动处理新的文件。
生成 Nginx 日志的 Python 代码如下,保存为文件 sample_web_log.py 。
# ! /usr/bin/env python
# encoding=utf8

import random
import time

class WebLogGeneration(object):

    #类属性,由所有类的对象共享
    site_url_base = "http://www.xxx.com/"

    #基本构造函数
    def __init__(self):
        # 前面7条是IE,所以大概浏览器类型70%为IE,接入类型上,20%为移动设备,分别是7和8条,5%为空
        # https://github.com/mssola/user_agent/blob/master/all_test.go
        self.user_agent_dist = {0.0:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.1:"Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)",
                                0.2:"Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727)",
                                0.3:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.4:"Mozilla/5.0 (Windows NT 6.1; Trident/7.0; rv:11.0) like Gecko",
                                0.5:"Mozilla/5.0 (Windows NT 6.1; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0",
                                0.6:"Mozilla/4.0 (compatible; MSIE6.0; Windows NT 5.0; .NET CLR 1.1.4322)",
                                0.7:"Mozilla/5.0 (iPhone; CPU iPhone OS 7_0_3 like Mac OS X) AppleWebKit/537.51.1 (KHTML, like Gecko) Version/7.0 Mobile/11B511 Safari/9537.53",
                                0.8:"Mozilla/5.0 (Linux; Android 4.2.1; Galaxy Nexus Build/JOP40D) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.166 Mobile Safari/535.19",
                                0.9:"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/45.0.2454.85 Safari/537.36",
                                1:" ",}
        self.ip_slice_list = [10, 29, 30, 46, 55, 63, 72, 87, 98,132,156,124,167,143,187,168,190,201,202,214,215,222]
        self.url_path_list = ["login.php","view.php","list.php","upload.php","admin/login.php","edit.php","index.html"]
        self.http_refer = ["http://www.baidu.com/s?wd={query}","http://www.google.cn/search?q={query}","http://www.sogou.com/web?query={query}",
                           "http://one.cn.yahoo.com/s?p={query}","http://cn.bing.com/search?q={query}"]
        self.search_keyword = ["spark","hadoop","hive","spark mlib","spark sql"]

    def sample_ip(self):
        slice = random.sample(self.ip_slice_list,4) #从ip_slice_list中随机获取4个元素,作为一个片断返回
        return ".".join([str(item) for item in slice])

    def sample_url(self):
        return random.sample(self.url_path_list,1)[0]

    def sample_user_agent(self):
        dist_uppon = random.uniform(0,1)
        return self.user_agent_dist[float('%0.1f' % dist_uppon)]

    #主要搜索引擎referrer参数
    def sample_refer(self):
        if random.uniform(0,1) > 0.2: #只有20% 流量有refer
            return "-"

        refer_str = random.sample(self.http_refer,1)
        query_str = random.sample(self.search_keyword,1)
        return refer_str[0].format(query=query_str[0])

    def sample_one_log(self,count=3):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        while count > 1:
            query_log = "{ip} - - [{local_time}] "GET /{url} HTTP/1.1" 200 0 "{refer}" "{user_agent}" "-"".format(ip
                          =self.sample_ip(),local_time=time_str,url=self.sample_url(),refer=self.sample_refer(),user_agent=self.sample_user_agent())
            print query_log
            count = count -1
if __name__ == "__main__":
    web_log_gene = WebLogGeneration()

    #while True:
    #   time.sleep(random.uniform(0,3))
    web_log_gene.sample_one_log(random.uniform(10,100))
View Code

这是一条日志的示例,为一行形式,各字段间用空格分隔,字符串类型的值用双引号包围:

46.202.124.63 - - [2015-11-26 09:54:27] "GET /view.php HTTP/1.1" 200 0 "http://www.google.cn/search?q=hadoop" "Mozilla/5.0 (compatible; MSIE 10.0; Windows NT 6.2; Trident/6.0)" "-" 
然后需要一个简单的脚本来调用上面的脚本以随机生成日志,上传至 HDFS ,然后移动到目标目录:
#!/bin/bash
echo "Hello World !"
# HDFS命令
HDFS="hadoop fs"

# Streaming 程序监听的目录,注意跟后面Streaming程序的配置要保持一致
streaming_dir="/spark/streaming"

#清空旧数据
#su hdfs <<EOF
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
#EOF
#一直运行
while [ 1 ];do
    python sample_web_log.py > test.log
    # 给日志文件加上时间戳,避免重名
    tmplog="access.`date +'%s'`.log"
    $HDFS -put test.log ${streaming_dir}/tmp/$tmplog
    $HDFS -mv          ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
    echo "`date +"%F %T"` put $tmplog to HDFS succeed"
    sleep 1
done
#EOF
View Code

Spark Streaming 程序代码如下所示,可以在 bin/spark-shell 交互式环境下运行,如果要以 Spark 程序的方式运行,按注释中的说明调整一下 StreamingContext 的生成方式即可。启动 bin/spark-shell 时,为了避免因 DEBUG 日志信息太多而影响观察输出,可以将 DEBUG 日志重定向至文件,屏幕上只显示主要输出,方法是 ./bin/spark-shell 2>spark-shell-debug.log:

// 导入类
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// 设计计算的周期,单位秒
val batch = 10

/*
 * 这是bin/spark-shell交互式模式下创建StreamingContext的方法
 * 非交互式请使用下面的方法来创建
 */
val ssc = new StreamingContext(sc, Seconds(batch))

/*
// 非交互式下创建StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/


/*
 * 创建输入DStream,是文本文件目录类型
 * 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming
 */
val lines = ssc.textFileStream("hdfs:///spark/streaming")


/*
 * 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果
 */


// 1. 总PV
lines.count().print()


// 2. 各IP的PV,按PV倒序//   空格分隔的第一个字段就是IP
lines.map(line => {(line.split(" ")(0), 1)}).reduceByKey(_ + _).transform(rdd => {
  rdd.map(ip_pv => (ip_pv._2, ip_pv._1)).
  sortByKey(false).
  map(ip_pv => (ip_pv._2, ip_pv._1))
}).print()


// 3. 搜索引擎PV
val refer = lines.map(_.split(""")(3))

// 先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算// 输出(host, query_keys)
val searchEnginInfo = refer.map(r => {

    val f = r.split('/')

    val searchEngines = Map(
        "www.google.cn" -> "q",
        "www.yahoo.com" -> "p",
        "cn.bing.com" -> "q",
        "www.baidu.com" -> "wd",
        "www.sogou.com" -> "query"
    )

    if (f.length > 2) {
        val host = f(2)

        if (searchEngines.contains(host)) {
            val query = r.split('?')(1)
            if (query.length > 0) {
                val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                if (arr_search_q.length > 0)
                    (host, arr_search_q(0).split('=')(1))
                else
                    (host, "")
            } else {
                (host, "")
            }
        } else
            ("", "")
    } else
        ("", "")

})

// 输出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1, 1)}).reduceByKey(_ + _).print()


// 4. 关键词PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2, 1)}).reduceByKey(_ + _).print()


// 5. 终端类型PV
lines.map(_.split(""")(5)).map(agent => {
    val types = Seq("iPhone", "Android")
    var r = "Default"
    for (t <- types) {
        if (agent.indexOf(t) != -1)
            r = t
    }
    (r, 1)
}).reduceByKey(_ + _).print()


// 6. 各页面PV
lines.map(line => {(line.split(""")(1).split(" ")(1), 1)}).reduceByKey(_ + _).print()



// 启动计算,等待执行结束(出错或Ctrl-C退出)
ssc.start()
ssc.awaitTermination()
View Code

打开两个终端,一个调用上面的 bash 脚本模拟提交日志,一个在交互式环境下运行上面的 Streaming 程序。你可以看到各项指标的输出,比如某个批次下的输出为(依次对应上面的 6 个计算项):

  1.总PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    44374

  2.各IP的PV,按PV倒序

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (72.63.87.30,30)

    (63.72.46.55,30)

    (98.30.63.10,29)

    (72.55.63.46,29)

    (63.29.10.30,29)

    (29.30.63.46,29)

    (55.10.98.87,27)

    (46.29.98.30,27)

    (72.46.63.30,27)

    (87.29.55.10,26)

  3.搜索引擎PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (cn.bing.com,1745)

    (www.baidu.com,1773)

    (www.google.cn,1793)

    (www.sogou.com,1845)

  4.关键词PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (spark,1426)

    (hadoop,1455)

    (spark sql,1429)

    (spark mlib,1426)

    (hive,1420)

  5.终端类型PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (Android,4281)

    (Default,35745)

    (iPhone,4348)

  6.各页面PV

    -------------------------------------------

    Time: 1448533850000 ms

    -------------------------------------------

    (/edit.php,6435)

    (/admin/login.php,6271)

    (/login.php,6320)

    (/upload.php,6278)

    (/list.php,6411)

    (/index.html,6309)

    (/view.php,6350)

 

查看数据更直观的做法是用图形来展示,常见做法是将结果写入外部 DB ,然后通过一些图形化报表展示系统展示出来。比如对于终端类型,我们可以用饼图展示,如图6-11所示。

图6-11 终端类型分布图示例(另见彩插图6-11)

对于连续的数据,我们也可以用拆线图来展示趋势。比如某页面的PV,如图6-12所示。

除了常规的每个固定周期进行一次统计,我们还可以对连续多个周期的数据进行统计。以统计总 PV 为例,上面的示例是每 10 秒统计一次,可能还需要每分钟统计一次,相当于 6 个 10 秒的周期。我们可以利用窗口方法实现,不同的代码如下:

  // 窗口方法必须配置checkpint,可以这样配置: ssc.checkpoint("hdfs:///spark/checkpoint")

  // 这是常规每10秒一个周期的PV统计 lines.count().print()

  // 这是每分钟(连续多个周期)一次的PV统计 lines.countByWindow(Seconds(batch*6), Seconds(batch*6)).print()

使用相同的办法运行程序之后,我们首先会看到连续 6 次 10 秒周期的 PV 统计输出:

  -------------------------------------------

  Time: 1448535090000 ms

  -------------------------------------------

  1101

  -------------------------------------------

  Time: 1448535100000 ms

  -------------------------------------------

  816

  -------------------------------------------

  Time: 1448535110000 ms

  -------------------------------------------

  892

  -------------------------------------------

  Time: 1448535120000 ms

  -------------------------------------------

  708

  -------------------------------------------

  Time: 1448535130000 ms

  -------------------------------------------

  881

  -------------------------------------------

  Time: 1448535140000 ms

  -------------------------------------------

  872

在这之后,有一个 1 分钟周期的 PV 统计输出,它的值刚好是上面 6 次计算结果的总和:

  ------------------------------------------- 
  Time: 1448535140000 ms 
  -------------------------------------------
  5270 

三、开发准备
3.1 准备生成日志的Python代码
3.1.1 编辑代码
首先把sample_web_log.py代码放到集群中,上传到
3.1.2 修改代码的执行权限
     chmod +x sample_web_log.py
     chown hdfs:hdfs sample_web_log.py
     cp sample_web_log.py /var/lib/hadoop-hdfs/device-report
 
3.2 启动 Spark Shell
    接下来需要启动 Spark Shell 来定制 Streaming 任务。为了避免因 DEBUG 日志信息太多而影响观察输出,可以将 DEBUG 日志重定向至文件,屏幕上只显示主要输出。
     请通过以下代码来启动Spark Shell。启动需要耗费一定的时间,请耐心等待。
     spark-shell 2>spark-shell-debug.log
     等到出现 scala>提示符时,就表明已经成功启动Spark Sheel了。
请不要关闭运行Spark Shell的终端,其他任何的终端命令请在新打开的终端运行。
四、具体步骤
4.1 创建日志目录
      在 hdfs spark 目录下新建 streaming 目录,并增设 tmp 临时文件夹。
4.2 通过bash脚本生成日志
     touch log.sh
     vim log.sh
     

log.sh 文件中需要填入以下内容:

#!/bin/bash
echo "Hello World !"
# HDFS命令
HDFS="hadoop fs"

# Streaming 程序监听的目录,注意跟后面Streaming程序的配置要保持一致
streaming_dir="/spark/streaming"

#清空旧数据
#su hdfs <<EOF
$HDFS -rm "${streaming_dir}"'/tmp/*' > /dev/null 2>&1
$HDFS -rm "${streaming_dir}"'/*' > /dev/null 2>&1
#EOF
#一直运行
while [ 1 ];do
    python sample_web_log.py > test.log
    # 给日志文件加上时间戳,避免重名
    tmplog="access.`date +'%s'`.log"
    $HDFS -put test.log ${streaming_dir}/tmp/$tmplog
    $HDFS -mv          ${streaming_dir}/tmp/$tmplog ${streaming_dir}/
    echo "`date +"%F %T"` put $tmplog to HDFS succeed"
    sleep 1
done
#EOF
View Code
同时需要修改该脚本文件的执行权限。
chmod +x log.sh
 
进入spark-shell 2>spark-shell-debug.log,执行以下代码。
/*
 * 这是bin/spark-shell交互式模式下创建StreamingContext的方法
 * 非交互式请使用下面的方法来创建
 */
val ssc = new StreamingContext(sc,Seconds(batch))

/*
// 非交互式下创建StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/


/*
 * 创建输入DStream,是文本文件目录类型
 * 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming
 */
val lines = ssc.textFileStream("hdfs:///spark/streaming")

/*
 * 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果
 */


//1.总pv
lines.count().print()

//2. 各IP的PV,按PV倒序
// 空格分隔的第一个字段就是IP
lines.map(line => {(line.split(" ")(0),1)}).reduceByKey(_ + _).transform(rdd => {
    rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).
    sortByKey(false).
    map(ip_pv => (ip_pv._2,ip_pv._1))
}).print()

//3.搜索引擎PV
val refer = lines.map(_.split(""")(3))

//先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算
//输出(host,query_keys)
val searchEnginInfo = refer.map(r => {
    val f = r.split('/')
    val searchEngines = Map(
        "www.google.cn" -> "q",
        "www.yahoo.com" -> "p",
        "cn.bing.com" -> "q",
        "www.baidu.com" -> "wd",
        "www.sogou.com" -> "query"
    )

    if (f.length > 2) {
        val host = f(2)

        if(searchEngines.contains(host)) {
            val query = r.split('?')(1)
            if(query.length > 0) {
                val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                if(arr_search_q.length > 0)
                    (host,arr_search_q(0).split('=')(1))
                else
                    (host,"")
            } else {
                (host,"")
            }
        } else
            ("","")
    } else
        ("","")
})

//输出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1,1)}).reduceByKey(_ + _).print()

//4.关键词PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2,1)}).reduceByKey(_ + _).print()

//5.终端类型PV
lines.map(_.split(""")(5)).map(agent => {
    val types = Seq("iPhone","Android")
    var r = "Default"
    for (t <- types) {
        if(agent.indexOf(t) != -1)
            r = t
    }
    (r,1)
}).reduceByKey(_ + _).print()

//6.各页面PV
lines.map(line => {(line.split(""")(1).split(" ")(1),1)}).reduceByKey(_ + _).print()

//启动计算,等待执行结束(出错或Ctrl+C退出)
ssc.start()
View Code
在别一个终端中执行log.sh文件(genLog.sh是同一个文件
然后看spark终端中的信息

 

原文地址:https://www.cnblogs.com/xiqing/p/9662579.html