spark2.1消费kafka0.8的数据 Receiver && Direct

官网案例:

http://spark.apache.org/docs/2.1.1/streaming-kafka-0-8-integration.html

pom.xml依赖

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.1</version>
      <!--      <scope>provided</scope>   -->
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>2.1.1</version>
    </dependency>

Receiver  方式代码:

package SpartStreamingaiqiyi

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

/**
 * Receiver-based Kafka 0.8 consumer: a dedicated receiver pulls records and
 * offsets are tracked in ZooKeeper (at-least-once without WAL).
 *
 * Args: <zkQuorum> ZooKeeper connect string, <group> consumer group id,
 *       <topics> comma-separated topic list, <numthreads> receiver threads per topic.
 */
object kafkaReading {
  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      // Exit here: without it, the Array extractor below throws a
      // scala.MatchError on any arg count other than 4.
      System.err.println("usage: SpartStreamingaiqiyi.test <zkQuorum> <group> <topics> <numthreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numthreads) = args
    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // 5-second micro-batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))
    // Map of topic -> number of receiver threads consuming that topic.
    val topicMap = topics.split(",").map((_, numthreads.toInt)).toMap
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    // Records arrive as (key, value); keep only the message payload.
    messages.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}

Direct模式代码:

package SpartStreamingaiqiyi

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Direct (receiver-less) Kafka 0.8 consumer: Spark queries Kafka for offset
 * ranges each batch and reads partitions in parallel; offsets are managed by
 * Spark checkpoints rather than ZooKeeper.
 *
 * Args: <brokers> comma-separated broker list, <topics> comma-separated topic list.
 */
object kafkaReading {
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      // println (not print) so the usage line is newline-terminated;
      // also fixes the "brockers" typo in the original message.
      System.err.println("usage: SpartStreamingaiqiyi.test <brokers> <topics>")
      System.exit(1)
    }
    val Array(brokers, topics) = args

    val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
    // 5-second micro-batch interval.
    val ssc = new StreamingContext(conf, Seconds(5))
    // The 0.8 direct API accepts either "bootstrap.servers" or
    // "metadata.broker.list" as the broker list key.
    val kafkaParams = Map[String, String]("bootstrap.servers" -> brokers)
    val topicSet = topics.split(",").toSet
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet)
    // Records arrive as (key, value); keep only the message payload.
    messages.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
原文地址:https://www.cnblogs.com/students/p/12035376.html