sparkStreming kafka offset 管理

Direct Approach (No Receivers)

简化的并行性:不需要创建多个输入Kafka流并将其合并。 使用directStream,Spark Streaming将创建 与使用Kafka分区一样多的RDD分区,这些分区将全部从Kafka并行读取数据。 所以在Kafka和RDD分 区之间有一对一的映射关系。 效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,这会进一步复制数据。 这实际 上是效率低下的,因为数据被有效地复制了两次 - 一次是Kafka,另一次是由预先写入日志(Write Ahead Log)复制。 这个第二种方法消除了这个问题,因为没有接收器,因此不需要预先写入日志。 只要Kafka数据保留时间足够长。 正好一次(Exactly-once)的语义:第一种方法使用Kafka的高级API来在Zookeeper中存储消耗的偏移 量。传统上这是从Kafka消费数据的方式。虽然这种方法(结合预写日志)可以确保零数据丢失 (即至少一次语义),但是在某些失败情况下,有一些记录可能会消费两次。发生这种情况是因为 Spark Streaming可靠接收到的数据与Zookeeper跟踪的偏移之间的不一致。因此,在第二种方法中, 我们可以不使用Zookeeper的简单Kafka API。在其检查点内,Spark Streaming跟踪偏移量。这消除了 Spark Streaming和Zookeeper / Kafka之间的不一致,因此Spark Streaming每次记录都会在发生故障的 情况下有效地收到一次。为了实现输出结果的一次语义,将数据保存到外部数据存储区的输出操作必须 是幂等的,或者是保存结果和偏移量的原子事务。

 

模拟一个消费者--Direct

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: DirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *   $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port
 *   topic1,topic2
 */
object DirectKafkaWordCount {
 def main(args: Array[String]) {
   if (args.length < 2) {
     System.err.println(s"""
                           |Usage: DirectKafkaWordCount <brokers> <topics>
                           | <brokers> is a list of one or more Kafka brokers
                           | <topics> is a list of one or more kafka topics to consume from
                           |
       """.stripMargin)
     System.exit(1)
  }
   
   val Array(brokers, topics) = args

   // Create context with 2 second batch interval
   val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]")
   val ssc = new StreamingContext(sparkConf, Seconds(2))

   // Create direct kafka stream with brokers and topics
   val topicsSet = topics.split(",").toSet
   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
   val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet)

   // Get the lines, split them into words, count the words and print
   val lines = messages.map(_._2)
   val words = lines.flatMap(_.split(" "))
   val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
   wordCounts.print()

   // Start the computation
   ssc.start()
   ssc.awaitTermination()
}
}

 

Kafka Offset 管理

使用外部存储保存offset

Checkpoints HBase ZooKeeper Kafka ...

不保存offset

Kafka Offset 管理--Checkpoint

  1. 启用Spark Streaming的checkpoint是存储偏移量最简单的方法。

  2. 流式checkpoint专门用于保存应用程序的状态, 比如保存在HDFS上, 在故障时能恢复。

  3. Spark Streaming的checkpoint无法跨越应用程序进行恢复。

  4. Spark 升级也将导致无法恢复。

  5. 在关键生产应用, 不建议使用spark检查点的管理offset方式。

/**
 * 用checkpoint记录offset
 * 优点:实现过程简单
 * 缺点:如果streaming的业务更改,或别的作业也需要获取该offset,是获取不到的
 */
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

object StreamingWithCheckpoint {
 def main(args: Array[String]) {
   //val Array(brokers, topics) = args
   val processingInterval = 2
   val brokers = "node01:9092,node02:9092,node03:9092"
   val topics = "mytest1"
   // Create context with 2 second batch interval
   val sparkConf = new SparkConf().setAppName("ConsumerWithCheckPoint").setMaster("local[2]")
   // Create direct kafka stream with brokers and topics
   val topicsSet = topics.split(",").toSet
   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")
   val checkpointPath = "hdfs://node01:9000/spark_checkpoint1"

   def functionToCreateContext(): StreamingContext = {
     val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

     ssc.checkpoint(checkpointPath)
     messages.checkpoint(Duration(8 * processingInterval.toInt * 1000))
     messages.foreachRDD(rdd => {
       if (!rdd.isEmpty()) {
         println("################################" + rdd.count())
      }

    })
     ssc
  }

   // 如果没有checkpoint信息,则新建一个StreamingContext
   // 如果有checkpoint信息,则从checkpoint中记录的信息恢复StreamingContext
   // createOnError参数:如果在读取检查点数据时出错,是否创建新的流上下文。
   // 默认情况下,将在错误上引发异常。
   val context = StreamingContext.getOrCreate(checkpointPath, functionToCreateContext _)
   context.start()
   context.awaitTermination()
}
}
// 以上案例测试过程:
// 模拟消费者向mytest1插入10条数据,
// 强制停止streaming,
// 再插入20条数据并启动streaming查看读取的条数为20条

 

