Flink之广告点击黑名单统计

1、数据格式

543462,1715,北京,北京,1511658000
662867,2244074,广东,广州,1511658060
561558,3611281,广东,深圳,1511658120
894923,1715,北京,北京,1511658180
834377,2244074,上海,上海,1511658240
625915,3611281,广东,珠海,1511658300
578814,1715,广东,深圳,1511658330
873335,1256540,上海,上海,1511658540
429984,2244074,广东,深圳,1511658600
937166,1715,北京,北京,1511661601
937166,1715,北京,北京,1511661602
937166,1715,北京,北京,1511661603
937166,1715,北京,北京,1511661604
937166,1715,北京,北京,1511661605
937166,1715,北京,北京,1511661606
937166,1715,北京,北京,1511661607
937166,1715,北京,北京,1511661608
161501,36156,江苏,南京,1511661608
937166,1715,北京,北京,1511661609
937166,1715,北京,北京,1511661610
937166,1715,北京,北京,1511661611
937166,1715,北京,北京,1511661612
937166,1715,北京,北京,1511661613
937166,1715,北京,北京,1511661614
937166,1715,北京,北京,1511661615
937166,1715,北京,北京,1511661616
937166,1715,北京,北京,1511661617
937166,1715,北京,北京,1511661618
2315,36156,zhejiang,杭州,1511661618
10236,5614,henan,郑州,1511661619
937166,1715,北京,北京,1511661619
937166,1715,北京,北京,1511661620
937166,1715,北京,北京,1511661621
937166,1715,北京,北京,1511661622
937166,1715,北京,北京,1511661623
937166,1715,北京,北京,1511661624
937166,1715,北京,北京,1511661625
937166,1715,北京,北京,1511661626
937166,1715,北京,北京,1511661627
937166,1715,北京,北京,1511661628
937166,1715,北京,北京,1511661629
937166,1715,北京,北京,1511661630
937166,1715,北京,北京,1511661631
937166,1715,北京,北京,1511661632
937166,1715,北京,北京,1511661633
937166,1715,北京,北京,1511661634
937166,1715,北京,北京,1511661635
937166,1715,北京,北京,1511661636
937166,1715,北京,北京,1511661637
937166,1715,北京,北京,1511661638
937166,1715,北京,北京,1511661639
937166,1715,北京,北京,1511661640
2315,36237,zhejiang,杭州,1511661641
10236,2914,henan,郑州,1511661641
2315,7156,zhejiang,杭州,1511661641
10236,5389,hebei,石家庄,1511661641
937166,1715,北京,北京,1511661641
937166,1715,北京,北京,1511661642
937166,1715,北京,北京,1511661643
937166,1715,北京,北京,1511661644
937166,1715,北京,北京,1511661645
937166,1715,北京,北京,1511661646
937166,1715,北京,北京,1511661647
937166,1715,北京,北京,1511661648
937166,1715,北京,北京,1511661649
937166,1715,北京,北京,1511661650
937166,1715,北京,北京,1511661651
937166,1715,北京,北京,1511661652
161501,36156,江苏,南京,1511661652
937166,1715,北京,北京,1511661653
937166,1715,北京,北京,1511661654
161501,36156,江苏,南京,1511661654
161501,36156,江苏,南京,1511661655
937166,1715,北京,北京,1511661655
937166,1715,北京,北京,1511661656
937166,1715,北京,北京,1511661657
937166,1715,北京,北京,1511661658
937166,1715,北京,北京,1511661659
937166,1715,北京,北京,1511661660
937166,1715,北京,北京,1511661661
937166,1715,北京,北京,1511661662
937166,1715,北京,北京,1511661663
937166,1715,北京,北京,1511661664
937166,1715,北京,北京,1511661665
937166,1715,北京,北京,1511661666
937166,1715,北京,北京,1511661667
937166,1715,北京,北京,1511661668
937166,1715,北京,北京,1511661669
937166,1715,北京,北京,1511661670
937166,1715,北京,北京,1511661671
937166,1715,北京,北京,1511661672
937166,1715,北京,北京,1511661673
937166,1715,北京,北京,1511661674
937166,1715,北京,北京,1511661675
937166,1715,北京,北京,1511661676
937166,1715,北京,北京,1511661677
937166,1715,北京,北京,1511661678
937166,1715,北京,北京,1511661679
937166,1715,北京,北京,1511661680
937166,1715,北京,北京,1511661681
937166,1715,北京,北京,1511661682
937166,1715,北京,北京,1511661683
937166,1715,北京,北京,1511661684
937166,1715,北京,北京,1511661685
937166,1715,北京,北京,1511661686
937166,1715,北京,北京,1511661687
937166,1715,北京,北京,1511661688
937166,1715,北京,北京,1511661689
937166,1715,北京,北京,1511661690
937166,1715,北京,北京,1511661691
937166,1715,北京,北京,1511661692
2315,36237,zhejiang,杭州,1511661692
10236,2914,henan,郑州,1511661693
2315,7156,zhejiang,杭州,1511661693
937166,1715,北京,北京,1511661693
937166,1715,北京,北京,1511661694
937166,1715,北京,北京,1511661695
937166,1715,北京,北京,1511661696
937166,1715,北京,北京,1511661697
937166,1715,北京,北京,1511661698
937166,1715,北京,北京,1511661699
937166,1715,北京,北京,1511661700
937166,1715,北京,北京,1511661701
937166,1715,北京,北京,1511661702
937166,1715,北京,北京,1511661703
937166,1715,北京,北京,1511661704
937166,1715,北京,北京,1511661705
937166,1715,北京,北京,1511661706
937166,1715,北京,北京,1511661707
937166,1715,北京,北京,1511661708
937166,1715,北京,北京,1511661709
937166,1715,北京,北京,1511661710
937166,1715,北京,北京,1511661711
937166,1715,北京,北京,1511661712
937166,1715,北京,北京,1511661713
937166,1715,北京,北京,1511661714
937166,1715,北京,北京,1511661715
937166,1715,北京,北京,1511661716
937166,1715,北京,北京,1511661717
937166,1715,北京,北京,1511661718
937166,1715,北京,北京,1511661719
937166,1715,北京,北京,1511661710
937166,1715,北京,北京,1511661711
View Code

