SparkStreaming实现Exactly-Once语义

作者:Syn良子 出处:http://www.cnblogs.com/cssdongl 转载请注明出处

译自:http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

查资料时发现上面这篇文章不错,虽然是1.3的老版本的知识,但是还是有借鉴的地方,业余时间按照自己的理解翻译了一遍,有不当的地方欢迎指正.

Apache Spark 1.3的版本包括从Apache Kafka读取数据的新的RDD和DStream实现。 作为这些功能的主要作者,我想解释一下它们的实现和用法。 你可能会感兴趣因为你能从以下方面受益:

1>在使用Kafka时更均匀地使用Spark集群资源
2>消息传递语义的控制
3>交付保证,而不依赖于HDFS中的预写日志
4>访问message元数据


我假设你熟悉Spark Streaming文档和Kafka文档。 所有代码示例都在Scala中,但在API中有很多方法对Java也比较友好

Basic Usage

Kafka RDD和DStream的新API在spark-streaming-kafka模块中

SBT 依赖

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.3.0"

Maven 依赖:

<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming-kafka_2.10</artifactId>
  <version>1.3.0</version>
</dependency>

利用spark streaming从kafka中读取数据,请使用KafkaUtils.createDirectStream:
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
 
val ssc = new StreamingContext(new SparkConf, Seconds(60))
 
// hostname:port for Kafka brokers, not Zookeeper
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
 
val topics = Set("sometopic", "anothertopic")
 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

对createDirectStream的调用会返回一个元组流,他由每个Kafka消息的键和值形成的。 它的返回类型是InputDStream [(K,V)],其中K和V在这种情况下的类型都是String。 这个返回类型的子类实现是DirectKafkaInputDStream。 createDirectStream方法还有其他重载,允许您访问消息元数据,并精确的为每个主题和分区指定起始偏移。

如果从一个非流运算的spark job中读取kafka数据,请使用KafkaUtils.createRDD:

import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkContext, SparkConf}
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}
 
    val sc = new SparkContext(new SparkConf)
 
    // hostname:port for Kafka brokers, not Zookeeper
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092,anotherhost:9092")
 
    val offsetRanges = Array(
      OffsetRange("sometopic", 0, 110, 220),
      OffsetRange("sometopic", 1, 100, 313),
      OffsetRange("anothertopic", 0, 456, 789)
    )
 
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

对createRDD的调用为每个Kafka消息的指定批次的偏移范围内返回一个(key,value)格式的元组RDD。它的返回类型是RDD [(K,V)],子类实现是KafkaRDD。 createRDD方法还有其他重载来允许您访问消息元数据,并指定当前每个主题和以及分区的leader。

 Implementation
DirectKafkaInputDStream是批处理流。 每个批次关联对应的KafkaRDD。 KafkaRDD的每个分区对应一个OffsetRange。 大多数实现是私有的,但是
理解了以后还是非常有用的。

 OffsetRange
OffsetRange表示给定Kafka主题和分区中特定消息序列的下限和上限,以下是它的数据结构:
 

OffsetRange("visits", 2, 300, 310)

上面这行代码标识从“visits”主题的第2个分区中的偏移300(包括)到偏移310(不包括)的10个消息。 注意,它实际上不包含消息的内容,它只是一种识别范围的方法。

还要注意,因为Kafka排序只在每个分区的基础上定义,下面这行代码

OffsetRange("visits", 3, 300, 310)

对消息的引用可能来自于完全不同的时间段; 即使偏移量与上述相同,分区也不同。

KafkaRDD
回想一下RDD Class的定义如下:
1>包含一种将Job分区的方法(getPartitions)
2>包含为指定分区执行工作的方法(compute)
3>父RDD的列表, KafkaRDD是一个输入,而不是一个转换,所以它没有parent
4>(可选)定义如何对Key进行哈希的partitioner。 KafkaRDD没有定义.
5>(可选)给定分区的首选主机列表,以便将计算推送到数据所在的位置(getPreferredLocations).


KafkaRDD构造函数接收一个OffsetRanges数组和一个当前leader的主机和端口的映射,这个映射包含所有topic及其分区。 分离leader信息的原因是允许KafkaUtils.createRDD方法很方便调用KafkaRDD的构造函数,而你不需要知道leader信息 在这种情况下,createRDD将使用metadata.broker.list中指定的主机列表作为初始联系,来调用Kafka API的必要的元数据信息去寻找leader 该初始查找将在Spark驱动程序进程中仅发生一次

KafkaRDD的getPartitions方法使用数组中的每个OffsetRange,并通过添加leader的主机和端口信息来将其转换为RDD分区。 这里要注意的重要的是Kafka分区和RDD分区之间有1:1的对应关系。 这意味着Spark并行度(至少对于读取消息)的程度将直接与Kafka并行度的程度相关。


