Spark - Traffic Log Analysis

Log generation

package zx.Utils

import java.io.{File, FileWriter}
import java.util.Calendar
import org.apache.commons.lang.time.{DateUtils, FastDateFormat}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * Created by 166 on 2017/9/6.
 */
//one traffic-log record: timestamp, client IP, upstream flow, downstream flow
case class FlowLog(time:String,ip:String,upFlow:Long,downFlow:Long) extends Serializable{
  override def toString: String = {
    s"$time\t$ip\t$upFlow\t$downFlow"  //tab-separated record
  }
}
object CreateLog {
  val ip_buffer: StringBuilder = new StringBuilder
  private val fs: FastDateFormat = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  var startTime:String="2015-01-12 12:12:12"  //rolling timestamp, advanced on every call to getTime
  val instance: Calendar = Calendar.getInstance
  val ipPool:ArrayBuffer[String]=getIp  //pool of 20 random IP addresses

  //generate 20 random IP addresses
  private [this] def getIp:ArrayBuffer[String]={
    val arrayBuffer: ArrayBuffer[String] = ArrayBuffer()
    ip_buffer.clear()
    for(i<-0 until 20){
      ip_buffer.append(Random.nextInt(255)).append(".")
        .append(Random.nextInt(255)).append(".")
        .append(Random.nextInt(255)).append(".")
        .append(Random.nextInt(255))
      arrayBuffer+=ip_buffer.toString()
      ip_buffer.clear()
    }
    arrayBuffer
  }

  //advance the rolling timestamp by a random number of minutes and return it
  def getTime:String={
    instance.setTime(DateUtils.parseDate(startTime,Array("yyyy-MM-dd HH:mm:ss")))
    instance.add(Calendar.MINUTE,Random.nextInt(200))
    val newTime: String = fs.format(instance.getTime)
    startTime=newTime
    newTime
  }

  //random flow value in [0, 800)
  def getFlow:Long={
    Random.nextInt(800)
  }

  //pick a random IP from the pool
  def getIP:String={
    ipPool(Random.nextInt(ipPool.size))
  }

  //write one log record to the file, followed by a line separator
  def write2file(fr:FileWriter,context:String):String={
    fr.write(context)
    fr.write(System.lineSeparator())
    fr.flush()
    "SUCCESS"
  }

  def main(args: Array[String]) {
    //delete any existing log file, then generate 10000 records
    val file: File = new File("C:\\Users\\166\\Desktop\\Data\\Log","click_flow.log")
    if(file.exists()) file.delete()
    val fw: FileWriter = new FileWriter(file)
    for(i<-1 to 10000) println(write2file(fw,FlowLog(getTime,getIP,getFlow,getFlow).toString))
    fw.close()
  }
}
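
For reference, each generated line is a tab-separated record of time, IP, upstream flow and downstream flow. The two lines below are only illustrative, since timestamps, IPs and flow values are random:

2015-01-12 13:45:12	182.23.47.101	523	114
2015-01-12 16:02:33	97.200.13.8	48	761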

Compute each user's total upstream flow and total downstream flow

package zx.sparkStream

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**Requirement: compute each user's total upstream flow and total downstream flow
 * Created by rz on 2017/9/6.
 */
case class ResultTuple()
case class ClickFlow(remoteUser:String,tupleFlow:(Long,Long))  //(user, (upFlow, downFlow))
object SparkOffLine {
  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    val sc: SparkContext = new SparkContext(new SparkConf().setAppName("SparkOffLine").setMaster("local[*]"))
    val rdd: RDD[String] = sc.textFile("C:\\Users\\166\\Desktop\\Data\\Log\\click_flow.log")
    val rdd1:RDD[(String,ClickFlow)]=rdd.map(data=>{
      val datas:Array[String]= data.split("\t")  //time, ip, upFlow, downFlow
      (datas(1),ClickFlow(datas(1),(datas(2).toLong,datas(3).toLong)))
    })
    val rdd2:RDD[(String,ClickFlow)]=rdd1.reduceByKey((x,y)=>{
      val x_upFlow: Long = x.tupleFlow._1
      val y_upFlow: Long = y.tupleFlow._1
      val x_downFlow: Long = x.tupleFlow._2
      val y_downFlow: Long = y.tupleFlow._2
      ClickFlow(x.remoteUser,(x_upFlow+y_upFlow,x_downFlow+y_downFlow))
    })

    println(rdd2.collect().toBuffer)
  }
}
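
A minimal alternative sketch (not from the original post): since only the per-IP totals are needed, the map step can keep just the (upFlow, downFlow) pair as the value, and reduceByKey then sums the two components. It would sit inside main and reuse the rdd read from the same assumed local path:

    val totals: RDD[(String, (Long, Long))] = rdd
      .map(_.split("\t"))
      .map(f => (f(1), (f(2).toLong, f(3).toLong)))       //key by IP, value = (upFlow, downFlow)
      .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))  //sum both components per IP
    totals.collect().foreach { case (ip, (up, down)) => println(s"$ip  up=$up  down=$down") }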
Original post: https://www.cnblogs.com/RzCong/p/7823010.html