[Spark] Integrating Spark Streaming with Kafka


Spark Streaming and Kafka Integration

Overview

The Kafka project introduced a new consumer API between versions 0.8 and 0.10, so there are two separate corresponding Spark Streaming packages available. Please choose the correct package for your brokers and desired features; note that the 0.8 integration is compatible with later 0.9 and 0.10 brokers, but the 0.10 integration is not compatible with earlier brokers.
The above is the introduction given in the official Spark documentation.
For more details, see the official documentation: http://spark.apache.org/docs/2.2.0/streaming-kafka-integration.html
For more background on Kafka itself, see these articles:

[Kafka] Message queue fundamentals
[Kafka] Setting up a Kafka cluster
[Kafka] Basic Kafka cluster operations (essential for beginners)
[Kafka] A brief introduction to Kafka

[Figure: the two Spark Streaming + Kafka integration packages (spark-streaming-kafka-0-8 and spark-streaming-kafka-0-10), from the official documentation]
In practice, Spark Streaming is integrated with Kafka far more often than with Flume, because Kafka allows the incoming data to be rate-limited (a sketch of the relevant Spark settings follows the version comparison below).
As the figure above and the official documentation show, there are two major versions of the Spark Streaming and Kafka integration: 0.8 and 0.10.

The 0.8 integration offers two ways to receive data:
Receiver DStream: consumes with the high-level API; offsets are saved to ZooKeeper at regular intervals. The delivery semantics are at-least-once, so records may be consumed more than once.
Direct DStream: consumes with the low-level API; offsets are tracked by Spark itself rather than in ZooKeeper, and by default a newly started job consumes from the latest offset. The delivery semantics are at-most-once, so data may be lost.

The 0.10 integration has only one way to receive data:
Direct DStream: also a receiver-less, direct consumer, but combined with manual offset commits it achieves exactly-once semantics, avoiding duplicate consumption and data loss as far as possible.
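
As noted above, one practical advantage of Kafka over Flume is that ingestion can be throttled. The following is only a minimal sketch (these are standard Spark Streaming configuration keys, not code from the original walkthrough; the values are placeholders) of how the rate can be capped for both integration styles:

import org.apache.spark.SparkConf

// A sketch only: Spark settings that throttle how fast data is pulled from Kafka
val throttledConf = new SparkConf()
  .setAppName("ThrottledKafkaDemo")
  .setMaster("local[6]")
  // let Spark adapt the ingestion rate to the observed processing speed
  .set("spark.streaming.backpressure.enabled", "true")
  // cap for Receiver-based streams: max records per second per receiver
  .set("spark.streaming.receiver.maxRate", "1000")
  // cap for Direct streams: max records per second per Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")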


Consuming data with a Receiver DStream (0.8 integration)

Steps

1. Start the Kafka cluster

JDK and ZooKeeper must already be installed, and the ZooKeeper service must be running.
Start the Kafka service in the background on all three machines:

cd /export/servers/kafka_2.11-1.0.0/

bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
2. Create a Maven project and import the dependencies
<properties>
    <scala.version>2.11.8</scala.version>
    <spark.version>2.2.0</spark.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.5</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>

</dependencies>
<build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
                <encoding>UTF-8</encoding>
                <!--    <verbal>true</verbal>-->
            </configuration>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.0</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                    <configuration>
                        <args>
                            <arg>-dependencyfile</arg>
                            <arg>${project.build.directory}/.scala_dependencies</arg>
                        </args>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass></mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
3. Create a Kafka topic
bin/kafka-topics.sh  --create --partitions 3 --replication-factor 2 --topic sparkafka --zookeeper node01:2181,node02:2181,node03:2181
4. Start a Kafka console producer
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic sparkafka
5. Write the code
package cn.itcast.sparkstreaming.demo5_08

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils

import scala.collection.immutable

object SparkKafkaReceiverDStream {
  def main(args: Array[String]): Unit = {

    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkKafkaReceiverDStream").setMaster("local[6]").set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))

    // ZooKeeper quorum
    val zkQuorum = "node01:2181,node02:2181,node03:2181"
    // Topics to consume: the topic sparkafka with 3 consumer threads (one per partition)
    val topics = Map("sparkafka" -> 3)

    /**
     * Use 1 to 3 to create three receivers, one consuming each of the topic's three partitions.
     * The resulting IndexedSeq holds the input stream produced by each receiver.
     */
    val receiverDStream: immutable.IndexedSeq[ReceiverInputDStream[(String, String)]] = (1 to 3).map(x => {
      /**
       * Create an input stream that pulls messages from the Kafka brokers.
       * ssc       StreamingContext object
       * zkQuorum  ZooKeeper quorum (hostname:port,hostname:port,..)
       * groupId   the group id for this consumer
       * topics    Map of (topic_name -> numPartitions) to consume; each partition is
       *           consumed in its own thread
       * Returns a DStream of (Kafka message key, Kafka message value)
       */
      val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(streamingContext, zkQuorum, "ReceiverDStreamGroup", topics)
      stream
    })
    // Union the streams from all receivers into a single DStream
    val union: DStream[(String, String)] = streamingContext.union(receiverDStream)
    // Each record is a (key, value) pair; the value is the message payload we want
    val value: DStream[String] = union.map(x => x._2)
    // Print the received data
    value.print()

    streamingContext.start()
    streamingContext.awaitTermination()

    
  }
}
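
Because the Receiver DStream above buffers records on the executors before they are processed, the official documentation recommends enabling the write-ahead log if buffered data must not be lost when a driver or executor fails. A minimal sketch of the two extra pieces of configuration, assuming a hypothetical HDFS checkpoint path:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("SparkKafkaReceiverDStreamWAL")
  .setMaster("local[6]")
  // persist received blocks to a write-ahead log before they are acknowledged
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// the write-ahead log is stored under the checkpoint directory; use a fault-tolerant path in production
ssc.checkpoint("hdfs://node01:8020/spark-receiver-checkpoint")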

Consuming data with a Direct DStream (0.8 integration)

Code

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object SparkKafkaDirectDStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("SparkKafkaDirectDStream").setMaster("local[6]").set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))
    // Kafka parameters
    val kafkaParams = Map("metadata.broker.list" -> "node01:9092,node02:9092,node03:9092", "group.id" -> "Kafka_Direct")

    val topics = Set("sparkafka")

    /**
     * Type parameters and arguments of createDirectStream:
     * [K: ClassTag]                   type of the Kafka message key
     * [V: ClassTag]                   type of the Kafka message value
     * [KD <: Decoder[K]: ClassTag]    type of the Kafka message key decoder
     * [VD <: Decoder[V]: ClassTag]    type of the Kafka message value decoder
     * ssc: StreamingContext           StreamingContext object
     * kafkaParams: Map[String, String]
     * topics: Set[String]             names of the topics to consume
     */
    val resultDStream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topics)
    // The key is null; the value holds the data we want
    val value: DStream[String] = resultDStream.map(x => x._2)
    // Print the values
    value.print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }

}
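
The overview noted that a freshly started 0.8 direct job begins at the latest offset by default, so anything produced while it was down is skipped. Two common mitigations are setting "auto.offset.reset" -> "smallest" in kafkaParams (start from the earliest retained offset) and persisting each batch's offset ranges yourself so a restart can resume where it stopped. The snippet below is only a sketch that reuses resultDStream from the program above (it would be added before streamingContext.start()) and simply logs the ranges; the println stands in for whatever offset store you choose:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

resultDStream.foreachRDD { rdd =>
  // the offset ranges are only available on the RDDs produced directly by createDirectStream
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    // replace this println with a write to ZooKeeper, a database, or another offset store
    println(s"topic=${r.topic} partition=${r.partition} from=${r.fromOffset} until=${r.untilOffset}")
  }
}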

Consuming data with a Direct DStream (0.10 integration)

Notes

Comment out the 0.8 Kafka dependency in pom.xml, and comment out or delete the two objects above.

Steps

1. Add the dependency
<dependency>
	<groupId>org.apache.spark</groupId>
	<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
	<version>2.2.0</version>
</dependency>
2. Write the code
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, ConsumerStrategy, HasOffsetRanges, KafkaUtils, LocationStrategies, LocationStrategy, OffsetRange}

object NewSparkKafkaDirectDStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf
    val sparkConf: SparkConf = new SparkConf().setAppName("NewSparkKafkaDirectDStream").setMaster("local[6]").set("spark.driver.host", "localhost")
    // Create the SparkContext
    val sparkContext = new SparkContext(sparkConf)
    // Set the log level
    sparkContext.setLogLevel("WARN")
    // Create the StreamingContext with a 5-second batch interval
    val streamingContext = new StreamingContext(sparkContext, Seconds(5))

    /*
    LocationStrategy is a sealed abstract class, so instances are obtained through the
    factory methods on the LocationStrategies companion object.
     */
    // PreferConsistent: use this in most cases; it will consistently distribute partitions across all executors
    val locationStrategy: LocationStrategy = LocationStrategies.PreferConsistent
    /*
    createDirectStream also needs a ConsumerStrategy, built here with the Subscribe method
    of the ConsumerStrategies companion object, which takes:
    topics: Iterable[jl.String],
    kafkaParams: collection.Map[String, Object]
     */
    // The topics to subscribe to
    val topics: Array[String] = Array("sparkafka")
    // Kafka brokers
    val brokers = "node01:9092,node02:9092,node03:9092"
    // The consumer group
    val group = "sparkafkaGroup"
    // Consumer configuration
    val kafkaParam = Map(
      "bootstrap.servers" -> brokers, // addresses used to bootstrap the connection to the cluster
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      // identifies which consumer group this consumer belongs to
      "group.id" -> group,
      // used when there is no initial offset, or the current offset no longer exists on the server;
      // "latest" resets the offset to the latest offset
      "auto.offset.reset" -> "latest",
      // if true, the consumer's offsets are committed automatically in the background;
      // disabled here so that offsets can be committed manually after processing
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.Subscribe[String, String](topics, kafkaParam)

    /**
     * createDirectStream takes the following parameters:
     * ssc: StreamingContext,
     * locationStrategy: LocationStrategy,
     * consumerStrategy: ConsumerStrategy[K, V]
     */
    val resultStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext, locationStrategy, consumerStrategy)


    // Process each batch RDD; an RDD has multiple partitions, each holding multiple records
    resultStream.foreachRDD(rdd => {
      // only process the batch if it actually contains data
      if (rdd.count() > 0) {
        // iterate over every record in the RDD and process it
        rdd.foreach(record => {
          // the key is null; the value holds the data we want
          val value: String = record.value()
          // print the data
          println(value)
        })
        /*
        After a batch has been processed, commit its offsets manually.
        Cast the RDD to HasOffsetRanges to read the offset ranges of this batch.
         */
        val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        // cast resultStream to CanCommitOffsets and commit the offsets asynchronously
        resultStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
      }
    })
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
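
To do more than print the raw values, the 0.10 stream can be transformed like any other DStream. The snippet below is a hypothetical extension, not part of the original example: it counts words in each 5-second batch (it would be placed before streamingContext.start(), while the manual offset commit in the foreachRDD above stays unchanged).

// A sketch only: word count over the values of the 0.10 direct stream defined above
val wordCounts = resultStream
  .map(record => record.value())     // extract the message payload
  .flatMap(line => line.split(" "))  // split each line into words
  .map(word => (word, 1))            // pair each word with a count of 1
  .reduceByKey(_ + _)                // sum the counts within the batch
wordCounts.print()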
Original article: https://www.cnblogs.com/zzzsw0412/p/12772382.html