Spark Streaming 整合Kafka的 Offset 管理 【数据零丢失之 MySQL管理Offset】

写在前面:

在使用SparkStreaming 整合 Kafka 0.8版本的时候, spark-streaming-kafka-0-8 是不提供offset的管理的。为了保证数据零丢失,我们需要自己来管理这个偏移量。

参照:http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

我们是将偏移量储存在MySQL中进行管理,

快速入门使用scalikejdbc 操作MySQL:

1.导入依赖

      <!--scalikejdbc-config_2.11-->
      <dependency>
          <groupId>org.scalikejdbc</groupId>
          <artifactId>scalikejdbc-config_2.11</artifactId>
          <version>2.5.0</version>
      </dependency>

2.resource文件中新建application.conf文件,配置如下

# MySQL example  本地
db.default.driver="com.mysql.jdbc.Driver"
db.default.url="jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8"
db.default.user="root"
db.default.password="123456"

3.Scalikejdbc Demo

package com.csylh.logAnalysis.scalikejdbc

import scalikejdbc.{DB, SQL}
import scalikejdbc.config.DBs


/**
  * scala中连接mysql,使用ScaLikeJdbc框架进行数据的增删改查
  */
object ScaLikeJdbcApp {

  def main(args: Array[String]): Unit = {

    //解析application.conf的文件
    DBs.setup()
    //DBs.setupAll()
    DB.autoCommit {
        implicit session =>
        SQL("insert into people(name,age,fv) values(?,?,?)")
          .bind("留歌", 22, 88)
          .update().apply()
        }
    }

    def delete() = {
      DB.autoCommit {
        implicit session =>
        SQL("delete from people where name = ?")
          .bind("留歌")
          .update().apply()
      }
    }


    def update() {
      DB.autoCommit { implicit session =>
        SQL("update people set age = ? where name = ?")
          .bind(18, "留歌")
          .update().apply()
      }
    }

  /**
    * select查询到数据之后会产生一个rs的对象集,然后可以得到这个对象集里面的数据
    */
  def select() = {
      DB.readOnly {
        implicit session =>
        val sql = SQL("select * from people ").map(rs =>
          (rs.string("name"), rs.int("age"))
        ).toList().apply()
      }
    }

}

零丢失数据解决方案代码如下:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scalikejdbc.config.DBs

/**
  * Description: Spark Streaming 整合Kafka的 偏移量 管理
  *     其实可以在 MySQL/zk/kafka/hbase/redis ...中存储我们的的offset偏移量数据的
  *     这里选用MySQL + scalikejdbc
  *
  * @Author: 留歌36
  * @Date: 2019/8/8 11:24
  */
object OffsetApp {
  def main(args:Array[String]){

    val conf= new SparkConf().setAppName("OffsetApp").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list"->"192.168.1.116:9092",
      "group.id"->"liuge.group.id" ,// 消费的时候是以组为单位进行消费
      "auto.offset.reset" -> "smallest"
    )
    val topics = "test".split(",").toSet
    //  步骤一:先获取offset
    import scalikejdbc._
    /**
      * tuple 转 map
      * ().list.apply().toMap
      */
    DBs.setup()
    val fromOffsets =
      DB.readOnly {
        implicit session => {
          SQL("select * from offsets_storage ").map(rs =>
            (TopicAndPartition(rs.string("topic"), rs.int("partitions")),rs.long("offset"))
          ).list().apply()
        }
    }.toMap

    for (ele <- fromOffsets){
      println("读取MySQL偏移量相关数据: " + ele._1.topic + ":" + ele._1.partition +":"+ele._2)
    }

    // 步骤二: Direct 模式对接Kafka ,得到InputDStream
    val stream = if (fromOffsets.isEmpty){
      // 第一次进来,进行消费Kafka ==> stream
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc,kafkaParams,topics)

    }else{ // 非第一次,进行消费Kafka ==> stream
      val messageHandler = (mm:MessageAndMetadata[String,String]) => (mm.key(), mm.message())
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

    }
    // 步骤三: 业务逻辑 + 保存offset
    stream.foreachRDD(rdd => {

      // TODO.. 3.1这里是你的业务逻辑代码,简单count()为例
      println("留歌本轮的统计结果:" + rdd.count())


      // 3.2这里是保存offset数据的代码
      var  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      for (o <- offsetRanges) {
        println("消费数据从多少到多少:"+s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

        DB.autoCommit{
          implicit session => {
            SQL("replace into offsets_storage(topic,groupid,partitions,offset) values(?,?,?,?)")
              .bind(o.topic, "liuge.group.id", o.partition, o.untilOffset).update().apply()
          }
        }
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

到这里,我们就能够比较好的去消费Kafka的数据了。

有任何不对的地方,欢迎指证!谢谢~~

KafkaProducer【java版】模拟数据

原文地址:https://www.cnblogs.com/liuge36/p/12614728.html