2、处理主类

package service

/**
 * @program: demo
 * @description: Real-time ad-click blacklist detection with Flink
 * @author: yang
 * @create: 2020-12-30 14:29
 */

import java.sql.Timestamp

import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, KeyedProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import utils.Utils


/**
 * One ad-click record from the input log.
 *
 * @param userId    ID of the user who clicked
 * @param adId      ID of the ad that was clicked
 * @param province  province the click came from
 * @param city      city the click came from
 * @param timestamp time at which the user clicked the ad
 */
case class AdClickEvent(
  userId: Long,
  adId: Long,
  province: String,
  city: String,
  timestamp: Long
)

/**
 * Real-time ad-click blacklist job.
 *
 * Reads click events from a text file, keys them by (userId, adId) and passes them
 * through [[AdClickCount.CountBlackListUser]], which forwards normal clicks and reports
 * over-active (user, ad) pairs on a side output.
 */
object AdClickCount {
  // Side-output tag for blacklist messages. Declared at object level so that both the
  // job wiring in `main` and the process function refer to the same tag.
  private val outputBlackList = new OutputTag[String]("blacklist")

  /**
   * Job entry point: builds the pipeline, prints the blacklist side output and runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Step 1: execution environment, parallelism 1, event-time semantics.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Step 2a: read the raw lines, parse them and attach timestamps/watermarks.
    val clickEvents = env
      .readTextFile(Utils.adClickLogPath)
      .map(line => Utils.string2ClickEvent(line))
      .assignTimestampsAndWatermarks(new AdClickEventTimeExtractor())

    // Step 2b: count clicks per (userId, adId); the threshold is 100 clicks.
    val adEventStream = clickEvents
      .keyBy(event => (event.userId, event.adId))
      .process(new CountBlackListUser(100))

    // Step 3: print the blacklist messages taken from the side output.
    val blackListMessages = adEventStream.getSideOutput(outputBlackList)
    blackListMessages.print()

    env.execute("AdClickCount")
  }

  /**
   * Aggregate function that counts ad-click events: the accumulator is the running count.
   */
  class AdClickCount extends AggregateFunction[AdClickEvent, Long, Long] {
    // The count starts at zero.
    override def createAccumulator(): Long = 0L