getPreferredLocations方法使用给定分区的Kafka leader作为首选主机。 我没有在和Kafka相同的主机上运行我的Spark executors,所以如果你这样做了,让我知道你是如何使它工作的

compute方法在Spark executors进程中运行。 它使用Kafka SimpleConsumer连接到给定主题和分区的leader,然后重复获取请求以读取指定范围的偏移量的消息。

每个消息都使用构造器中的messageHandler参数做转换。 messageHandler是Kafka MessageAndMetadata类型的用户定义类型函数,默认是键和值的元组。 在大多数情况下,这种类型在每个分区的基础上访问主题和偏移元数据更为有效(参见下面对HasOffsetRanges的讨论),但是如果真的需要将每个消息与其偏移关联,你可以这样做。

关于计算的关键点在于,由于偏移范围是在驱动程序上预先定义的,因此由执行程序直接从Kafka读取,特定KafkaRDD返回的消息是确定的。 因此,这里没有重要的状态保持在executors上,也没有提交读取偏移到Apache ZooKeeper的的概念,因为存在优先使用Kafka高级消费者解决方案。

由于计算操作是确定性的,所以如果任务失败通常可以重新尝试任务。 例如,如果Kafka leader丢失了,计算方法将进行休眠,休眠时间取决于Kafka参数refresh.leader.backoff.ms Kafka 中定义的时间,然后该任务失败并让正常的Spark任务重试机制处理它。 在第一次之后的后续尝试中,新的leader的找出逻辑执行会作为executor 的compute方法的执行中的一部分。

DirectKafkaInputDStream

如果您有现有代码来获取和管理偏移,KafkaUtils.createRDD返回的KafkaRDD可用于批处理作业。 然而,在大多数情况下,您可能会使用KafkaUtils.createDirectStream,它返回一个DirectKafkaInputDStream。 类似于RDD,DStream定义为:
1>包含一个父DStreams列表。 再次说明,这是一个输入DStream,而不是一个转换,所以这里它没有parent
2>包含stream生成批次的时间间隔。 这个stream使用上下文中定义的时间间隔
3>包含一种为给定时间间隔(compute)生成RDD的方法.

compute方法在驱动程序上运行。 它连接每个主题和分区的leader,不是读取消息而是获取最新的可用偏移量。 然后定义KafkaRDD,其偏移范围跨越从最后一个批次的结束点到最近leader的offset.

要定义第一个批次的起始点,您可以为每个TopicAndPartition指定精确的偏移量,或者使用Kafka参数auto.offset.reset,它可以设置为“最大”或“最小”(默认为“最大”) 。 对于速率限制,可以使用Spark配置变量spark.streaming.kafka.maxRatePerPartition设置每个分区每个批次的最大消息数。

一旦定义了给定时间间隔的KafkaRDD,它将按照上述批处理用例情况完全执行。 与以前的Kafka DStream实现不同,不存在这种长期运行占用每个流的核心而不顾及消息量是多少的reveiver任务,对于我们在Kixer的使用案例,通常在大量主题的同一作业中有重要但少量的主题。 使用 direct stream,低容量分区导致较小的任务快速完成,并释放该节点以处理批处理中的其他分区。 与此同时保持各个主题在逻辑上分开,这是一个相当大的成功,因为其均衡一致了集群的使用,

与批处理使用情况的显着差异是存在随时间变化的一些重要状态,换句话说,即在每个时间间隔产生的偏移范围。 executor或Kafka leader失败不是一个大问题,如上所述,但如果驱动程序失败,偏移范围将会丢失,除非存储在某个地方。 我将在下面的交付语义中更详细地讨论这一点,但你基本上有三个选择:
1>如果您不关心丢失或重复的message,请不要担心,只需从最早或最新的偏移重新启动流
2>给stream建立检查点,在这种情况下,偏移范围(而不是消息,只是偏移范围定义)将存储在检查点中
3>自己存储偏移范围,并在重新启动stream时提供正确的起始偏移

同样,没有消费者偏移存储在ZooKeeper中。 如果你想与现有的Kafka监控工具直接与ZK交谈,你自己需要将偏移存储到ZK(这不意味着它需要系统记录你的偏移量,你可以只是复制它们)

请注意,因为Kafka被视为持久化的消息存储,而不是瞬态网络源,您不需要将消息复制到HDFS以进行错误恢复。 然而,这种设计确实有一些含义。 首先,您无法读取Kafka中不再存在的message,因此请确保您保存了足够的message。 第二个还是是你不能读取Kafka中不存在的消息。 换句话说,执行者的消费者不轮询新消息,驱动程序只是定期地与领导在每个批处理间隔检查,因此有一些固有的延迟。

