小记--------spark streaming调优

一、数据序列化
    如果使用一种对象序列化慢、占用字节多的序列化格式,就会严重降低计算效率。
 
在spark中有三个方面涉及序列化:
1.在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
 
2.将自定义的类型作为RDD的泛型类型时,所有自定义类型对象都会进行序列化,因此在这种情况下,也要求自定义的类必须实现Serializable接口。
 
3.使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),spark会将RDD中的每个partition都序列化成一个大的字节数组。
    spark综合考量易用性和性能,提供了下面两种序列化库:
    3.1Java序列化
    默认情况下,spark使用Java的对象输出流框架来进行对象的序列化,并且可用在任意实现java.io.Serializable接口的自定义类上,我们可以通过扩展java.io.Externalizable来更加精细地控制序列化行为。Java序列化方式非常灵活,但是通常序列化速度非常慢而且对于很多类会产生非常巨大的序列化结果
    3.2Kryo序列化
    spark2.0以上的版本可以使用kryo库来非常快速地进行对象序列化,Kryo要比Java序列化更快、更紧凑(10倍)但是其不支持所有的Serializable类型,并且在使用自定义类之前必须先注册。
        我们可以再初始化sparkconf时调用conf.set(“spark.serializer” , “org.apache.spark.serializer.KryoSerializer”)来使用Kyro。进行了这个配置,Kryo序列化不仅仅会用在Shuffing操作时worker节点间的数据传递,也会用在RDDs序列化到硬盘的过程。
        另外需要注意的是,在spark2.0.0之后,spark已经默认将Kryo序列化作为简单类型(基本类型,基本类型的数组及string类型)RDD进行shuffing操作时传输数据的对象序列化方式。
        如果我们的对象非常大,可能需要增加Spark.kryoserializer.buffer的配置。
 
    同样在sparkstreaming中,通过优化序列化格式可以缩减数据序列化的开销,而在streaming中还会涉及以下两类数据的序列化。
        输入数据:spark streaming 中不同于RDD默认是以非序列化的形式存于内存当中,streaming中的接收器(Receiver)接收而来的数据,默认是以序列化重复形式(StorageLevel.MEMORY_AND_DISK_SER_2)存放于Executor的内存当中,而采用这种方式的目的,一方面是由于将输入数据序列化为字节流可以减少垃圾回收(GC)的开销,另一方面是对数据的重复可以对Executor节点的失败有更好的容错性。同时需要注意的是,输入数据流一开始是保存在内存当中,当内存不足以存放流式计算依赖的输入数据时,会自动存放于硬盘当中,而在streaming中这部分序列化是一个很大的开销,接收器必须先反序列化(deserialize)接收到的数据,然后再序列化(serialize)为spark本身的序列化格式。
 
        由于streaming操作产生RDD的持久化:由流式计算产生的RDDs有可能持久化在内存当中,例如由于基于窗口操作的数据会被重复使用,所以会持久化在内存当中。值得注意的是:不同于spark核心默认使用非序列化的持久化方式(StorageLevel.MEMORY_ONLY),流式计算为了减少垃圾回收(GC)的开销,默认使用了序列化的持久化方式(StorageLevel.MEMORY_ONLY_SER)。
 
    不管在spark还是在spark streaming中,使用Kryo序列化方式,都可以减少CPU和内存的开销,而对于流式计算,如果数据量不是很大,并且不会造成过大的垃圾回收(GC)开销,我们可以考虑利用非序列化对象进行持久化。
例如:我们使用很小的批处理时间间隔,并且没有基于窗口的操作可以通过显示设置相应的存储级别来关闭持久化数据时的序列化,这样可以减少序列化引起的CPU开销,但是潜在的增加GC的开销。
 
二、内存管理
Spark对于内存的使用主要有两类用途:执行和存储
执行:内存主要被用于shuffle类操作、join操作及排序sort和聚合aggregation类操作。
存储:用于缓存数据和集群间内部数据的传送
 
spark提供了两个相关的配置,一般大多数情况下默认值就可以满足负载情况
1、spark Memory.Fraction 表示M的大小占整个JVM堆空间的比例(默认比例是0.6),剩余的空间(40%)被用来保存用户的数据结构及spark内部的元数据,另一方面预防某些异常数据记录造成的OOM错误
2.Spark.Memory.StorageFraction 表示R的大小占整个M的比例(默认是0.5)R是存储类内存在M中占用的空间,其中缓存的数据块不会被执行类内存剥夺
 
 
三、垃圾回收(GC)优化
 
查看GC发生的频率和消耗时间:
    在Java选项中加入:-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 来实现。之后在spark运行后的worker端日志中看到GC发生时打印的信息。(注意的是在worker端的日志,不在driver端)
        
