数据零丢失 + 仅一次消费数据【终极方案】


import java.sql.{DriverManager, ResultSet}

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.config.DBs

/**
  * Zero data loss + effectively-once consumption, with Kafka offsets managed in MySQL.
  *
  * Built against the Kafka 0.8 direct API (spark-streaming-kafka-0-8_2.11).
  *
  * Flow of every micro-batch:
  *  1. the business result is written to `traffic_m5` with insert-or-update semantics,
  *     so replaying a batch overwrites rows instead of duplicating them;
  *  2. only afterwards the batch's end offsets are stored in `exactlyonce_topic2`.
  *
  * If the job dies between 1 and 2, the batch is replayed from the previously stored
  * offsets on restart and step 1 simply rewrites the same rows.
  *
  * @Author: 留歌36
  * @Date: 2019/8/15 10:40
  */
object Offset05App {

  /** Consumer group id stored with (and used to look up) every offset row. */
  private val GroupId = "liuge.group.id"

  /** JDBC settings of the database that receives the business results. */
  private val JdbcUrl =
    "jdbc:mysql://xxx:3306/onlineloganalysis?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
  private val JdbcUser = "root"
  private val JdbcPassword = "xx"

  /**
    * Parses one Kafka message of the form `minute,domain,traffic`.
    *
    * @param msg raw message value; may be null or empty
    * @return `Some((minute, domain, traffic))` when the message has exactly three
    *         comma-separated fields, `None` otherwise. A traffic field that is not a
    *         valid long (dirty data) is mapped to `0L` instead of rejecting the record.
    */
  private def parseRecord(msg: String): Option[(String, String, Long)] =
    if (StringUtils.isNotEmpty(msg)) {
      msg.split(",") match {
        case Array(minute, domain, rawTraffic) =>
          // trim first: the numeric field may be surrounded by spaces
          val traffic =
            try rawTraffic.trim.toLong
            catch { case _: NumberFormatException => 0L }
          Some((minute, domain, traffic))
        case _ => None
      }
    } else {
      None
    }

  /**
    * Entry point: starts the streaming job and blocks until it terminates.
    *
    * @param args command line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Offset05App").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> "xxx:9092",
      "auto.offset.reset" -> "smallest"
    )
    val topics = "exactlyonce_topic2".split(",").toSet

    // ===================== template: load offsets ==========================
    // Step 1: read the previously stored offsets from MySQL.
    import scalikejdbc._
    DBs.setup()
    // Only rows of our own consumer group are read, mirroring the group id used when
    // saving; otherwise rows of another group could overwrite ours in `.toMap`.
    val fromOffsets =
      DB.readOnly { implicit session =>
        SQL("select * from exactlyonce_topic2 where groupid = ?")
          .bind(GroupId)
          .map(rs => (TopicAndPartition(rs.string("topic"), rs.int("partitions")), rs.long("offset")))
          .list()
          .apply()
      }.toMap

    for ((tp, offset) <- fromOffsets) {
      println(s"读取MySQL偏移量相关数据==>topic:  ${tp.topic}  partition:${tp.partition}  offset:$offset")
    }

    // =======================================================================
    // Step 2: connect to Kafka in direct mode and obtain the InputDStream.
    val stream =
      if (fromOffsets.isEmpty) {
        // nothing stored yet: start according to `auto.offset.reset`
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
      } else {
        // resume from the stored offsets
        val messageHandler = (mm: MessageAndMetadata[String, String]) => (mm.key(), mm.message())
        KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
          ssc, kafkaParams, fromOffsets, messageHandler)
      }

    // Step 3: business logic + offset bookkeeping.
    stream.foreachRDD { rdd =>
      // Offset ranges are taken from the RDD handed out by the direct stream itself,
      // before any transformation is applied to it.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      // --- 3a: persist the business result ---------------------------------
      if (!rdd.isEmpty()) {
        println(s"留歌本轮的统计结果:${rdd.count()}条数据")
      }

      // Idempotent sink: a replayed batch updates existing rows (upsert-like).
      rdd.map(_._2).foreachPartition { partition =>
        // Do not open a connection for partitions without records.
        if (partition.hasNext) {
          val connection = DriverManager.getConnection(JdbcUrl, JdbcUser, JdbcPassword)
          try {
            // Parameterized statements: message fields are untrusted input and must
            // never be spliced into the SQL text. Prepared once, reused per record.
            val countStmt = connection.prepareStatement(
              "select count(1) as nums from traffic_m5 where m5 = ? and domain = ?")
            val insertStmt = connection.prepareStatement(
              "insert into traffic_m5(m5, domain, traffic) values(?, ?, ?)")
            val updateStmt = connection.prepareStatement(
              "update traffic_m5 set traffic = ? where m5 = ? and domain = ?")
            try {
              partition.foreach { msg =>
                parseRecord(msg).foreach { case (minute, domain, traffic) =>
                  println(msg + "............")

                  // NOTE(review): check-then-write is not atomic; a unique key on
                  // (m5, domain) would be needed to rule out duplicate inserts from
                  // concurrent writers — confirm against the table definition.
                  countStmt.setString(1, minute)
                  countStmt.setString(2, domain)
                  val resultSet: ResultSet = countStmt.executeQuery()
                  val existing =
                    try {
                      if (resultSet.next()) resultSet.getInt("nums") else 0
                    } finally {
                      resultSet.close()
                    }

                  if (existing == 0) { // row does not exist yet
                    insertStmt.setString(1, minute)
                    insertStmt.setString(2, domain)
                    insertStmt.setLong(3, traffic)
                    insertStmt.executeUpdate()
                  } else {
                    updateStmt.setLong(1, traffic)
                    updateStmt.setString(2, minute)
                    updateStmt.setString(3, domain)
                    updateStmt.executeUpdate()
                  }
                }
              }
            } finally {
              countStmt.close()
              insertStmt.close()
              updateStmt.close()
            }
          } finally {
            // Always release the connection, even when a record fails.
            connection.close()
          }
        }
      }

      // --- 3b: persist the offsets -----------------------------------------
      // ===================== template: save offsets ========================
      // All partitions of the batch are written in one transaction so that the
      // stored offsets never describe a half-committed batch.
      DB.localTx { implicit session =>
        offsetRanges.foreach { o =>
          if (o.fromOffset != o.untilOffset) {
            println(s"消费数据从多少到多少:${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
          } else {
            println("!该批次没有消费到数据")
          }
          SQL("replace into exactlyonce_topic2(topic,groupid,partitions,offset) values(?,?,?,?)")
            .bind(o.topic, GroupId, o.partition, o.untilOffset)
            .update()
            .apply()
        }
      }
      // =====================================================================
    }

    ssc.start() // start the streaming job
    ssc.awaitTermination() // block until the job is stopped
  }

}

补充:https://blog.csdn.net/weixin_41907511/article/details/84842815

原文地址:https://www.cnblogs.com/liuge36/p/12614726.html