实时分析系统预警需求

1章 需求分析

1.1 简介

  实时预警,是一种经常出现在实时计算中的业务类型。根据日志数据中系统报错异常,或者用户行为异常的检测,产生对应预警日志。预警日志通过图形化界面的展示,可以提醒监控方,需要及时核查问题,并采取应对措施。

1.2 需求说明

  需求:同一设备,5分钟内三次及以上用不同账号登录并领取优惠劵,并且过程中没有浏览商品。达到以上要求则产生一条预警日志。并且同一设备,每分钟只记录一次预警。

1.3 预警日志格式

mid

设备id

uids

领取优惠券登录过的uid

itemIds

优惠券涉及的商品id

events

发生过的行为

ts

发生预警的时间戳

2章 整体流程设计

2.1 框架流程

2.2 开发思路

  1)从kafka中消费数据,根据条件进行过滤筛选,生成预警日志;

  2)预警日志保存到ElasticSearch中;

  3)利用Kibana快速搭建可视化图形界面。

3章 实时计算模块

3.1 筛选条件分析

  1)同一设备(分组)

  2)5分钟内(窗口)

  3)三次不同账号登录(用户去重)

  4)领取优惠券(行为)

  5)没有浏览商品(行为)

  6)同一设备每分钟只记录一次预警ES去重)

3.2 数据处理流程图

3.3 代码开发

3.3.1 打开gmall_realtime模块pom文件中的如下依赖(之前注释过)

<!-- 使用Java操作ES的客户端api -->
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>

        <dependency>
            <groupId>net.java.dev.jna</groupId>
            <artifactId>jna</artifactId>
            <version>4.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.codehaus.janino</groupId>
            <artifactId>commons-compiler</artifactId>
            <version>2.7.8</version>
        </dependency>

3.3.2 事件日志样例类,在gmall_realtime模块中新建EventLog

package com.yuange.realtime.beans

/**
 * @作者:袁哥
 * @时间:2021/7/9 19:36
 */
case class EventLog(mid:String,
                    uid:String,
                    appid:String,
                    area:String,
                    os:String,
                    `type`:String,
                    evid:String,
                    pgid:String,
                    npgid:String,
                    itemid:String,
                    var logDate:String,
                    var logHour:String,
                    var ts:Long)

3.3.3 预警日志样例类在gmall_realtime模块中新建CouponAlertInfo

package com.yuange.realtime.beans

/**
 * @作者:袁哥
 * @时间:2021/7/9 19:38
 */
case class CouponAlertInfo(mid:String,
                           uids:java.util.HashSet[String],
                           itemIds:java.util.HashSet[String],
                           events:java.util.List[String],
                           ts:Long)

3.3.4 在gmall_realtime模块中新建MyEsUtil

package com.yuange.realtime.utils

import java.util.Objects
import java.util

import io.searchbox.client.config.HttpClientConfig
import io.searchbox.client.{JestClient, JestClientFactory}
import io.searchbox.core.{Bulk, BulkResult, Index}
import collection.JavaConverters._

/**
 * @作者:袁哥
 * @时间:2021/7/9 21:16
 */
object MyEsUtil {

  private val ES_HOST = "http://hadoop102"
  private val ES_HTTP_PORT = 9200
  private var factory: JestClientFactory = null

  /**
   * 获取客户端
   *
   * @return jestclient
   */
  def getClient: JestClient = {
    if (factory == null) build()
    factory.getObject
  }

  /**
   * 关闭客户端
   */
  def close(client: JestClient): Unit = {
    if (!Objects.isNull(client)) try
      client.shutdownClient()
    catch {
      case e: Exception =>
        e.printStackTrace()
    }
  }

  /**
   * 建立连接
   */
  private def build(): Unit = {
    factory = new JestClientFactory
    factory.setHttpClientConfig(new HttpClientConfig.Builder(ES_HOST + ":" + ES_HTTP_PORT).multiThreaded(true)
      .maxTotalConnection(20) //连接总数
      .connTimeout(10000).readTimeout(10000).build)
  }

