Spark Streaming: read Kafka data, save it as Parquet files, and store offsets in Redis

Spark Streaming reads JSON-formatted data from a Kafka topic and stores it as Parquet files, with Redis keeping track of the consumed offsets. Because the data is written straight to files and no transaction ties the write to the offset commit, this post does not achieve exactly-once semantics. From an idempotence angle, you can give each record a unique key and deduplicate (merge) on that key to get effectively exactly-once results.
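
For a concrete picture of that dedup idea, here is a minimal sketch (not part of the streaming job below) that reads the Parquet output back and drops replayed duplicates. It assumes every record carries a unique business key; the first schema column key1 from the listing stands in for that key, and the output path is made up for the example.

// minimal dedup-on-read sketch, assuming "key1" is a unique business key
import org.apache.spark.sql.SparkSession

object DedupParquet {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("dedupParquet").master("local[2]").getOrCreate()

    // read everything the streaming job has written so far
    val raw = spark.read.parquet("sparkStreamingKafka2HdfsData")

    // keep one row per key, so replayed micro-batches become harmless
    val deduped = raw.dropDuplicates("key1")

    // hypothetical output path for the cleaned copy
    deduped.write.mode("overwrite").parquet("sparkStreamingKafka2HdfsData_dedup")
    spark.stop()
  }
}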

package com.abc.etl.spark

import java.util.{HashSet => JavaHashSet, Set => JavaSet}

import cn.hutool.core.util.StrUtil
import com.alibaba.fastjson.JSON
import com.alibaba.fastjson.parser.Feature
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext, TaskContext}
import redis.clients.jedis.{Jedis, JedisSentinelPool}

import scala.collection.mutable.ListBuffer

object MoreTopic {

  /**
    * Layout of topic offsets stored in Redis: key "<topic>_<partition>" -> offset
    *
    * @param topics topics whose stored offsets should be loaded
    * @param jedis  Redis connection used for the lookup
    * @return the stored offset of every TopicPartition found in Redis
    */
  def getRedisOffset(topics: collection.Set[String], jedis: Jedis): collection.Map[TopicPartition, Long] = {
    val res: collection.mutable.HashMap[TopicPartition, Long] = collection.mutable.HashMap.empty
    for (topic <- topics) {
      // keys follow the pattern <topic>_<partition>
      val topicPartitionKeys: JavaSet[String] = jedis.keys(topic + StrUtil.UNDERLINE + "*")
      val iterator = topicPartitionKeys.iterator()
      while (iterator.hasNext) {
        val topicPartitionKey = iterator.next()
        val offset = jedis.get(topicPartitionKey)
        val parts = topicPartitionKey.split(StrUtil.UNDERLINE)
        res.put(new TopicPartition(parts(0), parts(1).toInt), offset.toLong)
      }
    }
    res
  }
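  // Resulting Redis layout (illustrative values), matching what the job writes back after each batch:
  //   Kafka2Hdfs_0 -> 1024   (partition 0 consumed up to offset 1024)
  //   Kafka2Hdfs_1 -> 987
  // Topic names must not contain '_', otherwise the split above picks the wrong partition index.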

  def main(args: Array[String]): Unit = {
    val duration = 20
    val appName = "sparkstreamingkafka2"

    val conf = SparkConfSingleton.getInstance().setAppName(appName).setMaster("local[2]")
    val ssc = new StreamingContext(SparkContextSingleton.getInstance(conf), Seconds(duration))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "zk1:9092,zk2:9092,zk3:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "hellokitty",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
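    // auto commit stays disabled because offsets are written to Redis manually, only after the Parquet write;
    // "auto.offset.reset" -> "earliest" only applies to partitions that have no offset stored in Redis yet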

    val sentinels = new JavaHashSet[String]()
    sentinels.add("zk2:26379")
    sentinels.add("zk3:26379")
    val master = "mymaster"

    val jedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource

    val topics = Set("Kafka2Hdfs")

    // build the direct stream, resuming each partition from the offset previously saved in Redis
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, getRedisOffset(topics, jedis))
    )
    jedis.close()

    stream.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // the RDD returned by createDirectStream implements HasOffsetRanges,
        // ref https://blog.csdn.net/xianpanjia4616/article/details/85871063
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        rdd.foreachPartition(partition => {
          // NOTE: using the SparkContext/SQLContext singletons inside foreachPartition only works because
          // local mode runs the driver and executors in one JVM; on a cluster this logic belongs on the driver
          val sc = SparkContextSingleton.getInstance(conf)
          val o = offsetRanges(TaskContext.get.partitionId)
          println(s"processing offset range: ${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")

          val list = ListBuffer.empty[Row]
          while (partition.hasNext) {
            val consumerRecord = partition.next()
            val json = consumerRecord.value()
            println("json is : " + json)
            val jsonObject = JSON.parseObject(json, Feature.OrderedField)
            val values = jsonObject.values().toArray()
            val row = Row.apply(values: _*)
            list += row
          }
          val rowRdd = sc.makeRDD(list)
          val schema = StructType(
            List(
              StructField("key1", StringType, true),
              StructField("value1", StringType, true),
              StructField("key1.type", StringType, true)
            )
          )
          val sqlContext = SQLContextSingleton.getInstance(sc)
          val df = sqlContext.createDataFrame(rowRdd, schema)
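          // each write in append mode adds new part files to the directory; the relative path resolves against
          // the process working directory in local mode, so point it at an hdfs:// path on a real cluster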
          df.write.format("parquet").mode("append").save("sparkStreamingKafka2HdfsData")


          // commit only this partition's offset, and only after its data has been written to Parquet,
          // so a failure before this point replays the batch instead of losing it (at-least-once)
          val offsetJedis = JedisSentinelPoolSingleton.getInstance(master, sentinels).getResource
          println("committing partition: " + o.partition + " fromOffset: " + o.fromOffset + " untilOffset: " + o.untilOffset)
          offsetJedis.set(o.topic + StrUtil.UNDERLINE + o.partition, o.untilOffset.toString)
          offsetJedis.close()
        })
      }

    })

    ssc.start()
    ssc.awaitTermination()
  }


  object JedisSentinelPoolSingleton {
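    // lazily builds one sentinel pool per JVM; in local mode the driver and executors share this instance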
    @transient private var instance: JedisSentinelPool = _

    def getInstance(master: String, sentinels: JavaHashSet[String]): JedisSentinelPool = {
      if (instance == null) {
        val gPoolConfig = new GenericObjectPoolConfig();
        gPoolConfig.setMaxIdle(10);
        gPoolConfig.setMaxTotal(10);
        gPoolConfig.setMaxWaitMillis(10);
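        // maxWaitMillis is in milliseconds, so 10 ms is very tight; borrowing a connection may fail under load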
        gPoolConfig.setJmxEnabled(true);
        instance = new JedisSentinelPool(master, sentinels, gPoolConfig)
      }
      instance
    }
  }


  object SQLContextSingleton {
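    // SQLContext still works on Spark 2.x, although SparkSession is the usual entry point there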
    @transient private var instance: SQLContext = _

    def getInstance(sparkContext: SparkContext): SQLContext = {
      if (instance == null) {
        instance = new SQLContext(sparkContext)
      }
      instance
    }
  }


  object SparkContextSingleton {
    @transient private var instance: SparkContext = _

    def getInstance(sparkConf: SparkConf): SparkContext = {
      if (instance == null) {
        instance = new SparkContext(sparkConf)
      }
      instance
    }
  }

  object SparkConfSingleton {
    @transient private var instance: SparkConf = _

    def getInstance(): SparkConf = {
      if (instance == null) {
        instance = new SparkConf()
      }
      instance
    }

  }

}

  

Key dependencies:

		<!-- Kafka integration for Spark Streaming -->
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
			<version>2.3.2</version>
		</dependency>
		<!-- fastjson -->
		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.37</version>
		</dependency>
		<!-- Redis client -->
		<dependency>
			<groupId>redis.clients</groupId>
			<artifactId>jedis</artifactId>
			<version>3.1.0</version>
		</dependency>
		<!-- hutool -->
		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>4.5.10</version>
		</dependency>
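
The listing also assumes the matching Spark artifacts (spark-streaming and spark-sql for the same Scala and Spark versions, which pull in spark-core) are already on the classpath, for example:

		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.11</artifactId>
			<version>2.3.2</version>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.11</artifactId>
			<version>2.3.2</version>
		</dependency>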

Reference: https://blog.csdn.net/xianpanjia4616/article/details/81709075

Original post: https://www.cnblogs.com/mylittlecabin/p/11580053.html