    // Every incoming event adds one to the count.
    override def add(in: AdClickEvent, acc: Long): Long = 1 + acc

    // The result is the accumulated count itself.
    override def getResult(acc: Long): Long = acc

    // Two partial counts are combined by summing them.
    override def merge(acc: Long, acc1: Long): Long = acc1 + acc
  }

  /**
   * Filters out clicks of (user, ad) pairs that have reached the click limit.
   *
   * While the stored count is below `maxCount` the event is counted and forwarded.
   * Once the stored count has reached `maxCount`, further events are dropped, and the
   * first such event also emits one message on the blacklist side output.
   *
   * @param maxCount maximum number of clicks that are forwarded per key
   */
  class CountBlackListUser(maxCount: Int)
    extends KeyedProcessFunction[(Long, Long), AdClickEvent, AdClickEvent] {

    // Number of clicks recorded so far for the current (user, ad) key.
    lazy val clickCountState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("count-click-state", classOf[Long]))

    // Whether a blacklist message has already been emitted for the current key.
    lazy val isSetBlackList: ValueState[Boolean] = getRuntimeContext.getState(
      new ValueStateDescriptor[Boolean]("is-sent-state", classOf[Boolean]))

    // Timestamp of the reset timer registered for the current key.
    lazy val saveTimerState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("reset-time-state", classOf[Long]))

    /**
     * Handles one click: registers the reset timer on the first click of a key, then
     * either counts and forwards the event or drops it and reports the key once.
     *
     * @param value the click event
     * @param ctx   context giving access to the timer service and the side output
     * @param out   collector for events that pass the filter
     */
    override def processElement(value: AdClickEvent,
                                ctx: KeyedProcessFunction[(Long, Long),
                                  AdClickEvent, AdClickEvent]#Context,
                                out: Collector[AdClickEvent]): Unit = {
      val clicksSoFar = clickCountState.value()

      // A count of 0 is treated as the first click for this key: schedule the reset.
      if (clicksSoFar == 0) {
        val millisPerDay = 1000 * 60 * 60 * 24
        // Round the current processing time up to the next multiple of 24 hours,
        // so the counters are cleared at the next day boundary of that clock.
        val daysElapsed = ctx.timerService().currentProcessingTime() / millisPerDay
        val ts = (daysElapsed + 1) * millisPerDay
        saveTimerState.update(ts)
        ctx.timerService().registerProcessingTimeTimer(ts)
      }

      if (clicksSoFar < maxCount) {
        // Still under the limit: count the click and forward the event.
        clickCountState.update(clicksSoFar + 1)
        out.collect(value)
      } else if (!isSetBlackList.value()) {
        // Limit reached and not reported yet: remember that we reported, then emit the
        // message on the side output. The event itself is dropped.
        isSetBlackList.update(true)
        ctx.output(outputBlackList,
          s"用户${value.userId} 对广告:${value.adId} 点击超过 $maxCount 次")
      }
      // Limit reached and already reported: the event is dropped silently.
    }

    /**
     * Clears all per-key state when the timer that fires is the stored reset timer.
     *
     * @param timestamp timestamp of the firing timer
     * @param ctx       timer context (not used)
     * @param out       output collector (not used)
     */
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[(Long, Long),
                           AdClickEvent, AdClickEvent]#OnTimerContext,
                         out: Collector[AdClickEvent]): Unit = {
      val isResetTimer = timestamp == saveTimerState.value()
      if (isResetTimer) {
        isSetBlackList.clear()
        clickCountState.clear()
        saveTimerState.clear()
      }
    }

  }

}

/**
 * Periodic watermark assigner for [[AdClickEvent]] streams.
 *
 * The event timestamp is `element.timestamp * 1000`, and the watermark trails the
 * largest timestamp seen so far by `maxOufOfOrderness`, scaled by the same factor.
 */
class AdClickEventTimeExtractor extends AssignerWithPeriodicWatermarks[AdClickEvent] {
  // Largest AdClickEvent.timestamp seen so far (unscaled).
  var currentMaxEventTime = 0L
  // Maximum tolerated out-of-orderness, in the same unit as currentMaxEventTime (10).
  val maxOufOfOrderness = 10