  /*
      批量插入数据到ES

        需要先将写入的数据,封装为 docList: List[(String, Any)]
          (String, Any):   K:id
                            V: document
   */
  def insertBulk(indexName: String, docList: List[(String, Any)]): Unit = {

    if (docList.size > 0) {

      val jest: JestClient = getClient

      val bulkBuilder = new Bulk.Builder().defaultIndex(indexName).defaultType("_doc")

      for ((id, doc) <- docList) {
        val indexBuilder = new Index.Builder(doc)
        if (id != null) {
          indexBuilder.id(id)
        }
        val index: Index = indexBuilder.build()
        bulkBuilder.addAction(index)
      }

      val bulk: Bulk = bulkBuilder.build()

      var items: util.List[BulkResult#BulkResultItem] = null

      try {
        items = jest.execute(bulk).getItems
      } catch {
        case ex: Exception => println(ex.toString)
      } finally {
        //自动关闭连接
        close(jest)
        println("保存" + items.size() + "条数据")
        /*
            items: 是一个java的集合
             <- 只能用来遍历scala的集合

             将items,由Java的集合转换为scala的集合    java集合.asScala

             由scala集合转java集合 scala集合.asJava
         */
        for (item <- items.asScala) {
          if (item.error != null && item.error.nonEmpty) {
            println(item.error)
            println(item.errorReason)
          }
        }
      }
    }
  }
}

3.3.预警业务类,在gmall_realtime模块中新建AlertApp

package com.yuange.realtime.app

import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter
import java.util

import com.alibaba.fastjson.JSON
import com.yuange.constants.Constants
import com.yuange.realtime.beans.{CouponAlertInfo, EventLog}
import com.yuange.realtime.utils.{MyEsUtil, MyKafkaUtil}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.Minutes
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

import scala.util.control.Breaks._

/**
 * @作者:袁哥
 * @时间:2021/7/9 19:41
 */
object AlertApp extends BaseApp {
  override var appName: String = "AlertApp"
  override var duration: Int = 10

  def main(args: Array[String]): Unit = {
    run{
      val ds: InputDStream[ConsumerRecord[String,String]] = MyKafkaUtil.getKafkaStream(Constants.GMALL_EVENT_LOG,streamingContext)

      //封装样例类
      val ds1: DStream[EventLog] = ds.map(record => {
        val eventLog: EventLog = JSON.parseObject(record.value(), classOf[EventLog])

        //根据ts 为logDate 和 logHour赋值
        val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd")
        val localDateTime: LocalDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(eventLog.ts), ZoneId.of("Asia/Shanghai"))

        eventLog.logDate = localDateTime.format(formatter)
        eventLog.logHour = localDateTime.getHour.toString

        eventLog
      })

      //开窗:采集过去5分钟的数据
      //然后统计每个设备,每个用户登录时的行为:将数据变为K-V结构,然后按Key分组
      val ds2: DStream[((String,String),Iterable[(EventLog)])]  = ds1.window(Minutes(5)).map(log => {
        ((log.mid, log.uid), log)
      }).groupByKey()

      //判断在5分钟内,用户所产生的行为是否需要预警,将需要预警的数据留下
      //数据包括:设备id,用户id,5分钟内产生的日志
      val ds3: DStream[(String,Iterable[EventLog])] = ds2.map {
        case ((mid, uid), logs) => {
          //是否要预警的标记,默认不需要预警
          var ifneedAlert: Boolean = false
          breakable {
            logs.foreach(log => {
              //只要浏览了商品,就不预警
              if ("clickItem".equals(log.evid)) {
                ifneedAlert = false
                break()
              } else if ("coupon".equals(log.evid)) { //领取了优惠券,符合预警条件
                ifneedAlert = true
              }
            })
          }

          if (ifneedAlert) {
            (mid, logs)
          } else {
            (null, null)
          }
        }
      }

      //过滤空值
      val ds4: DStream[(String,Iterable[EventLog])] = ds3.filter(_._1 != null)

      //按照设备id聚合,并将每个设备中登录用户数小于3个的过滤掉,最后将value进行扁平化处理
      val ds5: DStream[(String,Iterable[EventLog])] = ds4.groupByKey().filter(_._2.size >= 3).mapValues(_.flatten)

      //生成预警日志
      val ds6: DStream[CouponAlertInfo] = ds5.map {
        case (mid, logs) => {
          var uids: util.HashSet[String] = new java.util.HashSet[String]()
          var itemIds: util.HashSet[String] = new java.util.HashSet[String]()
          var events: util.List[String] = new util.ArrayList[String]()

          logs.foreach(log => {
            uids.add(log.uid)
            events.add(log.evid)

            if ("coupon".equals(log.evid)) {
              itemIds.add(log.itemid)
            }
          })
          //将数据进行封装
          CouponAlertInfo(mid, uids, itemIds, events, System.currentTimeMillis())
        }
      }

      //将DS中的数据,转换为docList: List[(String, Any)],再写入,确保mid唯一
      val ds7: DStream[(String,CouponAlertInfo)] = ds6.map(info => {
        val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")
        val localDateTime: LocalDateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(info.ts), ZoneId.of("Asia/Shanghai"))

        (localDateTime.format(formatter) + "_" + info.mid, info)
      })