Kafka Offset 管理--Zookeeper(常用)

1. 路径:
  val zkPath = s"${kakfaOffsetRootPath}/${groupName}/${o.topic}/${o.partition}"
2. 如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset
3. 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream的起始位置
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.retry.ExponentialBackoffRetry
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.JavaConversions._

object KafkaZKManager  extends Serializable{
 /**
   * 创建rookeeper客户端
   */
 val client = {
   val client = CuratorFrameworkFactory
    .builder
    .connectString("node01:2181/kafka0.9") // zk中kafka的路径
    .retryPolicy(new ExponentialBackoffRetry(1000, 3)) // 重试指定的次数, 且每一次重试之间停顿的时间逐渐增加
    .namespace("mykafka") // 命名空间:mykafka
    .build()
   client.start()
   client
}

 val kafkaOffsetRootPath = "/consumers/offsets"

 /**
   * 确保zookeeper中的路径是存在的
   * @param path
   */
 def ensureZKPathExists(path: String): Unit = {
   if (client.checkExists().forPath(path) == null) {
     client.create().creatingParentsIfNeeded().forPath(path)
  }
}
 // 保存offset
 def storeOffsets(offsetsRanges:Array[OffsetRange], groupName:String) = {
   for (o <- offsetsRanges) {
     val zkPath = s"${kafkaOffsetRootPath}/${groupName}/${o.topic}/${o.partition}"
     ensureZKPathExists(zkPath)
     // 保存offset到zk
     client.setData().forPath(zkPath, o.untilOffset.toString.getBytes())
  }
}

 /**
   * 用于获取offset
   * @param topic
   * @param groupName
   * @return
   */
 def getFromOffsets(topic : String,groupName : String): (Map[TopicAndPartition, Long], Int) = {
   // 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream 的起始位置
   var fromOffsets: Map[TopicAndPartition, Long] = Map()
   val zkTopicPath = s"${kafkaOffsetRootPath}/${groupName}/${topic}"
   // 确保zookeeper中的路径是否存在
   ensureZKPathExists(zkTopicPath)
// 获取topic中,各分区对应的offset
   val offsets: mutable.Buffer[(TopicAndPartition, Long)] = for {
     // 获取分区
     p <- client.getChildren.forPath(zkTopicPath)
  } yield {
     //遍历路径下面的partition中的offset
     val data = client.getData.forPath(s"$zkTopicPath/$p")
     //将data变成Long类型
     val offset = java.lang.Long.valueOf(new String(data)).toLong
     println("offset:" + offset)
    (TopicAndPartition(topic, Integer.parseInt(p)), offset)
  }

   if(offsets.isEmpty) {
    (offsets.toMap,0)
  }else{
    (offsets.toMap,1)
  }
}

 def main(args: Array[String]): Unit = {
   val processingInterval = 2
   val brokers = "node01:9092,node02:9092,node03:9092"
   val topic = "mytest1"
   val sparkConf = new SparkConf().setAppName("KafkaZKManager").setMaster("local[2]")
   // Create direct kafka stream with brokers and topics
   val topicsSet = topic.split(",").toSet
   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
     "auto.offset.reset" -> "smallest")

   val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))

   // 读取kafka数据
   val messages = createMyDirectKafkaStream(ssc, kafkaParams, topic, "group01")

   messages.foreachRDD((rdd,btime) => {
     if(!rdd.isEmpty()){
       println("==========================:" + rdd.count() )
       println("==========================btime:" + btime )
    }
     // 消费到数据后,将offset保存到zk
     storeOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, "group01")
  })

   ssc.start()
   ssc.awaitTermination()
  }

 def createMyDirectKafkaStream(ssc: StreamingContext, kafkaParams: Map[String, String], topic: String, groupName: String): InputDStream[(String, String)] = {
   // 获取offset
   val (fromOffsets, flag) = getFromOffsets( topic, groupName)
   var kafkaStream : InputDStream[(String, String)] = null
   if (flag == 1) {
     // 这个会将kafka的消息进行transform,最终kafak的数据都会变成(topic_name, message)这样的tuple
     val messageHandler = (mmd : MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
     println("fromOffsets:" + fromOffsets)
     kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
  } else {
     // 如果未保存,根据kafkaParam的配置使用最新或者最旧的offset
     kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topic.split(",").toSet)
  }
   kafkaStream
}

}

