01 sparkAPI-阅读总结-sparkstreaming

 

Spark Streaming编程指南

概观

Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(如Kafka,Flume,Kinesis或TCP套接字)中获取,并且可以使用以高级函数表示的复杂算法进行处理map,例如reducejoinwindow最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。实际上,您可以在数据流上应用Spark的 机器学习和 图形处理算法。

Spark Streaming

在内部,它的工作原理如下。Spark Streaming接收实时输入数据流并将数据分成批处理,然后由Spark引擎处理,以批量生成最终结果流。

Spark Streaming

Spark Streaming提供称为离散流DStream的高级抽象,表示连续的数据流。DStream可以从来自Kafka,Flume和Kinesis等源的输入数据流创建,也可以通过在其他DStream上应用高级操作来创建。在内部,DStream表示为一系列 RDD

本指南向您展示如何使用DStreams开始编写Spark Streaming程序。您可以使用Scala,Java或Python编写Spark Streaming程序(在Spark 1.2中引入),所有这些都在本指南中介绍。您可以在本指南中找到标签,让您在不同语言的代码段之间进行选择。

注意:有一些API在Python中不同或不可用。在本指南中,您将找到标记Python API,突出显示这些差异。


一个快速的例子

首先,我们将Spark Streaming类的名称和StreamingContext中的一些隐式转换导入到我们的环境中,以便将有用的方法添加到我们需要的其他类(如DStream)。StreamingContext是所有流功能的主要入口点。我们使用两个执行线程创建一个本地StreamingContext,批处理间隔为1秒

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))

使用此上下文,我们可以创建一个DStream来表示来自TCP源的流数据,指定为主机名(例如localhost)和端口(例如9999

// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

linesDStream表示将从数据服务器接收的数据流。此DStream中的每条记录都是一行文本。接下来,我们希望将空格字符分割为单词。

// Split each line into words
val words = lines.flatMap(_.split(" "))

 flatMap是一对多DStream操作,它通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每行将被分成多个单词,单词流表示为 wordsDStream。接下来,我们要计算这些单词。

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()

所述wordsDSTREAM被进一步映射(一到一个变换)到一个DSTREAM (word, 1)对,然后将其还原得到的单词的频率数据中的每一批。最后,wordCounts.print()将打印每秒生成的一些计数。

请注意,执行这些行时,Spark Streaming仅设置启动时将执行的计算,并且尚未启动实际处理。要在设置完所有转换后开始处理,我们最终调用

ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

如果您已经下载构建了 Spark,则可以按如下方式运行此示例。您首先需要使用Netcat(在大多数类Unix系统中找到的小实用程序)作为数据服务器运行

$ nc -lk 9999
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999

hello world



...
 
# TERMINAL 2: RUNNING NetworkWordCount

$ ./bin/run-example streaming.NetworkWordCount localhost 9999
...
-------------------------------------------
Time: 1357008430000 ms
-------------------------------------------
(hello,1)
(world,1)
...

依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.4.0</version>
</dependency>

初始化StreamingContext

要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。

StreamingContext对象可以从被创建SparkConf对象。

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName参数是应用程序在群集UI上显示的名称。 masterSpark,Mesos或YARN群集URL,或在本地模式下运行的特殊“local [*]”字符串。实际上,当在群集上运行时,您不希望master在程序中进行硬编码,而是启动应用程序spark-submit并在那里接收它。但是,对于本地测试和单元测试,您可以传递“local [*]”以在进程中运行Spark Streaming(检测本地系统中的核心数)。请注意,这会在内部创建一个SparkContext(所有Spark功能的起点),可以访问它ssc.sparkContext

必须根据应用程序的延迟要求和可用的群集资源设置批处理间隔。有关 更多详细信息,请参见性能调整部分。

StreamingContext目的还可以从现有的创建的SparkContext对象。

import org.apache.spark.streaming._

val sc = ...                // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))

定义上下文后,您必须执行以下操作。

  1. 通过创建输入DStreams来定义输入源。
  2. 通过将转换和输出操作应用于DStream来定义流式计算。
  3. 开始接收数据并使用它进行处理streamingContext.start()
  4. 等待处理停止(手动或由于任何错误)使用streamingContext.awaitTermination()
  5. 可以使用手动停止处理streamingContext.stop()
要记住的要点:
  • 一旦启动了上下文,就不能设置或添加新的流式计算。
  • 上下文停止后,无法重新启动。
  • 在JVM中只能同时激活一个StreamingContext。
  • StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将stop()called 的可选参数设置stopSparkContext为false。
  • 只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext),就可以重复使用SparkContext来创建多个StreamingContexts。

离散流(DStreams)

