Spark Streaming reading Kafka (0.10) data with the direct approach, maintaining offsets manually

In older Kafka versions, consumer offsets were managed by ZooKeeper.

In Kafka 0.10, offsets are managed by Kafka itself and stored in the internal __consumer_offsets topic.

Note: with the spark-streaming-kafka-0-10 integration package there is only one way to integrate Spark Streaming with Kafka, the direct approach, and in this version the consumer offsets have nothing to do with ZooKeeper!
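Because the committed offsets now live inside Kafka, you can read a consumer group's committed offset directly with the plain kafka-clients consumer API. Below is a minimal sketch (not from the original post); the broker address, group id, topic and partition number simply mirror the values used in the example later in this article:

import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ShowCommittedOffset {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "192.168.100.100:9092") // same broker as the example below
    props.put("group.id", "SparkKafkaDemo")                // the consumer group used by the streaming job
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)
    // committed() returns the offset this group last committed for the partition (null if none yet)
    val committed = consumer.committed(new TopicPartition("test", 0))
    println(s"committed offset for test-0: $committed")
    consumer.close()
  }
}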

pom.xml:

<properties>
    <kafka.version>0.10.0.1</kafka.version>
    <spark.version>2.3.4</spark.version>
</properties>

<!--kafka-->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.module</groupId>
    <artifactId>jackson-module-scala_2.11</artifactId>
    <version>2.9.9</version>
</dependency>
<!--SparkStreaming-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

Code:

import org.apache.commons.lang.StringUtils
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object SparkKafkaDemo {
  def main(args: Array[String]): Unit = {
    // 1. Create the StreamingContext
    //spark.master should be set as local[n], n > 1
    val conf = new SparkConf().setAppName("demo").setMaster("local[6]")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    val ssc = new StreamingContext(sc, Seconds(5))
    // Seconds(5): the incoming data is batched into one RDD every 5 seconds
    // Parameters for connecting to Kafka
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.100.100:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "SparkKafkaDemo",
      // earliest: if a partition has a committed offset, consume from it; otherwise consume from the beginning of the partition
      // latest: if a partition has a committed offset, consume from it; otherwise consume only data produced after the consumer starts
      // none: if every partition has a committed offset, consume from those offsets; if any partition has none, throw an exception
      // latest is configured here: resume from the committed offset if there is one, otherwise start from newly arriving data
      "auto.offset.reset" -> "latest",
      // false disables auto commit; offsets are either committed by Spark to a checkpoint or maintained manually by the programmer
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Topics to consume; more than one topic can be listed
    val topics = Array("test")
    // 2. Use KafkaUtils to connect to Kafka and fetch the data
    val recordDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
      LocationStrategies.PreferConsistent, // location strategy; strongly recommended by the source code, it distributes partitions evenly across Spark executors
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)) // consumer strategy; strongly recommended by the source code

    recordDStream.foreachRDD(rdd => {
      // Grab this batch's offset ranges; they will be committed back to Kafka manually after processing
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition(partitions =>{
        // For each partition, look up its fromOffset and untilOffset
        val o = offsetRanges(TaskContext.get().partitionId())
        // Print the offsets to the console for easy inspection
        println(o.fromOffset+"- - - - - - - - - - "+o.untilOffset)
        partitions.foreach(line =>{
          // Print each record's key and value to the console
          println("key"+line.key()+"...........value"+line.value())
        })
      })

      // Word count
      val result: RDD[(String, Int)] = rdd
        .filter(msg => StringUtils.isNotBlank(msg.value()))
        .map(_.value())
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey((curr, agg) => (curr + agg))
      result.foreach(println(_))

      // Manually commit the processed offsets back to Kafka
      recordDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    })
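    // Note: the cast to CanCommitOffsets only works on the DStream returned by
    // createDirectStream, and committing after the output operations gives
    // at-least-once semantics: if a batch fails before the commit, its records
    // may be reprocessed after a restart.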

    /*// 3. Alternative: extract the VALUE data as a DStream
    val lineDStream: DStream[String] = recordDStream.map(msg => msg.value()) // msg is the ConsumerRecord
    val wordDStream: DStream[String] = lineDStream.flatMap(_.split(" ")) // _ is the incoming value, i.e. one line of data
    val wordAndOneDStream: DStream[(String, Int)] = wordDStream.map((_, 1))
    val result: DStream[(String, Int)] = wordAndOneDStream.reduceByKey(_ + _)
    result.print()*/
    ssc.start() // start the streaming job
    ssc.awaitTermination() // block and wait so the job can be stopped gracefully
  }

}
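The example above commits the processed offsets back to Kafka. "Manually maintaining offsets" can also mean persisting them in your own store (a database, ZooKeeper, HBase, etc.) and seeding the consumer from that store on startup. A rough sketch of the startup side, assuming a hypothetical loadOffsets() helper and reusing the ssc, topics and kafkaParams values from main() above:

import org.apache.kafka.common.TopicPartition

// Hypothetical helper: read the last saved offset for each partition from your own store
def loadOffsets(): Map[TopicPartition, Long] =
  Map(new TopicPartition("test", 0) -> 0L) // placeholder values

// Passing the saved offsets to the consumer strategy makes the stream resume from them
val recordDStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, loadOffsets())
)

Inside foreachRDD you would then write each batch's offsetRanges to the same store (ideally in the same transaction as the results) instead of, or in addition to, calling commitAsync.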
Original post: https://www.cnblogs.com/chong-zuo3322/p/13810093.html