启动zk命令:

zkCli.sh  -timeout 5000  -r  -server  master:2181

Kafka Offset 管理--Hbase

  1. 基于Hbase的通用设计, 使用同一张表保存可以跨越多个spark streaming程序的topic的offset

  2. rowkey = topic名称 + groupid + streaming的batchtime.milliSeconds . 尽管 batchtime.milliSeconds不是必须的, 但是它可以看到历史的批处理任务对offset的管理情况。

  3. kafka的offset保存在下面的表中,列簇为offsets, 30天后自动过期 Hbase表结构 create 'spark_kafka_offsets', {NAME=>'offsets', TTL=>2592000} 4.offset的获取场景 场景1:Streaming作业首次启动。 通过zookeeper来查找给定topic中分区的数量,然后返回“0” 作为所有topic分区的offset。 场景2:长时间运行的Streaming作业已经停止,新的分区被添加到kafka的topic中。 通过 zookeeper来查找给定topic中分区的数量, 对于所有旧的topic分区,将offset设置为HBase中的 最新偏移量。 对于所有新的topic分区,它将返回“0”作为offset。 场景3:长时间运行的Streaming作业已停止,topic分区没有任何更改。 在这种情况下,HBase 中发现的最新偏移量作为每个topic分区的offset返回。

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaHbaseManager {
 // 保存offset到hbase
 def saveOffsets(TOPIC_NAME: String, GROUP_ID: String, offsetRanges: Array[OffsetRange],
                 hbaseTableName: String, batchTime: org.apache.spark.streaming.Time) = {
   val hbaseConf = HBaseConfiguration.create()
   val conn = ConnectionFactory.createConnection(hbaseConf)
   val table = conn.getTable(TableName.valueOf(hbaseTableName))
   val rowKey = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(batchTime.milliseconds)
   val put = new Put(rowKey.getBytes())
   for (offset <- offsetRanges) {
     put.addColumn(Bytes.toBytes("offsets"), Bytes.toBytes(offset.partition.toString),
       Bytes.toBytes(offset.untilOffset.toString))
  }
   table.put(put)
   conn.close()
}

 // 从zookeeper中获取topic的分区数
 def getNumberOfPartitionsForTopicFromZK(TOPIC_NAME: String, GROUP_ID: String,
                                         zkQuorum: String, zkRootDir: String, sessTimeout: Int, connTimeOut: Int): Int = {
   val zkUrl = zkQuorum + "/" + zkRootDir
   val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, sessTimeout, connTimeOut)
   val zkUtils = new ZkUtils(zkClientAndConn._1, zkClientAndConn._2, false)
   // 获取分区数量
   val zkPartitions = zkUtils.getPartitionsForTopics(Seq(TOPIC_NAME)).get(TOPIC_NAME).toList.head.size
   println(zkPartitions)
   zkClientAndConn._1.close()
   zkClientAndConn._2.close()
   zkPartitions
}

 // 获取hbase的offset
 def getLastestOffsets(TOPIC_NAME: String, GROUP_ID: String, hTableName: String,
                       zkQuorum: String, zkRootDir: String, sessTimeout: Int, connTimeOut: Int): Map[TopicAndPartition, Long] = {

   // 连接zk获取topic的partition数量
   val zKNumberOfPartitions = getNumberOfPartitionsForTopicFromZK(TOPIC_NAME, GROUP_ID, zkQuorum, zkRootDir, sessTimeout, connTimeOut)

   val hbaseConf = HBaseConfiguration.create()

   // 获取hbase中最后提交的offset
   val conn = ConnectionFactory.createConnection(hbaseConf)
   val table = conn.getTable(TableName.valueOf(hTableName))
   val startRow = TOPIC_NAME + ":" + GROUP_ID + ":" + String.valueOf(System.currentTimeMillis())
   val stopRow = TOPIC_NAME + ":" + GROUP_ID + ":" + 0
   val scan = new Scan()
   val scanner = table.getScanner(scan.setStartRow(startRow.getBytes).setStopRow(stopRow.getBytes).setReversed(true))
   val result = scanner.next()

   var hbaseNumberOfPartitions = 0 // 在hbase中获取的分区数量
   if (result != null) {
     // 将分区数量设置为hbase表的列数量
     hbaseNumberOfPartitions = result.listCells().size()
  }

   val fromOffsets = collection.mutable.Map[TopicAndPartition, Long]()
   if (hbaseNumberOfPartitions == 0) { // 如果没有保存过offset
     // 初始化kafka为开始
     for (partition <- 0 until zKNumberOfPartitions) {
       fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), 0))
    }

  } else if (zKNumberOfPartitions > hbaseNumberOfPartitions) { // 如果zk的partition数量大于hbase的partition数量,说明topic增加了分区,就需要对分区做单独处理
     // 处理新增加的分区添加到kafka的topic
     for (partition <- 0 until zKNumberOfPartitions) {
       val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
         Bytes.toBytes(partition.toString)))
       fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), fromOffset.toLong))
    }
     // 对新增加的分区将它的offset值设为0
     for (partition <- hbaseNumberOfPartitions until zKNumberOfPartitions) {
       fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), 0))
    }
  } else { // 如果既没有新增加的分区,也不是第一次运行
     // 获取上次运行的offset
     for (partition <- 0 until hbaseNumberOfPartitions) {
       val fromOffset = Bytes.toString(result.getValue(Bytes.toBytes("offsets"),
         Bytes.toBytes(partition.toString)))
       fromOffsets += ((TopicAndPartition(TOPIC_NAME, partition), fromOffset.toLong))
    }
  }

   scanner.close()
   conn.close()
   fromOffsets.toMap
}

 def main(args: Array[String]): Unit = {
   val processingInterval = 2
   val brokers = "node01:9092,node02:9092,node03:9092"
   val topics = "mytest1"
   // Create context with 2 second batch interval
   val sparkConf = new SparkConf().setAppName("kafkahbase").setMaster("local[2]")
   // Create direct kafka stream with brokers and topics
   val topicsSet = topics.split(",").toSet
   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
     "auto.offset.reset" -> "smallest")

   val ssc = new StreamingContext(sparkConf, Seconds(processingInterval))
   val groupId = "testp"
   val hbaseTableName = "spark_kafka_offsets"

   // 获取kafkaStream
   //val kafkaStream = createMyDirectKafkaStream(ssc, kafkaParams, zkClient, topicsSet, "testp")
   val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
   // 获取offset
   val fromOffsets = getLastestOffsets("mytest1", groupId, hbaseTableName, "node01:2181,node02:2181,node03:2181", "kafka0.9", 30000, 30000)

   var kafkaStream: InputDStream[(String, String)] = null
   kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)

   kafkaStream.foreachRDD((rdd, btime) => {
     if (!rdd.isEmpty()) {
       println("==========================:" + rdd.count())
       println("==========================btime:" + btime)
       saveOffsets(topics, groupId, rdd.asInstanceOf[HasOffsetRanges].offsetRanges, hbaseTableName, btime)
    }

  })

   ssc.start()
   ssc.awaitTermination()
}
}