Discretized StreamDStream是Spark Streaming提供的基本抽象。它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象(有关更多详细信息,请参阅Spark编程指南)。DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。

Spark Streaming

应用于DStream的任何操作都转换为底层RDD上的操作。例如,在先前将行流转换为字的示例中,flatMap操作应用于linesDStream中的每个RDD 以生成DStream的 wordsRDD。如下图所示。

Spark Streaming

这些底层RDD转换由Spark引擎计算。DStream操作隐藏了大部分细节,并为开发人员提供了更高级别的API以方便使用。这些操作将在后面的章节中详细讨论。


输入DStreams和Receivers

输入DStream是表示从流源接收的输入数据流的DStream。快速示例中lines输入DStream是表示从netcat服务器接收的数据流。每个输入DStream(文件流除外,本节稍后讨论)都与Receiver (Scala doc, Java doc)对象相关联,该对象从源接收数据并将其存储在Spark的内存中进行处理。

Spark Streaming提供两类内置流媒体源。

  • 基本来源:StreamingContext API中直接提供的示例:文件系统和套接字连接。
  • 高级资源:Kafka,Flume,Kinesis等资源可通过额外的实用程序类获得。这些需要链接额外的依赖关系,如 链接部分所述。

我们将在本节后面讨论每个类别中的一些来源。

请注意,如果要在流应用程序中并行接收多个数据流,可以创建多个输入DStream(在“ 性能调整”部分中进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker / executor是一个长期运行的任务,因此它占用了分配给Spark Streaming应用程序的其中一个核心。因此,重要的是要记住,Spark Streaming应用程序需要分配足够的内核(或线程,如果在本地运行)来处理接收的数据,以及运行接收器。

要记住的要点
  • 在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”作为主URL。这两种方法都意味着只有一个线程将用于本地运行任务。如果您正在使用基于接收器的输入DStream(例如套接字,Kafka,Flume等),则单线程将用于运行接收器,不会留下任何线程来处理接收到的数据。因此,在本地运行时,始终使用“local [ n ]”作为主URL,其中n >要运行的接收器数量(有关如何设置主服务器的信息,请参阅Spark属性)。

  • 将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数cores必须大于接收器receiver数否则系统将接收数据,但无法处理数据。

基本来源

我们已经看过快速示例中的内容ssc.socketTextStream(...),该示例 根据通过TCP套接字连接接收的文本数据创建DStream。除了套接字之外,StreamingContext API还提供了从文件创建DStream作为输入源的方法。

文件流

对于从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS等)上的文件读取数据,可以创建DStream作为via StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]

文件流不需要运行接收器,因此不需要分配任何内核来接收文件数据。

对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)

streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

对于文本文件

streamingContext.textFileStream(dataDirectory)
如何监控目录

Spark Streaming将监视目录dataDirectory并处理在该目录中创建的任何文件。

  • 可以监视一个简单的目录,例如"hdfs://namenode:8040/logs/"直接在这种路径下的所有文件将在发现时进行处理。
  • POSIX glob模式可以被提供,例如 "hdfs://namenode:8040/logs/2017/*"这里,DStream将包含与模式匹配的目录中的所有文件。那就是:它是目录的模式,而不是目录中的文件。
  • 所有文件必须采用相同的数据格式。
  • 根据文件的修改时间而不是创建时间,文件被视为时间段的一部分。
  • 处理完毕后,对当前窗口中文件的更改不会导致重新读取文件。即:忽略更新
  • 目录下的文件越多,扫描更改所需的时间就越长 - 即使没有修改过任何文件。
  • 如果使用通配符来标识目录,例如"hdfs://namenode:8040/logs/2016-*",重命名整个目录以匹配路径,则会将目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。
  • 调用FileSystem.setTimes() 时间戳是一种在稍后的窗口中拾取文件的方法,即使其内容未更改。
使用对象存储作为数据源

“完整”文件系统(如HDFS)会在创建输出流后立即在其文件上设置修改时间。打开文件时,即使在数据完全写入之前,它也可能包含在DStream- 之后 - 将忽略同一窗口中文件的更新。即:可能会遗漏更改,并从流中省略数据。

要保证在窗口中选择更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将拾取新数据。

相比之下,Amazon S3和Azure Storage等对象存储通常具有较慢的重命名操作,因为实际上是复制了数据。此外,重命名的对象可能将rename()操作的时间作为其修改时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。

需要对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming所期望的一致。可能是直接写入目标目录是通过所选对象库流式传输数据的适当策略。

有关此主题的更多详细信息,请参阅Hadoop文件系统规范

基于自定义接收器的流

可以使用通过自定义接收器接收的数据流创建DStream。有关详细信息,请参阅自定义接收器指南

RDD作为流的队列