Java虚拟机内部内存管理:
  1.     Java对象是存储在堆空间内的,堆空间被分为两部分,既年轻区域和老年区域,
               其中年轻代(Young generation)会用来存储短生命周期的对象
               其中老年代(Old generation) 会用来存储较长生命周期的对象 
      2.      年轻代的区域又分为3个部分[Eden,Survivor1,Survivor2]
      3.      一个简单的GC流程大致为:当Eden区域满了,一次小型GC过程会将Eden和Survivor1中还存活的对象复制到Survivor2区域上,Surivivor区域是可以交换的,当一个对象存活周期已足够长或者Survivor2区域已经满时,那么他们会被移动到老年代上,而当老年代的区域也满了时,就会触发一次完整的GC过程
 
 
spark在GC优化主要目标是:
    只有长生命周期的RDD会被存储在老年代上,二年轻代上有足够的空间来存储短生命周期的对象,从而尽可能避免任务执行时创建的临时对象触发完整GC流程。
  1. 通过GC统计信息观察是否存在过于频繁的GC操作,如果在任务完成前,完整的GC操作被调用了多次,那么说明可执行任务并没有获得足够内存空间
  2. 如果触发了过多小型GC,而完整GC操作并没有调用多次,那么给Eden区域多分配一些内存空间,我们可以根据每个任务所需内存大小来预估Eden的大小,如果Eden设置大小为E,可以利用配置项-Xmm=4/3*E来对年轻代的区域大小进行设置(其中4/3的比例是考虑到Survivor区域所需空间)
  3. 如果发现老年代接近存满,那么久需要改变spark.memory.fraction来减少存储类内存的占用,因为与其降低任务的执行速度,不如减少对象的缓存大小。另一个可选方案是减少年轻代的大小,既通过-Xmm来进行配置,也可以通过JVM的NewRatio参数进行调整,大多数JVM的该参数的默认值是2,意思是老年代占整个堆内存的2/3,这个比例需要大于Spark.memory,Fraction
  4. 通过加入-XX:+UserG1GC 来使用G1GC垃圾回收器,这可以一定程度提高GC的性能,另外注意对于Executor堆内存非常大的情况,一定通过-XX:G1HeapRegionSize来增加G1区域的大小。
 
    对于GC优化,主要还是从降低全局GC的频率出发。Executor中对于GC优化的配置可以通过spark.executor.extraJavaOptions来配置
    注意:当GC开销成为瓶颈时,首先要尝试的便是序列化缓存
 
 
四、Spark Streaming 内存优化
sparkstreaming 程序对实时性要求会较高,所以我们需要尽可能降低JVM垃圾回收所导致的延迟。
  1. DStream的持久化级别:输入数据默认是持久化字节流的,所以优先使用Kryo序列化方式,可以降低序列化后的尺寸和内存开销,如果需要进一步减少内存开销,可以通过配置spark.rdd.compress进行更进一步的压缩。
  2. 及时清理老数据:默认情况下所有的输入数据和由DStream的Transformation操作产生的持久RDD会被自动清理
  3. GMS垃圾回收器:使用这种GC可以使得每个batch的处理时间更加一致(不会因为某个batch处理时发生了GC,而导致处理时间剧增),我们需要再driver节点(spark-submit中使用  driver-java-options)和Executor节点(在spark配置中使用spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC)都进行设置CMS GC方式
  4. 减少其他GC开销的方式:可以通过OFF_HEAP存储级别的RDD持久化方式,以及可以在Executor上使用更小的堆内存,从而降低每个JVM堆垃圾回收的压力。
 
五、合理的kafka拉取量(maxRatePerPartition参数设置)
    在kafka数据频率过高的情况下,调整spark.streaming.kafka.maxRatePerPartition参数的值来进行上限的调整,默认是无上限的,也就是kafka有多少数据,是parkstreaming就会一次性全拉出,但是由于批处理的时间是固定的,所以如果持续数据频率过高,会造成数据堆积、阻塞的现象。
    需要注意的是:该参数的配置指的是kafka每个partition拉取的上限,数据总量还需要乘以所有的partition个数。在调整该参数的同时,配合调整batchDuration(批处理时间)
 
六、缓存反复使用的DStream(RDD)
    spark中的RDD和SparkStreaming中的DStream如果反复被使用,最好使用cache()函数将数据流缓存起来
 
七、其他优化
  • 设置合理的GC方式:使用—conf ”spark.executor.extraJavaOptions=-XX:UseCOncMarkSweepGC“ 来配置垃圾回收机制
  • 设置合理的parallelism:在SparkStreaming+Kafka中,采用Direct直连方式,spark的partition与kafka的partition是一一对应的,一般默认设置为kafka中partition的数量
  • 设置合理的CPU资源数:CPU的core数量,每个Executor可以占用一个或多个core观察CPU使用率(Linux top命令);可以增加Executor个数
  • 使用高性能的算子:建议使用reduceByKey/aggregateByKey代替使用groupbykey,而在数据库连接、资源加载创建等需求时,我们使用带partition的操作。可以使用mapPartition、foreachPartition代替map、foreach操作。
  • Kryo优化序列化性能
    
 
原文地址:https://www.cnblogs.com/yzqyxq/p/13727411.html