      //写入ES
      ds7.foreachRDD(rdd => {
        //以分区为单位写入,节省创建连接的开销
        rdd.foreachPartition(partition => {
          //将这个分区的数据,封装为 docList: List[(String, Any)]
          val list: List[(String,CouponAlertInfo)] = partition.toList

          MyEsUtil.insertBulk("gmall_coupon_alert"+LocalDate.now() , list)
        })
      })
    }
  }
}

4ElasticSearch 的保存

4.1 ES集群搭建

  1)详情请查看官网:https://www.elastic.co/cn/

  2)或者博客:https://www.cnblogs.com/LzMingYueShanPao/p/14984577.html

4.2 ES上建好索引

  其实即使不提前建立索引,ES也是可以将数据保存进去的。这种情况,ES会根据第一条要插入的数据进行推断,但是ES的这种推断往往不够准确。

  比如:要区分字段要不要进行索引,字段要不要进行分词,如果分词选用哪个分词器等等。

  建立索引语句(包含Mapping)

PUT _template/gmall_coupon_alert_template
{
  "index_patterns": ["gmall_coupon_alert*"],
  "settings": {
    "number_of_shards": 3
  },
  "aliases" : { 
    "{index}-query": {},
    "gmall_coupon_alert-query":{}
  },
   "mappings": {
     "_doc":{  
       "properties":{
         "mid":{
           "type":"keyword"
         },
         "uids":{
           "type":"keyword"
         },
         "itemIds":{
           "type":"keyword"
         },
         "events":{
           "type":"keyword"
         },
         "ts":{
           "type":"date"
         } 
       }
     }
   }
}

4.3 测试

  1)启动Zookeeper

zookeeper.sh start

  2)启动Kafka

kafka.sh start

  3)启动日志服务器

startgmall.sh start

  4)启动Nginx

sudo /opt/module/nginx/sbin/nginx

  5)启动ES集群

elasticsearch.sh start

  6)启动JsonMocker中的mian方法,生产数据

  7)启动AlertApp消费数据

  8)查看ES中的数据条数

GET /gmall_coupon_alert2021-07-09/_search

5Kibana发布可视化界面

5.1 建立index pattern

  建立数据源表达式

gmall_coupon_alert*

5.2 建立visualize

5.2.1 新增一个可视化图

5.2.2 选择一个图形类型

  选择柱形图(Vertical Bar)

5.3 建立Dashboard

  Dashboard是一个可以放很多个可视化图的大仪表盘,你可以把之前设计好的多个可视化图,放置在一个仪表盘中,一起显示。

5.3.1 新增一个Dashboard

5.3.2 加入多个可视化图

5.3.4 最后保存

5.4 分享到网页中

   点击最上方的share按钮

  可以把剪切板中的iframe代码嵌入到网页代码中

  这样就可以在一张网页中显示kibana中的仪表盘

原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/14992266.html