为了测试带有测试数据的Spark Streaming应用程序,还可以使用基于RDD队列创建DStream streamingContext.queueStream(queueOfRDDs)推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。

有关从套接字和文件流的详细信息,请参阅在相关函数的API单证 的StreamingContext斯卡拉,JavaStreamingContext 对Java和的StreamingContext为Python。

高级资源

Python API从Spark 2.4.0开始,在这些来源中,Kafka,Kinesis和Flume在Python API中可用。

此类源需要与外部非Spark库连接,其中一些库具有复杂的依赖性(例如,Kafka和Flume)。因此,为了最大限度地减少与依赖项版本冲突相关的问题,从这些源创建DStream的功能已移至可在必要时显式链接的单独库

请注意,Spark shell中不提供这些高级源,因此无法在shell中测试基于这些高级源的应用程序。如果您真的想在Spark shell中使用它们,则必须下载相应的Maven工件JAR及其依赖项,并将其添加到类路径中。

其中一些高级资源如下。

  • Kafka: Spark Streaming 2.4.0与Kafka经纪人版本0.8.2.1或更高版本兼容。有关更多详细信息,请参阅Kafka集成指南

  • Flume: Spark Streaming 2.4.0与Flume 1.6.0兼容。有关详细信息,请参阅Flume集成指南

  • Kinesis: Spark Streaming 2.4.0与Kinesis Client Library 1.2.1兼容。有关详细信息,请参阅Kinesis集成指南

自定义来源

Python API Python尚不支持此功能。

输入DStream也可以从自定义数据源创建。您所要做的就是实现一个用户定义的接收器(参见下一节以了解它是什么),它可以从自定义源接收数据并将其推送到Spark。有关详细信息,请参阅自定义接收器指南

接收器可靠性

根据其可靠性,可以有两种数据源来源(如Kafka和Flume)允许传输数据得到确认。如果从这些可靠来源接收数据的系统正确地确认接收到的数据,则可以确保不会因任何类型的故障而丢失数据。这导致两种接收器:

  1. 可靠的接收器 - 可靠的接收器在接收到数据并通过复制存储在Spark中时正确地向可靠的源发送确认。
  2. 不可靠的接收器 - 不可靠的接收不会向源发送确认。这可以用于不支持确认的源,甚至可以用于不需要或需要进入确认复杂性的可靠源。

“ 自定义接收器指南”中讨论了如何编写可靠接收器的详细信息 


DStreams的转换

与RDD类似,转换允许修改来自输入DStream的数据。DStreams支持普通Spark RDD上可用的许多转换。一些常见的如下。

转型含义
map功能 通过将源DStream的每个元素传递给函数func来返回一个新的DStream 
flatMapfunc 与map类似,但每个输入项可以映射到0个或更多输出项。
过滤器功能 通过仅选择func返回true 的源DStream的记录来返回新的DStream 
重新分区numPartitions 通过创建更多或更少的分区来更改此DStream中的并行度级别。
unionotherStream 返回一个新的DStream,它包含源DStream和otherDStream中元素的并 
count() 通过计算源DStream的每个RDD中的元素数量,返回单元素RDD的新DStream。
减少功能 通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream 该函数应该是关联的和可交换的,以便可以并行计算。
countByValue() 当在类型K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率。
reduceByKeyfunc,[numTasks ]) 当在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,这使用Spark的默认并行任务数(本地模式为2,在群集模式下,数量由config属性确定spark.default.parallelism)进行分组。您可以传递可选numTasks参数来设置不同数量的任务。
joinotherStream,[ numTasks]) 当在(K,V)和(K,W)对的两个DStream上调用时,返回(K,(V,W))对的新DStream与每个键的所有元素对。
协同组otherStream,[numTasks ]) 当在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq [V],Seq [W])元组的新DStream。
变换功能 通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作。
updateStateByKeyfunc 返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态。这可用于维护每个密钥的任意状态数据。
   

其中一些转换值得更详细地讨论。

UpdateStateByKey操作

updateStateByKey操作允许您在使用新信息持续更新时保持任意状态。要使用它,您必须执行两个步骤。

  1. 定义状态 - 状态可以是任意数据类型。
  2. 定义状态更新功能 - 使用函数指定如何使用先前状态和输入流中的新值更新状态。

在每个批处理中,Spark都会对所有现有密钥应用状态更新功能,无论它们是否在批处理中都有新数据。如果更新函数返回,None则将删除键值对。

让我们举一个例子来说明这一点。假设您要维护文本数据流中看到的每个单词的运行计数。这里,运行计数是状态,它是一个整数。我们将更新函数定义为:

2.4.0
原文地址:https://www.cnblogs.com/star521/p/10009860.html