HasOffsetRanges

另一个实现细节是公共接口HasOffsetRanges,它带有返回OffsetRange数组的单个方法。 KafkaRDD实现了此接口,允许您在每个分区的基础上获取主题和偏移量信息。

val stream = KafkaUtils.createDirectStream(...)
      ...
      stream.foreachRDD { rdd =>
        // Cast the rdd to an interface that lets us get a collection of offset ranges
        val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
 
        rdd.mapPartitionsWithIndex { (i, iter) =>
          // index to get the correct offset range for the rdd partition we're working on
          val osr: OffsetRange = offsets(i)
 
          // get any needed data from the offset range
          val topic = osr.topic
          val kafkaPartitionId = osr.partition
          val begin = osr.fromOffset
          val end = osr.untilOffset
          ...

使用这种间接层的原因是因为DStream方法使用的静态类型像foreachRDD和transform只是RDD,而不是底层实现的类型(在这种情况下,私有)。 因为createDirectStream返回的DStream生成了KafkaRDD的批次,所以可以安全地转换为HasOffsetRanges。 还要注意,由于偏移范围和rdd分区之间的1:1对应关系,rdd分区的索引对应由offsetRanges返回的数组中的索引。
Delivery Semantics

首先,了解delivery 语义的Kafka文档。 如果你已经阅读过,请再次阅读。 总之:消费者delivery 语义取决于你,不是Kafka。

第二,理解Spark不能保证输出动作的一次性语义。 当Spark streaming guide谈论至少一次时,它只是指一个RDD中的给定item包含在计算值中一次,纯粹是功能意义上的。 任何包含副作用的输出操作(即任何你在foreachRDD中保存结果的任何操作)可能会重复,因为进程的任何阶段都可能失败并重试。


第三,了解Spark检查点可能无法恢复,例如在您需要更改应用程序代码以获取stream重新启动的情况下。 这种情况可能会在1.4版本中被改进,但是要注意这是一个问题。 我以前遇到过这个坑,你可能也是。 任何地方,我提到“检查点stream”作为一个选项,考虑所涉及的风险。 还要注意,反正任何窗口变换都将依赖于检查点,

最后,我将重复,除了最多一次以外的任何语义需要您在Kafka有足够的日志保留。 如果你看到像OffsetOutOfRangeException这样的东西,这可能是因为你的Kafka存储不足,而不是因为Spark或Kafka的错误。

鉴于所有这一切,你如何获得相当于你想要的语义?

At-most-once
这在您将结果发送到非记录系统,不想要重复的情况下,以及确保不会陷入message丢失的这种麻烦的情况下,这可能非常有用。 一个例子比如通过UDP发送摘要统计信息,因为这开始于一个不可靠的协议

要获取最常用的语义,请执行以下所有操作:
1>将spark.task.maxFailures设置为1,因此作业失败时作业立即结束。
2>确保spark.speculation为false(默认值),因此任务的多个副本不会被推测地运行。
3>当作业死亡时,使用Kafka param auto.offset.reset设置为“最大”启动stream备份,因此它将跳转到日志的当前结尾。

这意味着您在重新启动时会丢失消息,但至少大概不应该重演一次 请仔细测试,如果你的消息不会重复对你实际上很重要,因为它不是一个常见的用例,我没有提供它的示例代码。

At-least-once
您可以重复message,但不会丢失message。 这方面的一个stream的例子相对少见,比如发送内部电子邮件警报。 在短时间内获得重复的紧急警报比没有得到它们好得多。

基本选项如下:
1>建立stream检查点,或者
2>设置auto.offset.reset为最小并重新启动作业。 这将从您的保留开始重新获取整个日志,因此您最好保留相对较短的时间,或者对重复的邮件确实很好。

对stream建立检查点作为下一个选项的基础,因此请查看它的示例代码。

Exactly-once using idempotent writes

幂等写入使重复的消息安全,至少转换一次到等同于一次。 这样做的典型方式是通过具有某种类型的唯一key(嵌入在消息中,或使用主题/分区/偏移量作为key),并根据该key存储结果。 依赖于每个消息的唯一键意味着这对于转换或过滤单独有价值的消息很有用,对于聚合多个消息则不一定。

关于这个idea,在IdempotentExample.scala有一个完整的例子。 它使用Postgres,为了与下一个示例的一致性,但是可以使用允许唯一键的任何存储系统。

stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
 
        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }

在失败的情况下,可以安全地重试上述输出动作。 对stream进行检查点确保偏移范围在生成时保存。 检查点以通常的方式完成,通过定义配置流上下文(ssc)和设置stream的功能,然后调用   
ssc.checkpoint(checkpointDir)
在返回ssc之前。 有关更多详细信息,请参阅Streaming Guide
Exactly-once using transactional writes
对于支持事务的数据存储,即使在故障情况下,也可以将结果保存在同一事务中的偏移来保持同步。 如果你仔细检查那些重复或跳过的偏移范围,回滚事务可防止重复或丢失的message影响结果。 这给出了恰好一次语义的等价物,并且直接用于聚合。
TransactionalExample.scala是一个完整的Spark作业,它实现了这个想法。 它虽然使用Postgres,但是可以使用任何具有事务语义的数据存储。
第一个重要的点是,使用最后成功提交的偏移量作为开始点来启动stream。 这允许故障恢复:

// begin from the the offsets committed to the database
    val fromOffsets = DB.readOnly { implicit session =>
      sql"select topic, part, off from txn_offsets".
        map { resultSet =>
          TopicAndPartition(resultSet.string(1), resultSet.int(2)) -> resultSet.long(3)
        }.list.apply().toMap
    }
 
    val stream: InputDStream[Long] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Long](
      ssc, kafkaParams, fromOffsets,
      // we're just going to count messages, don't care about the contents, so convert each message to a 1
      (mmd: MessageAndMetadata[String, String]) => 1L)

