流式分析系统实现 之二

Spark Streaming Mysql Window

  继“流式分析系统实现之一”后采用Window函数对1分钟内的数据进行统计,虽然在一中也已说明但是并没有实践,所以在此篇文章中对Window进行介绍及操作同时把数据存储到Mysql数据库中,这样就可以查看每分钟的数据,以下代码只是模拟和展示,没有具体存一些其它数据如时间戳,批次等。

一、Spark window 函数
     在《流式分析系统实现》中我们实现了每10秒统计PV、IP的PV、关键词的PV,但是我们如何能同时统计一分钟的PV的数据呢,答案就是使用Window函数。
Window(窗口函数)

 

具体以上函数的例子及演示请参考https://www.cnblogs.com/duanxz/p/4408789.html
本笔记主要是对《流式分析系统实现》的代码进行修改支持窗口函数和数据结果存入到Mysql数据库,所有代码都是在spark-shell中执行。
     
     注意:因为代码是在spark-shell中执行并且其中还使用到mysql数据库,所以在启动spark-shell时一定要加载mysql-connect-java.jar包,具体如下:
     spark-shell --master local[2] --jars /usr/share/java/mysql-connector-java.jar --driver-class-path /usr/share/java/mysql-connector-java.jar
     在spark streaming window 时一定要设计检查点(checkpoint)不然会报错,代码如下:
//导入类
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds,StreamingContext}
import java.sql.{PreparedStatement,Connection,DriverManager}
import java.util.Date

//得到时间戳
val now = new Date()
def getCurrent_time(): String = {
    val a = now.getTime
    var str = a+""
    str.substring(0,10)
}
//设计计算的周期,单位秒
val batch = 10

/*
 * 这是bin/spark-shell交互式模式下创建StreamingContext的方法
 * 非交互式请使用下面的方法来创建
 */
val ssc = new StreamingContext(sc,Seconds(batch))

/*
// 非交互式下创建StreamingContext的方法
val conf = new SparkConf().setAppName("NginxAnay")
val ssc = new StreamingContext(conf, Seconds(batch))
*/


/*
 * 创建输入DStream,是文本文件目录类型
 * 本地模式下也可以使用本地文件系统的目录,比如 file:///home/spark/streaming
 */
val lines = ssc.textFileStream("hdfs:///spark/streaming")

/*
 * 下面是统计各项指标,调试时可以只进行部分统计,方便观察结果
 */

//窗口方法必须配置checkpoint,可以这样配置:
ssc.checkpoint("hdfs:///spark/checkpoint")

//1.总pv(这是常规每10秒一个周期的PV统计)
lines.count().print()

//1.这是每分钟(连续多个周期)一次的PV统计
lines.countByWindow(Seconds(batch*6),Seconds(batch*6)).print()

//2. 各IP的PV,按PV倒序
// 空格分隔的第一个字段就是IP
lines.map(line => {(line.split(" ")(0),1)}).reduceByKey(_ + _).transform(rdd => {
    rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).
    sortByKey(false).
    map(ip_pv => (ip_pv._2,ip_pv._1))
}).print()
//2.这是每分钟(连续多个周期)一次的各IP的PV,按pv倒序 ,采用窗口函数统计一分钟的数据
val IpPairDStream = lines.map(line => {(line.split(" ")(0),1)})
val IpCountsDStream = IpPairDStream.reduceByKeyAndWindow((v1:Int,v2:Int)=> v1+v2,Seconds(batch * 6),Seconds(batch*6))
val finalDStream = IpCountsDStream.transform(rdd => {
    rdd.map(ip_pv => (ip_pv._2,ip_pv._1)).sortByKey(false).map(ip_pv => (ip_pv._2,ip_pv._1))})
finalDStream.print()

//3.搜索引擎PV
val refer = lines.map(_.split(""")(3))

//先输出搜索引擎和查询关键词,避免统计搜索关键词时重复计算
//输出(host,query_keys)
val searchEnginInfo = refer.map(r => {
    val f = r.split('/')
    val searchEngines = Map(
        "www.google.cn" -> "q",
        "www.yahoo.com" -> "p",
        "cn.bing.com" -> "q",
        "www.baidu.com" -> "wd",
        "www.sogou.com" -> "query"
    )

    if (f.length > 2) {
        val host = f(2)

        if(searchEngines.contains(host)) {
            val query = r.split('?')(1)
            if(query.length > 0) {
                val arr_search_q = query.split('&').filter(_.indexOf(searchEngines(host)+"=") == 0)
                if(arr_search_q.length > 0)
                    (host,arr_search_q(0).split('=')(1))
                else
                    (host,"")
            } else {
                (host,"")
            }
        } else
            ("","")
    } else
        ("","")
})

//输出搜索引擎PV
searchEnginInfo.filter(_._1.length > 0).map(p => {(p._1,1)}).reduceByKey(_ + _).print()

//4.关键词PV
searchEnginInfo.filter(_._2.length > 0).map(p => {(p._2,1)}).reduceByKey(_ + _).print()

//5.终端类型PV
lines.map(_.split(""")(5)).map(agent => {
    val types = Seq("iPhone","Android")
    var r = "Default"
    for (t <- types) {
        if(agent.indexOf(t) != -1)
            r = t
    }
    (r,1)
}).reduceByKey(_ + _).print()

//6.各页面PV
val pagePv = lines.map(line => {(line.split(""")(1).split(" ")(1),1)}).reduceByKey(_ + _)
pagePv.print()
//6.RDD数据插入mysql数据库
pagePv.foreachRDD( rdd => {
    rdd.foreachPartition(eachPartition => {
        var conn: Connection = null;
        var stmt: PreparedStatement = null;
        try {
            val url = "jdbc:mysql://192.168.0.58:3306/streaming?characterEncoding=utf8&useSSL=true";
            val user = "streaming";
            val password = "streaming";
            conn = DriverManager.getConnection(url,user,password)
            eachPartition.foreach(record => {
                val sql = "insert into page_pv(pageName,pageCount) values(?,?)";
                stmt = conn.prepareStatement(sql);
                stmt.setString(1,record._1);
                stmt.setInt(2,record._2);
                stmt.executeUpdate();
            })
            } catch {
                case e: Exception => e.printStackTrace()
            } finally {
                if (stmt != null) {
                    stmt.close()
                }
                if (conn != null) {
                    conn.close()
                }
            }
    })
})

//6.这是每分钟(连续多个周期)一次的各页面的PV(调用窗口函数)
val pageDStream = lines.map(line => {(line.split(""")(1).split(" ")(1),1)})
val pagePairsDStream = pageDStream.reduceByKeyAndWindow((v1:Int,v2:Int) => v1+ v2,Seconds(batch*6),Seconds(batch*6))
pagePairsDStream.print()
//6.RDD数据插入mysql数据库


//启动计算,等待执行结束(出错或Ctrl+C退出)
ssc.start()
ssc.awaitTermination()
View Code
原文地址:https://www.cnblogs.com/xiqing/p/9662607.html