[Spark]-Streaming-调优

1.概述

  Spark Streaming的主要应用方向是实时计算.这代表一个Spark Streaming应用必然是对执行性能和运行稳定性(7 x 24)有一定要求的

2.性能

  在性能方面,主要是合理的利用的集群资源,设置正确的批处理大小(提升并行度)和减少每个批次的处理时间(计算逻辑优化).以让数据流处理的能像接受一样快

  2.1 调整Spark配置参数

    

  2.2 数据接收优化

    一个Spark Streaming 应用的开端便是数据接收,那么性能调优的第一步就是保证:数据不会在数据接收器端产生积压.

    2.2.2 及时的处理数据以防止数据积压

      如果批处理时间(batch processing time)超过批次间隔(batchinterval), 那么显然数据会不断的积压(receiver 的内存将会开始填满) , 最终会抛出 exceptions(最可能是 BlockNotFoundException).

      考虑提升并行度等方法,以加快批次处理速度.

      适度加大批次间隔,如果这样能及时在下个批次触发之前将数据处理完

           使用 SparkConf 配置 spark.streaming.receiver.maxRate,可以限制receiver的接受速率

    2.2.1 提升并行度  

      2.2.1.1 设置良好的执行任务并行数

        Spark Streaming 依然保持一个分区(partition)一个任务(task)的原则

        设置块间隔以影响任务数

          对于大多数接收器而言,接收到的数据在Spark都是合并在一起的,这称之为数据块(blocks of data).并依据块间隔对数据块分割分区(partition)

          所以对每个批次而言,批次中的块间隔决定了执行Map系的转换操作的任务数.(每个批次的任务数=批间隔(batch interval) / 块间隔(block interval))

            例如:批处理间隔2秒的Steaming应用,200毫秒的块间隔大约会创建10个任务(10=2000/200)

          如果执行的任务数太少(少于机器的可用核数),它将会是低效的.因为有空闲的核没有参与执行.

          如果执行的任务数太多,也是低效的.因为会产生大量的任务排队与任务启动开销.

          所以调整块间隔,以优化出一个合适的执行任务数是非常有必要的.

            配置项 : spark.streaming.blockInterval (默认200(毫秒)). (官方推荐的 block interval (块间隔)最小值约为 50ms , 低于此任务启动开销可能是一个问题)

        直接干预分区(partition)数以影响任务数

          通过调用 inputDstream.repartition(n) 来重新分区也可以.但这会产生一个Shuffle操作.      

      2.2.1.2 并行化数据接收

        每个 input DStream 都会创建一个接收数据流(single stream of data)和一个接收器(single receiver)

        因此, 可以通过创建多个 input DStreams 来实现接收多个数据流(Receiving multiple data streams) ,并配置用数据流的不同分区去接收数据源的不同数据段

        例如, 接收两个数据主题(two topics of data)的单个Kafka input DStream 可以分为两个输入流( Kafka input streams) ,每个只接收一个主题(kafka topic) topic

            这将运行两个接收器(receivers) 以并行的接受数据,从而提高总体吞吐量(overall throughput

            而这多个Input DStreams 可以连接在一起(Union),然后一起进行转换计算等处理.(这种Union不会影响分区(partition)数)

        如下:          

                    val numStreams = 5
                    val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
                    val unifiedStream = streamingContext.union(kafkaStreams)
                    unifiedStream.print()

         注意: 并行数据接受,必须保证有足够多的核可以使用 (spark.cores.max)

  2.3 序列化的影响

    在一个Spark Streaming,在很多时候都会将数据序列化与反序列化,比如:

      i).输入数据

         默认会将接收器接受到的输入数据,通过 StorageLevel.MEMORY_AND_DISK_SER_2 存储到executor的内存或磁盘中.

         这个存储过程就会将输入数据序列化存入.此外,因为是序列化存入,则读取时也必要有一个反序列化过程.

      ii).缓存(persist)一个 DSream 

       流式计算过程中的某一个DSream,可能会需要缓存(临时存储在内存).而在流式计算,缓存模式使用StorageLevel.MEMORY_ONLY_SER,也是序列化操作. (Spark Core是StorageLevel.MEMORY_ONLY)

    总之,在一个Spark Streaming中,使用数据序列化的场合是非常多的.而在这时,序列化算法本身的性能就变得非常关键了.

    Spark 内置提供了 Kryo 序列化.相比Java内置的序列化可以提升一个数量级(10倍)以上. Kryo 唯一的要求时需要提前注册需序列化的类

      使用Kryo序列化:  conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))  详见Spark调优篇之序列化调优

   2.4 内存调优

    关于 Spark 内存调优的已经在Spark调优篇之内存调优讨论过,这里再针对Spark Streaming 应用进行一些讨论.

    2.4.1 对 Spark Streaming 应用,保证有足够的内存使用

      一个Spark Streaming 应用中所需要的内存,很大程序取决于转换(transformations)类型.

        例如:使用一个最近10分钟的窗口操作,那么集群中应该有足够的内存来容纳这10分钟的数据.

          或者要使用大量keys的UpdateStateByKey.则必然会使用大量内存.

          相反,简单的 map-filter-store 操作,则所需内存就会很低

      默认情况,Streaming会使用MEMORY_AND_DISK_SER_2.所以内存溢出的数据会写到磁盘中.这必然会降低应用的性能.

      因此,官方建议对Streaming应用一定要保证有足够的内存使用

     2.4.2 Spark Streaming 的GC

      对 Spark Streaming 内存调优的另一个方向是GC.对于需要低延迟的Spark Streaming 应用 , 肯定是不希望由 JVM GC 引起大量暂停的

      Spark Streaming 一些调整内存使用量和GC 的思路

      i).DStreams 的持久化

          如前所述,DSteam的持久化默认策略是 MEMORY_AND_DISK_SER_2 .序列化存储,已经极大的降低内存使用量和GC的次数.

          进一步,如果使用更好的序列化算法(例如Kryo),获得更快的序列化速度和更小的内存占用,也会进一步提升应用性能.

          再进一步,对序列化结果进行压缩,可以进一步缩小内存占用.(参见Spark配置 spark.rdd.compress)

          当然.这些优化都会耗用更多的CPU资源.

      ii).旧数据清理(Clearing old data)

        DSteam经过转换生成所有的 InputData 和 Persist RDD,将由 Spark 内部决定何时进行清除.

          例如 一个10分钟的窗口操作,Spark默认只会保留最近10分钟的数据,而更早的数据将会被自动清理.

        可以通过设置 streamingContext.remember 保持更长的持续时间(例如交互式查询旧数据) 

      iii).CMS垃圾收集器

          虽然已知 concurrent GC 可以提升系统的整体处理吞吐量,但仍然建议使用 concurrent mark-and-sweep(CMS) GC.

          强烈建议 在Driver 和 Executor 都设置 CMS GC 并设置统一的批处理时间, 以保持 GC 相关的暂停始终如一.

      iv).其它

        使用 OFF_HEAP 存储级别的持久化 RDDs

        使用更小的 heap sizes 的 executors.这将降低每个 JVM heap 内的 GC 压力

原文地址:https://www.cnblogs.com/NightPxy/p/9313524.html