  /** Returns the current watermark: (max timestamp - allowed lateness) scaled by 1000. */
  override def getCurrentWatermark: Watermark = {
    val delayed = currentMaxEventTime - maxOufOfOrderness
    new Watermark(delayed * 1000)
  }

  /**
   * Records the largest timestamp seen and returns the element's timestamp scaled by 1000.
   *
   * @param element                  the click event
   * @param previousElementTimestamp timestamp previously attached to the element (not used)
   */
  override def extractTimestamp(element: AdClickEvent, previousElementTimestamp: Long): Long = {
    if (element.timestamp > currentMaxEventTime) {
      currentMaxEventTime = element.timestamp
    }
    element.timestamp * 1000
  }
}

3、Utils工具类

package utils

/**
 * @program: demo
 * @description: Input file paths and line-parsing helpers for the demo jobs
 * @author: yang
 * @create: 2020-12-30 14:26
 */
import java.text.SimpleDateFormat

import service.{AdClickEvent, ApacheLogEvent, UserBehavior}


object Utils {

  // NOTE: backslashes in the Windows paths below are escaped. Unescaped, sequences such
  // as `\j`, `\d`, `\s` and `\m` are invalid escapes that fail to compile, and `\r` / `\f`
  // would silently turn into control characters.

  // Path of the event log file.
  val eventLogPath = "E:\\java\\demo\\src\\main\\resources\\file\\data2.log"
  // Path of the ad-click log file.
  val adClickLogPath = "E:\\java\\demo\\src\\main\\resources\\file\\data3.csv"

  // Path of the user-behavior log file.
  val userBehaviorLogPath = "E:\\java\\demo\\src\\main\\resources\\file\\data1.csv"


  /**
   * Parses one space-separated log line into an [[ApacheLogEvent]].
   *
   * Fields 0, 1, 5 and 6 are passed through trimmed; field 3 is parsed with the pattern
   * `dd/MM/yyyy:HH:mm:ss` and converted to epoch milliseconds.
   *
   * @param line one raw log line
   * @return the parsed event
   * @throws java.text.ParseException       if field 3 does not match the date pattern
   * @throws ArrayIndexOutOfBoundsException if the line has fewer than 7 fields
   */
  def string2ApacheLogEvent(line:String):ApacheLogEvent={
    val fields = line.split(" ")
    // A new formatter per call: SimpleDateFormat is not thread-safe, so it is not shared.
    val dateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
    val timeStamp = dateFormat.parse(fields(3).trim).getTime
    ApacheLogEvent(fields(0).trim,fields(1).trim,timeStamp,
      fields(5).trim,fields(6).trim)
  }

  /**
   * Parses one comma-separated line into an [[AdClickEvent]].
   *
   * Expected column order: userId, adId, province, city, timestamp.
   *
   * @param line one raw CSV line
   * @return the parsed click event
   * @throws NumberFormatException          if a numeric column is not a valid Long
   * @throws ArrayIndexOutOfBoundsException if the line has fewer than 5 columns
   */
  def string2ClickEvent(line:String):AdClickEvent={
    val dataArray = line.split(",")
    AdClickEvent(dataArray(0).trim.toLong, dataArray(1).trim.toLong, dataArray(2).trim, dataArray(3).trim, dataArray(4).trim.toLong)
  }

  /**
   * Parses one comma-separated line into a [[UserBehavior]].
   *
   * Columns 0, 1, 2 and 4 are parsed as Long; columns 3 and 5 are passed through trimmed.
   *
   * @param line one raw CSV line
   * @return the parsed user-behavior record
   * @throws NumberFormatException          if a numeric column is not a valid Long
   * @throws ArrayIndexOutOfBoundsException if the line has fewer than 6 columns
   */
  def string2UserBehavior(line:String):UserBehavior={
    val fields = line.split(",")
    UserBehavior(fields(0).trim.toLong,
      fields(1).trim.toLong,
      fields(2).trim.toLong,
      fields(3).trim,
      fields(4).trim.toLong,
      fields(5).trim
    )

  }

}
原文地址:https://www.cnblogs.com/ywjfx/p/14234924.html