笔记 很早东西日常的一些复制粘贴 怕忘了

对于kafak与sparkstreaming集成后 存在的问题 
一。基于receiver的方式在kafka1.0后好像是去取消了 都是高级api
默认是200毫秒接受的数据形成一个block块,设置5s为一个批次 那就是5000/200 为25个分区
 
 1.val kafkaParams = Map(
      "zookeeper.connect" -> "bigdata.server1:2181",   //连接zookeeper的地址,获取和提交offet
      "group.id" ->"KafkaReceive",             //消费组的名称
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset"-> "smallest"    //当前sparksreaing对应的消费者组第一次消费的时候方式,当前是从头消费
    )
    val lines: DStream[String] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](
        ssc,
        kafkaParams,
        topics,
        StorageLevel.MEMORY_AND_DISK_SER_2
    ).map(_._2)
    
2.  val topics = Map("test1" -> 4)

    val lines = KafkaUtils.createStream(
        ssc,
      "KafkaReceiverWC02",
      "bigdata.server1:2181",
        topics
    ).map(_._2)
    
二。基于direct模式
对应的是是topic有几个分区就有几个task
对应的也是两种集成
低级api可以定义从哪消费
    //由于Direct方式的kafka和Spark Streaming的集成方式中采用的api是低级封装的api(low lever api),消费的offset信息不需要zookeeper保存,而是直接去找broker节点
    val kafkaParams = Map(
      "metadata.broker.list"->"bigdata.server1:9092,bigdata.server1:9093,bigdata.server1:9094,bigdata.server1:9095"
    )

    //由于Direct方式的kafka和Spark Streaming的集成方式中采用的api是低级封装的api(low lever api),此时消费者的offet,由自己保管,不再是zookeeper,
    // 同时还可以自己指定从哪个offet开始消费 ,指定消费的topic以及对应每个分区,开始消费的offset
    val fromOffsets:Map[TopicAndPartition, Long] = Map(
      TopicAndPartition("bc",0) -> 0,
      TopicAndPartition("bc",1) -> 100,
      TopicAndPartition("bc",2) -> 200,
      TopicAndPartition("bc",3) -> 300
    )

    //MessageAndMetadata可以同时获取message的所属的topic,partiron,offset等元数据,也可以获取key和value,这里仅需要value
    val messageHandler: MessageAndMetadata[String, String] => String = (mmd:MessageAndMetadata[String, String])=>{
      //Messaged的Metadata
     // mmd.topic
     // mmd.partition
     // mmd.offset            在元数据区域获取到的偏移量与对应分区 进行存储
      //Messaged本身
      //mmd.key()
      mmd.message()
    }

    val lines: InputDStream[String] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder,String](
      ssc,
      kafkaParams,
      fromOffsets,
      messageHandler
    )
    
    HasOffsetRanges是一个接口 kafkardd是他的子类 也是rdd的子类 所以使用foreachRdd都是rdd    
    class KafkaRDD[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag] private[spark] (
    sc: SparkContext,
    kafkaParams: Map[String, String],
    val offsetRanges: Array[OffsetRange],
    leaders: Map[TopicAndPartition, (String, Int)],
    messageHandler: MessageAndMetadata[K, V] => R
  ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges




    DirectKafkaInputDStream是inputstream的子类
    DirectKafkaInputDStream.foreachRdd后都是{都是kafkaRdd}
    
 kafkaRDD.asInstanceOf[HasOffsetRanges]    
    
原文地址:https://www.cnblogs.com/hejunhong/p/10493428.html