038 Using Spark SQL in Spark to analyze logs (a small case study)

I: Developing with Spark SQL

1. The two ways of developing with Spark SQL (a short side-by-side sketch follows this list)

  HQL: development by writing SQL statements

    e.g.: sqlContext.sql("xxxx")

  DSL: development by calling the DataFrame API of Spark SQL

    e.g.: val df = sqlContext.xxx

          df.select("number")
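
To make the contrast concrete, here is a minimal sketch of the two styles side by side. It assumes a Spark 1.x SQLContext and a DataFrame already registered as a temp table named "t" with a column "number"; all of these names are illustrative only, not from the case study below.

import org.apache.spark.sql.SQLContext

// Illustrative sketch only: assumes `sqlContext` already exists and a DataFrame
// with a column "number" has been registered as the temp table "t".
def hqlVsDsl(sqlContext: SQLContext): Unit = {
  // HQL style: express the query as a SQL string
  sqlContext.sql("SELECT number FROM t").show()

  // DSL style: call the DataFrame API directly
  val df = sqlContext.table("t")
  df.select("number").show()
}

Both styles produce the same result; the DSL form simply avoids building the SQL as a string.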

II: An HQL development example

1. Create a directory and upload the log file

  

2. Start the services

  

III: Writing the programs

1. Description

  The job consists of two parts, so it is written as two programs.

2. Program one: the log record model -- ApacheAccessLog

package com.ibeifeng.bigdata.spark.log

import scala.util.matching.Regex

/**
  * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
  * Created by ibf on 01/15.
  */
case class ApacheAccessLog(
                            ipAddress: String,  // IP address
                            clientId: String,   // client identifier
                            userId: String,     // user identifier
                            serverTime: String, // server time
                            method: String,     // HTTP request method
                            endpoint: String,   // requested resource
                            protocol: String,   // request protocol
                            responseCode: Int,  // HTTP response code, e.g. 200, 401
                            contentSize: Long   // size of the returned content
                          )

/**
  * 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846
  * Created by ibf on 01/15.
  * Utility object with helpers for parsing Apache logs, used by Spark Core / Spark SQL
  */
object ApacheAccessLog {
  // Regular expression for Apache access log lines
  val PARTTERN: Regex =
    """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  /**
    * Check whether the input line matches the log regex: return true if it matches, false otherwise
    *
    * @param line
    * @return
    */
  def isValidateLogLine(line: String): Boolean = {
    // the line is valid if the regex produces a match
    PARTTERN.findFirstMatchIn(line).isDefined
  }

  /**
    * Parse an input log line
    *
    * @param line
    * @return
    */
  def parseLogLine(line: String): ApacheAccessLog = {
    if (!isValidateLogLine(line)) {
      throw new IllegalArgumentException("Invalid log line format")
    }

    // Extract the regex match from the line
    val options = PARTTERN.findFirstMatchIn(line)

    // Get the matcher
    val matcher = options.get

    // Build the return value
    ApacheAccessLog(
      matcher.group(1), // value of the first capture group
      matcher.group(2),
      matcher.group(3),
      matcher.group(4),
      matcher.group(5),
      matcher.group(6),
      matcher.group(7),
      matcher.group(8).toInt,
      matcher.group(9).toLong
    )
  }
}
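
As a quick sanity check, the two helpers can be exercised against the sample line shown in the comment above. This is a minimal, hypothetical test snippet (not part of the original post), assuming it sits in the same package as ApacheAccessLog.

// Quick standalone check of the parser (hypothetical demo, not from the original post)
object ApacheAccessLogDemo {
  def main(args: Array[String]): Unit = {
    val sample = "64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] " +
      "\"GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1\" 401 12846"

    // Should print true for a well-formed line
    println(ApacheAccessLog.isValidateLogLine(sample))

    // Should print the parsed fields, e.g. responseCode = 401, contentSize = 12846
    val log = ApacheAccessLog.parseLogLine(sample)
    println(log.ipAddress + " " + log.responseCode + " " + log.contentSize)
  }
}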

3. Program two: implementing the requirements -- LogAnalysis

package com.ibeifeng.bigdata.spark.log

// ApacheAccessLog is defined in the same package, so no extra import is needed
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
 * Created by Administrator on 2017/4/25.
 */
object LogAnalysis {
  def main(args: Array[String]): Unit = {
    // sqlContext
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("log-analysis-sparksql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._                // without this import, the toDF() conversion below does not compile

    // transform
    val path = "/spark/logs/input"
    val rdd = sc.textFile(path)
    val apacheAccessDataFrame = rdd
      .filter(line => ApacheAccessLog.isValidateLogLine(line))
      .map(line => {
        ApacheAccessLog.parseLogLine(line)
      }).toDF()                                  // convert the RDD to a DataFrame

    // register temp table
    apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
    sqlContext.sql("select * from log_analysis_temp_table limit 1").show()

    // Requirement 1: compute the average, maximum, and minimum of contentSize
    val resultDataFrame1 = sqlContext.sql(
      """
        |SELECT
        |AVG(contentSize) as avg_contentSize,
        |MAX(contentSize) as max_contentSize,
        |MIN(contentSize) as min_contentSize
        |FROM log_analysis_temp_table
      """.stripMargin)
    resultDataFrame1.show()

    // save the result as a text file on HDFS
    val resultRdd = resultDataFrame1.map(row => {
      val avgSize = row.getAs[Double]("avg_contentSize")
      val minSize = row.getAs[Long]("min_contentSize")
      val maxSize = row.getAs[Long]("max_contentSize")
      (avgSize, minSize, maxSize)
    })
    resultRdd.saveAsTextFile(s"/spark/logs/output/sql_${System.currentTimeMillis()}")

    // Requirement 2: count the number of records for each response code
    val resultDataFrame2 = sqlContext.sql(
      """
        |SELECT
        |responseCode AS code,
        |COUNT(1) AS count
        |FROM log_analysis_temp_table
        |GROUP BY responseCode
      """.stripMargin
    )
    resultDataFrame2.show()

    // Requirement 3: find IP addresses with more than N accesses, excluding blacklisted IPs
    val blackIP = Array("200-55-104-193.ds1.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
    val N = 10
    val resultDataFrame3 = sqlContext.sql(
      s"""
        |SELECT
        |ipAddress AS ip,
        |COUNT(1) AS count
        |FROM log_analysis_temp_table
        |WHERE not(ipAddress in(${blackIP.map(ip => s"'${ip}'").mkString(",")}))
        |GROUP BY ipAddress
        |HAVING count > ${N}
      """.stripMargin)
    resultDataFrame3.show()

    // Requirement 4: find the top k most frequently accessed endpoints
    val k = 10
    val resultDataFrame4 = sqlContext.sql(
      s"""
         |SELECT
         |  t.endpoint,
         |  t.count
         |FROM(
         |SELECT
         |  endpoint,
         |  COUNT(1) AS count
         |FROM log_analysis_temp_table
         |GROUP BY endpoint) t
         |ORDER BY t.count DESC
         |limit ${k}
       """.stripMargin)
    resultDataFrame4.show()
  }
}
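
For comparison with the DSL style mentioned in part I, requirement 1 can also be expressed directly against the DataFrame with the Spark 1.x functions API. This is only a sketch, assuming the apacheAccessDataFrame built in the program above is in scope.

import org.apache.spark.sql.functions.{avg, max, min}

// DSL-style equivalent of requirement 1, reusing apacheAccessDataFrame from LogAnalysis
val statsDF = apacheAccessDataFrame.agg(
  avg("contentSize").as("avg_contentSize"),
  max("contentSize").as("max_contentSize"),
  min("contentSize").as("min_contentSize")
)
statsDF.show()

Both forms return the same single-row result; the DSL version just skips the temp table and the SQL string.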

4. Execution results


Original article: https://www.cnblogs.com/juncaoit/p/6764833.html