基于Spark的网站日志分析

本文只展示核心代码,完整代码见文末链接。

Web Log Analysis

  1. 提取需要的log信息,包括time, traffic, ip, web address
  2. 进一步解析第一步获得的log信息,如把ip转换为对应的省份,从网址中提取出访问内容和内容ID,最后将信息转换为parquet格式。

(1)按日期和内容(video)的ID进行分组,并根据访问次数进行倒序排序。
(2)按日期,内容(video)的ID和省份进行分组,并根据访问次数排名取前3。
最后将(1)和(2)数据写入MySQL。

注意:(1)写入数据库时分partition写入,而非逐条写入。
(2)先filter出公用的df并进行cache
(3)下面代码应该能进一步优化,例如将videoAccessTopNStat的try/catch中生成partition list和StatDAO.inserDayVideoAccessTopN(list)中生成batch应该可以合并,避免两次遍历。

设计和编写思路:
1.设计输入参数args(如inputPath和outputPath)
2.设计转换的工具类,包括StructType(需要提取什么信息,分别是什么格式),parseLog(split并提取各index的信息,用try/catch包裹,设置默认输出)。其中对时间的提取可另外定义一个工具类,包括inputFormat,outputFormat,getTime和parse。而对地域的提取,可另外定义一个IpUtils,引入开源代码ipdatabase。这些工具类写完后都要在自身main方法中测试。最后生成DF。
3.filter出commonDF。
4.实现特定的数据统计
5.输出数据,如果写入MySQL,就另外创建一个StatDAO类,包括获取链接,分批写入数据和release链接。

//Step One:

/**
  * 将原始日志数据进行解析,返回信息包括visit time, url, traffic, ip
  * @param .log, example: 183.162.52.7 - - [10/Nov/2016:00:01:02 +0800] 
  * "POST /api3/getadv HTTP/1.1" ...
  * @return partitioned files, example: 1970-01-01 08:00:00	-
  * 	813	183.162.52.7
  */

if (args.length != 2) {
  println("Usage: logCleanYarn <inputPath> <outputPath>")
  System.exit(1)
}

val Array(inputPath, outputPath) = args

val spark = SparkSession.builder().getOrCreate()

val access = spark.sparkContext.textFile(inputPath)

//access.take(10).foreach(println)