Kafka Offset 管理--Kafka

stream.foreachRDD { rdd => val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges // some time later, after outputs have completed stream.asInstanceOf[CanCommitOffsets].commitAsync(off setRanges) } http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html#kafka-itself

Kafka Offset 管理--HDFS等

  1. 可以将offset保存在HDFS上

  2. 与其他系统(Zookeeper、Hbase)相比, HDFS具有更高 的延迟。 此外, 如果管理不当, 在HDFS上写入每个批次的 offsetRanges可能会导致小文件问题

Kafka Offset 管理--不保存offset

根据业务需要是否管理offset 对于一些streaming应用程序, 如实时活动监控, 只需要当前最新的数据, 这种情况不需要管理offset 。 在这种场景下, 如果使用老的kafka的api, 可以将参数auto.offset.reset设置为largest 或者smallest 。

如果使用新的kafka的api, 可以将参数 auto.offset.reset设置为earliest 或者latest 。

 

Spark Streaming消费数据反写Kafka

需求:
1、flume将socket流数据采集到kafka
2、streaming读取kafka的数据进行清洗
3、将清洗后的数据再次放到kafka

 

清洗后的结果数据为:
houseid|gathertime|srcip:srcport|destip:destport|url
import java.util
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.kafka.HasOffsetRanges
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.JavaConversions._

/**
 * 将kafka中的数据消费后写入到kafka, 按照batch的方式。
 * 使用广播变量 将kafka创建生产者广播到每个executor上面
 */
object Kafka2KafkaPerBatch {
 def main(args: Array[String]): Unit = {
   val sparkConf = new SparkConf().setAppName("test").setMaster("local[2]")
   val sc = new SparkContext(sparkConf)
   val sqlContext = new HiveContext(sc)

   val processingInterval = 2
   val brokers = "node01:9092,node02:9092,node03:9092"
   val topic = "mytest1"
   val topicsSet = topic.split(",").toSet
   val groupName = "group02"
   val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, "auto.offset.reset" -> "smallest")

   val ssc = new StreamingContext(sc, Seconds(processingInterval))

   val streaming = MyKafkaUtils.createMyDirectKafkaStream(
     ssc, kafkaParams, Set(topic), groupName)

   val sinkTopic = "mykafka"

   // Kafka的Producer不能序列化
   // Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
   //   streaming.foreachRDD(rdd=>{
   //     if(!rdd.isEmpty()){
   //       val props = new util.HashMap[String, Object]()
   //       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
   //       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
   //         "org.apache.kafka.common.serialization.StringSerializer")
   //       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
   //         "org.apache.kafka.common.serialization.StringSerializer")
   //       val producer = new KafkaProducer[String,String](props)
   //
   //       rdd.map(x=>x._2).map(msg=>ParseUtils.parseMsg(msg)).foreach(msg=>{
   //
   //         val message=new ProducerRecord[String, String]( sinkTopic ,null,msg)
   //         producer.send(message)
   //       })
   //     }
   //   })


   // 数据可以写入到kafka, 但是性能差, 每条记录都需要创建producer
   // streaming.foreachRDD(rdd=>{
   //     if(!rdd.isEmpty()){
   //       rdd.map(x=>x._2).map(msg=>ParseUtils.parseMsg(msg)).filter(_.length!=1).foreach(msg=>{
   //
   //         val props = new util.HashMap[String, Object]()
   //         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
   //         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
   //           "org.apache.kafka.common.serialization.StringSerializer")
   //         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
   //           "org.apache.kafka.common.serialization.StringSerializer")
   //         val producer = new KafkaProducer[String,String](props)
   //         val message=new ProducerRecord[String, String]( sinkTopic ,null,msg)
   //         producer.send(message)
   //       })
   //     }
   //   })

   // 推荐:
   // 将KafkaProducer对象广播到所有的executor节点,
   // 这样就可以在每个executor节点将数据插入到kafka
//   val kafkaProducer: Broadcast[MyKafkaProducer[String, String]] = {
//     val kafkaProducerConfig = {
//       val p = new Properties()
//       p.setProperty("bootstrap.servers", brokers)
//       p.setProperty("key.serializer", classOf[StringSerializer].getName)
//       p.setProperty("value.serializer", classOf[StringSerializer].getName)
//       p
//     }
//     ssc.sparkContext.broadcast(MyKafkaProducer[String, String](kafkaProducerConfig))
//   }
//
//   streaming.foreachRDD(rdd => {
//     if (!rdd.isEmpty()) {
//       rdd.map(x => x._2).map(msg => ParseUtils.parseMsg(msg)).filter(_.length != 1).foreach(msg => {
//         kafkaProducer.value.send(sinkTopic, msg)
//       })
//       MyKafkaUtils.saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName)
//     }
//   })

   // 推荐:
   // 用partition的方式,一个rdd的partition对应一个KafkaProducer
   streaming.foreachRDD(rdd=>rdd.foreachPartition(
     // 该rdd的partition对应着kafka里topic的partition
     partition=>{
       val props = new util.HashMap[String, Object]()
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.StringSerializer")
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
         "org.apache.kafka.common.serialization.StringSerializer")
       // 创建的producer在partition里而不是在foreach里,这样减少了KafkaProducer对象的个数
       val producer = new KafkaProducer[String,String](props)

       partition.map(msg=>ParseUtils.parseMsg(msg._2)).filter(_.length!=1).foreach(msg=>{
         val message=new ProducerRecord[String, String](sinkTopic, null, msg)
         producer.send(message)
      })

       MyKafkaUtils.saveOffsets(rdd.asInstanceOf[HasOffsetRanges].offsetRanges, groupName)
    }
  ))

   ssc.start()
   ssc.awaitTermination()
}

}

生产环境中存在问题分析

原文地址:https://www.cnblogs.com/lshan/p/13993603.html