Spark 学习笔记之 Streaming和Kafka Direct

Streaming和Kafka Direct:

Spark version: 2.2.0

Scala version: 2.11

Kafka version: 0.11.0.0

Note: 最新版本感觉接口变化很大,参数都调整了,今天就先写个Streaming word count的例子吧,以后再慢慢深入学习。

build.sbt:

name := "SparkProjects"

version := "0.1"

scalaVersion := "2.11.6"

libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.2.0"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.11.0.0"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.2.0"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.2.0"

Word Count:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object KafkaDirect {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("KafkaDirect").setMaster("local[1]")
    val ssc = new StreamingContext(conf, Seconds(10))
    val kafkaMapParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.1.151:9092,192.168.1.152:9092,192.168.1.153:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "g1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val topicsSet = Set("ScalaTopic")
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topicsSet, kafkaMapParams)
    )
    kafkaStream.flatMap(row => row.value().split(" ")).map((_, 1)).reduceByKey(_ + _).print()
    ssc.start()
    ssc.awaitTermination()

  }
}
原文地址:https://www.cnblogs.com/AK47Sonic/p/7625513.html