val splited = access.map(line => {

   val splits = line.split(" ")
   val ip = splits(0)
   val time = splits(3) + " " + splits(4)
   val url = splits(11).replaceAll(""", "") //remove quotation mark
   val traffic = splits(9)
// (ip, DataUtils.parse(time), url, traffic)

   DataUtils.parse(time) + "	" + url + "	" + traffic + "	" + ip
    })

splited.saveAsTextFile(outputPath)

spark.stop()

/**
  * 用于解析日志时间
  */
object DataUtils {

  //input_format: [10/Nov/2016:00:01:02 +0800]
  val YYYYMMDDHHMM_TIME_FORMAT = FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:SS Z", Locale.ENGLISH)

  //output_format: yyyy-MM-dd HH:mm:ss
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  def getTime(time: String) = {
    try {
      YYYYMMDDHHMM_TIME_FORMAT.parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]"))).getTime
    } catch {
      case _ => 0l
    }
  }
    
  /**
  * example: [10/Nov/2016:00:01:02 +0800] ==> 2016-11-10 00:01:00
  */
  def parse(time: String) = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

//  def main(args: Array[String]): Unit = {
//    println(parse("[10/Nov/2016:00:01:02 +0800]"))
//  }
}
//Step Two:

/**
  * 将第一步解析出来的数据转化为DataFrame,并保存为一份parquet文件。
  */

if (args.length != 2) {
  println("Usage: logCleanYarn <inputPath> <outputPath>")
  System.exit(1)
}

val Array(inputPath, outputPath) = args

val spark = SparkSession.builder().getOrCreate()

val access = spark.sparkContext.textFile(inputPath)

// access.take(10).foreach(println)

val accessDF = spark.createDataFrame(access.map(line => AccessConvertUtil.parseLog(line)), AccessConvertUtil.struct)

// accessDF.printSchema()
// accessDF.show(false)

accessDF.coalesce(1).write.format("parquet").partitionBy("day")
      .save(outputPath)

spark.stop()

/**
  * 工具类,定义了schema和进一步解析log的方法
  */
object AccessConvertUtil {

  val struct = StructType(Seq(
    StructField("url", StringType),
    StructField("cmsType", StringType),
    StructField("cmsId", IntegerType),
    StructField("traffic", IntegerType),
    StructField("ip", StringType),
    StructField("city", StringType),
    StructField("time", StringType),
    StructField("day", StringType)
  ))

  /**
    * 进一步解析log,如转化数据类型,解析网址,ip映射具体省份,最后以Row输出
    */
  def parseLog(log: String) = {

    try{
      val splited = log.split("	")

      val url = splited(1)
      val traffic = splited(2).toInt
      val ip = splited(3)

      // 网址:"http://www.xxx.com/article/101"中article为网页内容,101为article的ID
      val domain = "http://www.xxx.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")

      var cmsType = ""
      var cmsId = 0
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toInt
      }

      val city = IpUtils.getCity(ip)
      val time = splited(0)
      val day = time.substring(0, 10).replaceAll("-", "")

      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case _ => {
        Row(null, null, null, null, null, null, null, null)
      }
    }
  }
}


/**
  * Ip工具类,将IP映射为省份,利用开源代码ipdatabase
  * https://github.com/wzhe06/ipdatabase
  */
object IpUtils {

  def getCity(ip: String) = {
    IpHelper.findRegionByIp(ip)
  }

  def main(args: Array[String]): Unit = {
    println(getCity("58.30.15.255"))
  }
}
//Step Three:

/**
  * 在第二步的结果数据中,按日期和video的ID进行分组,并根据访问次数进行倒序排序。
  * 最后将数据写入MySQL。
  */

if (args.length != 2) {
  println("Usage: logCleanYarn <inputPath> <day>")
  System.exit(1)
}

val Array(inputPath, day) = args

val spark = SparkSession.builder()
  .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
  .getOrCreate()

val accessDF = spark.read.format("parquet").load(inputPath)

//    accessDF.printSchema()
//    accessDF.show(false)

//预先筛选和cache后面两个函数要复用的df
import spark.implicits._
val commonDF = accessDF.filter($"day" === day && $"cmsType" === "video")
commonDF.cache()

//删除已有的内容,避免重复
StatDAO.deleteData(day)

//groupBy video
videoAccessTopNStat(spark, commonDF)

//groupBy city
cityAccessTopNStat(spark, commonDF)

commonDF.unpersist(true)

//    videoAccessTopDF.show(false)

spark.stop()

/**
  * 两个样例类,用于储存不同数据类型,应用于下面两个方法。
  */
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)
case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

/**
  * 按内容ID分组后排序,并把结果写到Mysql
  */
def videoAccessTopNStat(spark: SparkSession, comDF: DataFrame): Unit = {

  import spark.implicits._
  val videoAccessTopNStat = comDF
    .groupBy($"day", $"cmsId")
    .agg(count("cmsId").as("times"))
    .orderBy(desc("times"))

  try {
    videoAccessTopNStat.foreachPartition(partitionOfRecords =>{
      val list = new ListBuffer[DayVideoAccessStat]

      partitionOfRecords.foreach(info => {
        val day = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val times = info.getAs[Long]("times")

        list.append(DayVideoAccessStat(day, cmsId, times))
      })

      StatDAO.inserDayVideoAccessTopN(list)
    })
  } catch {
    case e:Exception => e.printStackTrace()
  }
}

/**
  * 按内容ID和省份分组后排名,并把结果写到Mysql
  */
def cityAccessTopNStat(spark: SparkSession, comDF: DataFrame): Unit = {

  import spark.implicits._

  val videoAccessTopNStat = comDF
    .groupBy($"day", $"city", $"cmsId")
    .agg(count("cmsId").as("times"))

  val windowSpec = Window.partitionBy($"city").orderBy(desc("times"))
  val videoAccessTopNStatDF = videoAccessTopNStat.select(expr("*"), rank().over(windowSpec).as("times_rank"))
    .filter($"times_rank" <= 3)

  try {
    videoAccessTopNStatDF.foreachPartition(partitionOfRecords => {
      val list = new ListBuffer[DayCityVideoAccessStat]

      partitionOfRecords.foreach(info => {
        val day = info.getAs[String]("day")
        val cmsId = info.getAs[Long]("cmsId")
        val city = info.getAs[String]("city")
        val times = info.getAs[Long]("times")
        val timesRank = info.getAs[Int]("times_rank")

        list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
      })

      StatDAO.inserDayCityVideoAccessTopN(list)
    })
  } catch {
    case e: Exception => e.printStackTrace()
  }
}

/**
  * 分组后排序方法
  */
def videoAccessSortedStat(spark: SparkSession, accessDF: DataFrame) : Unit = {
  import spark.implicits._
    
  val sortedStat= accessDF
    .filter($"day" === "20170511" && $"cmsType" === "video")
    .groupBy($"day", $"cmsId")
    .agg(count("cmsId").as("times"))
    .orderBy(desc("times"))
    
  // 分块创建存储每条信息的list,并调用函数将数据写到到MySQL
  try {
      sortedStat.foreachPartition(partitionOfRecords =>{
        val list = new ListBuffer[DayVideoAccessStat]

        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")

          list.append(DayVideoAccessStat(day, cmsId, times))
        })

        StatDAO.inserDayVideoAccessSortedStat(list)
      })
   } catch {
  case e:Exception => e.printStackTrace()
 }
}
//Step Three:

/**
  * 工具类,提供两类方法:
  * 1.连接数据库,将数据写入MySQL,并释放连接的方法。
  * 2.删除MySQL中已存在的(相同entry的数据)
  */
object StatDAO {

  def inserDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try{
      connection = MySQLUtils.getConnect()

      val sql = "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)"
      val pstmt = connection.prepareStatement(sql)

      connection.setAutoCommit(false)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)

        pstmt.addBatch()
      }

      pstmt.executeBatch()
      connection.commit()

    } catch {
      case e:Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  def inserDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {

    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try{
      connection = MySQLUtils.getConnect()

      val sql = "insert into day_video_city_access_topn_stat(day, cms_id, city, times, times_rank) values (?, ?, ?, ?, ?)"
      val pstmt = connection.prepareStatement(sql)

      connection.setAutoCommit(false)

      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)

        pstmt.addBatch()
      }

      pstmt.executeBatch()
      connection.commit()

    } catch {
      case e:Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }

  def deleteData(day: String): Unit = {

    val tables = Array("day_video_access_topn_stat", "day_video_city_access_topn_stat")
    var connection: Connection = null
    var pstmt: PreparedStatement = null

    try {
      connection = MySQLUtils.getConnect()

      for (table <- tables) {
        val sql = s"delete from $table where day = ?"
        val pstmt = connection.prepareStatement(sql)
        pstmt.setString(1, day)
        pstmt.executeUpdate()

      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }

  }
}

/**
  * 工具类,包含连接数据库和释放连接的方法。
  */
object MySQLUtils {

  def getConnect() = {
      DriverManager.getConnection("jdbc:mysql://localhost:3306/log_project","root", "password")
  }

  def release(connection: Connection, pstmt: PreparedStatement): Unit ={
    try{
      if (pstmt != null) {
        pstmt.close()
      }
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) {
        connection.close()
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(getConnect())
  }
}

参考:
大数据 Spark SQL慕课网日志分析
GitHub源码

原文地址:https://www.cnblogs.com/code2one/p/9872597.html