Spark Streaming与kafka整合实践之WordCount

本次实践使用kafka console作为消息的生产者,Spark Streaming作为消息的消费者,具体实践代码如下

首先启动kafka server

.inwindowskafka-server-start.bat    .configserver.properties

创建一个Topic

此处topic名以test为例

kafka-topics.bat  --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

创建一个producer

kafka-console-consumer.bat  --zookeeper localhost:2181  --topic test

创建一个Consumer

package spark.examples.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object SparkStreamingKakfaWordCount {
  def main(args: Array[String]) {
    println("Start to run SparkStreamingKakfaWordCount")
    val conf = new SparkConf().setMaster("local[3]")setAppName("SparkStreamingKakfaWordCount")
    val ssc = new StreamingContext(conf, Seconds(4))
    val topicMap=Map("test" -> 1)
//    zookeeper quorums server list
    val zkQuorum = "localhost:2181";
//   consumer group
    val group = "test-consumer-group01"
    //下面的处理方式假设topic test只有一个分区
   val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
   lines.print()
   
  val words = lines.flatMap(_.split(" "))   
  val wordCounts = words.map(x => (x,1L)).reduceByKey(_+_)
  wordCounts.print()
//  下面的处理方式假设topic test有2个分区,spark streaming 创建2个Input DStream,并行读2个分区
  //  Spark Streaming将RDD重新分区为4个RDD,进行并行处理,处理逻辑的并行度是读取并行的度的2倍
//    val streams = (1 to 2).map( _ => KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2))

//    将2个stream进行union
//    val partitions = ssc.union(streams).repartition(4).map("DataReceived: " + _)
//    partitions.print()
//    val partitions = ssc.union(streams).repartition(2)   //partition个数根据spark并行处理能力而定
//    val words = partitions.flatMap(_.split(" "))
//    val wordCounts = words.map(x => (x,1L)).reduceByKey(_+_)
//    wordCounts.print()

    ssc.start()   //Start the computation
    ssc.awaitTermination()   //Wait for the computation to termination
  }

}
原文地址:https://www.cnblogs.com/xinlingyoulan/p/6072645.html