对于第一次运行作业,可以使用适当的起始偏移预先加载表。
如上面对HasOffsetRanges的讨论中所述,该示例根据每个分区访问偏移范围,
注意mapPartitionsWithIndex是一个转换,并且没有等效的foreachPartitionWithIndex操作。 RDD转换通常是惰性的,所以除非你添加某种类型的输出动作,Spark将永远不会调度作业来做任何事情。 在empty body上调用RDD的foreach就足够了。 另外,注意一些迭代器方法,如map是懒惰的。 如果你正在设置瞬态状态,如网络或数据库连接,则在映射完全强制时连接可能已关闭。 在这种情况下,请确保使用像foreach这样的热衷于使用迭代器的方法。

rdd.mapPartitionsWithIndex { (i, iter) =>
        // set up some connection
 
        iter.foreach {
          // use the connection
        }
 
        // close the connection
 
        Iterator.empty
      }.foreach {
        // Without an action, the job won't get scheduled, so empty foreach to force it
        // This is a little awkward, but there is no foreachPartitionWithIndex method on rdds
        (_: Nothing) => ()
      }

最后要注意的例子是,确保保存结果和保存偏移要么成功,要么都失败,这个很重要。 如果先前提交的偏移不等于当前偏移范围的开始,则存储偏移将失败; 这将防止间隙或重复。 Kafka语义确保在偏移范围内的消息中没有间隙(如果您特别关心,您可以通过将偏移范围的大小与消息数量进行比较来验证)。

// localTx is transactional, if metric update or offset update fails, neither will be committed
    DB.localTx { implicit session =>
      // store metric data
      val metricRows = sql"""
    update txn_data set metric = metric + ${metric}
      where topic = ${osr.topic}
    """.update.apply()
      if (metricRows != 1) {
        throw new Exception("...")
      }
 
      // store offsets
      val offsetRows = sql"""
    update txn_offsets set off = ${osr.untilOffset}
      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}
    """.update.apply()
      if (offsetRows != 1) {
        throw new Exception("...")
      }
    }

示例代码抛出异常,这将导致事务回滚。 其他故障处理策略可能是适当的,只要它们也导致事务回滚。

 Future Improvements
虽然Spark 1.3这个功能被认为是实验性的,但是底层的KafkaRDD设计已经在Kixer生产几个月。 它目前每天处理数十亿条消息,批处理大小介于2秒到5分钟之间。 话虽如此,已知已经有很多可以改进的地方(也可能有几个未知的地方)。
1>连接池。 目前,Kafka消费者连接是根据需要创建的; 池应该有助于效率。 希望这可以以这样一种方式实现:它很好地与正在进行的工作朝向Spark中的Kafka生产者API集成。
2>Kafka元数据API。 与Kafka交互的类目前是私有的,这意味着如果您想要通过low-level API访问Kafka元数据,您需要复制一些工作。 这部分是因为Kafka消费者偏移API现在还不稳定, 如果这个代码被证明是稳定的,那么有一个面向用户的API来与Kafka元数据进行交互是很不错的。
3>批生成策略。 现在,速率限制是唯一可用于定义流中下一批次的调整。 我们有一些涉及更大调整的用例,如固定的时间延迟。 定义批生成策略的灵活方式可能很有用

如果还有你可以想到的其他改进,请让我知道

终于翻译完了,有些复杂句子的意思我结合语境和相关知识斟酌了好久,有不当的地方欢迎指正.

原文地址:https://www.cnblogs.com/cssdongl/p/6210757.html