Spark性能优化

参考博客之一:https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
参考博客之一:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html
上面是非常好的博客推荐查看

缓慢任务或落后者

由于工作的负载没有均衡在各个节点上,或者是由于某台计算节点比其他节点速度慢

缓慢的聚合操作

具体表现:

  • 在执行groupby操作时产生缓慢任务
  • 聚合

缓慢的连接操作

具体表现:

  • 连接操作阶段需要很长的时间,可能是一项任务也可能是多项任务。
  • 连接操作之前的阶段和连接之后的阶段都似乎很正常。

缓慢的读写操作

配置选项

在又是配置选项的不合理也会导致性能下降。以下选项可以用于调查执行的性能。

名称 默认值 意义
spark.sql.files.maxPartitionBytes 128M 读取文件时打包到单个分区中的最大容量,当且仅当基于文件源时,才有效。(文件源比如Parquet, JSON等)
spark.sql.files.openCostInBytes 4M 可以同时扫描以字节衡量的打开文件的估计成本
spark.sql.files.minPartitionNum 默认并行 建议的分割文件分区的最小数量,在基于文件源才有效
spark.sql.broadcastTimeout 300s 广播连接中广播等待时间的超时标准
spark.sql.autoBroadcastJoinThreshold 10M 配置表的最大大小(以字节为单位),该表在执行联接时将广播到所有工作程序节点。通过将此值设置为-1,可以禁用广播。请注意,当前仅ANALYZE TABLE COMPUTE STATISTICS noscan运行命令的Hive Metastore表支持统计信息 。
spark.sql.shuffle.partitions 200 在分组和聚合的时候使用shuffle操作时默认使用的分区数量
spark.sql.sources.parallelPartitionDiscovery.threshold 32 配置闸值以启用作业输入路径的并行列表。如果输入路径大于此闸值,Spark将使用Spark分布式作业列出文件。否者,它将退到顺序列出。
spark.sql.sources.parallelPartitionDiscovery.parallelism 10000 配置作业输入路径的最大列表并行度,如果输入路径的数量大于此闸值,那么它会被调低使用闸值。

代码优化

优化的方向:

  1. 代码级设计优化
  2. 静息数据
  3. 连接操作
  4. 聚合操作
  5. 处理中的数据
  6. 应用程序属性
  7. 执行器节点的JVM
  8. 工作节点
  9. 集群部署属性
    提高性能最好的方法就是实现良好的监测功能和作业历史跟踪功能。

对RDD的优化

  • 原则1:避免创建重复的RDD
    一个RDD执行某个算子操作后,又接入下一个算子操作,这样形成的“RDD血缘关系链”。当开发冗余时,会出现重复的“RDD血缘关系
    val rdd1=sc.textFile(path).map(_=>_*2)
    val radd2=sc.textFile(path).map(_=>_*2)
    
    上面的rdd1,rdd2就是相同的RDD血缘重复了。
  • 尽可能的复用同一个RDD
    这个原则本质上就是为了减少RDD的个数.
  • 对多次使用的RDD进行持久化
    如果,在程序中对某个RDD进行多次操作时,我们将该RDD进行持久化。RDD持久化会将数据保存在内存或者磁盘中。
    1. 使用cache
      使用非序列化的方式将RDD中的数据全部尝试持久到内存中。
    val rdd=sc.textFile(path).cache()
    rdd.map(...)
    rdd.reduce(...)
    
    1. 使用persist
      使用指定的方式进行持久化,比如说StorageLevel.MEMORY_AND_DISK_SER表示,内存充足时优先持久化到内存中,内存不充足时持久化到磁盘文件中。其中_SER后缀表示,使用序列化的方式来保存RDD数据。序列化的好处是可以减少持久化数据的占用量。
    val rdd=sc.textFile(path).persist(StorageLevel.MEMORY_AND_DISK_SER)
    rdd.map(...)
    rdd.reduce(...)
    
    ![IMAGE](quiver-image-url/5129BB17E53381B448352C0FAD09EDA1.jpg =1299x545)
    注意:通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧下降,有时还不如重新计算。后缀为2的级别,必须将所有数据都要复制一份,并发送到其他节点上,数据复制和网络传输会导致大量的性能开销。除非要求作业的高可用性,否者不建议使用.
    补充:对Spark SQL的缓存数据
    如果,在Spark中进程对一个表进行操作,可以使用spark.catalog.cacheTable(表名)或者使用dataFrame.cache()。如果。后面不需要该表了则使用spark.catalog.uncacheTable(表名)将表从内存中删除即可.

对shuffle

spark中shuffle类算子:

  1. 去重算子
    def distinct()
    def distinct(numPartitions: Int)
    
  2. 聚合算子
    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
    def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
    def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
    def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
    def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
    def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
    
  3. 排序算子
    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
    def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
    
  4. 重分区算子
    def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
    
  5. 集合或者表算子
    def intersection(other: RDD[T]): RDD[T]
    def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    def intersection(other: RDD[T], numPartitions: Int): RDD[T]
    def subtract(other: RDD[T], numPartitions: Int): RDD[T]
    def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
    def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
    def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
    def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
    def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
    def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
    

shuffle操作非常消耗时间,所以应该减少shuffle操作。

尽量避免使用shuffle算子的使用

在一些情况下可以使用非shuffle操作去代替shuffle操作。比如,rdd1和rdd2进行join操作时。并且rdd2的数据量比较少时(几百M--一两个G)就有下面两种实现方法:

方法1:使用shuffle操作:join
val rdd3=rdd1.join(rdd2)
方法2:使用非shuffle操作:Broadcast, map
//转化成广播变量
val rdd2data=rdd2.collect()
val rdd2DataBroadcast=sc.broadcast(rdddata)
//使用map遍历rdd1的每条数据,实现join的相同逻辑
val rdd3=rdd1.map(rdd2DataBroadcast...)

方法2的优势在于,将较小的数据变成广播变量这样就不会避免了shuffle操作。

使用shuffle操作,减少分区中的数据

在进行某些聚合操作的时候,其实可以先对分区内的数据进行预聚合,然后再调用shuffle操作,对整体进行聚合操作。下面两张图是未使用预聚合和使用了预聚合:
![IMAGE](quiver-image-url/D68DD84E3A1A2655894D151F82954402.jpg =932x449)
![IMAGE](quiver-image-url/D263CB94F27828F219F06B171EE94A38.jpg =932x449)
本质上,减少了IO的开销和网络传输的开销。
下面是常用的map-side预聚合算子:

reduceByKey函数
combineByKey函数
aggregateByKey函数
效率:reduceByKey > combineByKey > aggregateByKey

使用高性能的算子

  • 使用mapPartitions代替map
    mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!
    例子:将原来的每个数字变成原来的2倍
    val rdd = sc.makeRDD(1 to 12, 2)
    //使用map函数
    def mapto2(x:Int)={
      (x, x*2)
    }
    val mapRDD = rdd.map(mapto2)
    println(mapRDD.collect().mkString(","))
    //使用mapPartions
    def mapPartitionto2(iter:Iterator[Int]):Iterator[(Int, Int)]={
      var ans = List[(Int, Int)]()
      while(iter.hasNext){
        val e = iter.next()
        ans=ans:+(e, e*2)
      }
      ans.iterator
    }
    val mapPartitionRDD = rdd.mapPartitions(mapPartitionto2)
    println(mapPartitionRDD.collect().mkString(","))
  • 使用foreachPartition代替foreach
    foreachPartition和mapPartitions一样,每次处理都是针对一个partition。在foreach中一条数据就调用一次其中的函数。这样性能非常低。
    //使用foreach
    def foreachMyshow(x:Int)={
      print(x+" ,")
    }
    rdd.foreach(foreachMyshow)
    println("")
    //使用foreachPartition
    def foreachPartitionMyShow(iter:Iterator[Int]):Unit={
      while(iter.hasNext){
        print(iter.next()+"_,")
      }
    }
    rdd.foreachPartition(foreachPartitionMyShow)
  • 使用filter之后进行coalesce操作
    在使用filter之后,某些task的数据可能会非常少,所以没有必要让他成为一个task,则直接使用coalesce
  • 使用repartitionAndSortWithinPartitions代替repartition与sort类操作
    repartitionAndSortWithinPartitions可以一边排序一边分区shuffle。这样比先shuffle再排序要好的多。

广播大变量

在算子函数中使用到外部变量时,在一般情况下,Spark会将该变量复制多个副本,通过网络传输到task中,但是这样会导致网络传输开销大,而且各个节点中占用过多的内存.
所以非常建议在使用外部变量时,建议使用Spark的广播功能。

    val rdd = sc.makeRDD(1 to 8, 2)
    val num=3     //外部变量
    //在map中使用外部变量num
    println(rdd.map(x => x * num).collect().mkString(","))
    //将外部变量转化成广播变量
    val numBroadcast = sc.broadcast(num)
    println(rdd.map(x => x * numBroadcast.value).collect().mkString(","))

使用Kryo优化序列化性能

调优概述

这个部分还是给这篇博客吧
https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

缓慢任务和落后者的解决方法

在大数据中”数据倾斜“是非常棘手的问题,因为某些task会拖累整个性能。

下面是这些问题的具体表现:

  • spark阶段中只剩下少数任务未完成,这些任务运行时间长。
  • 在spark UI中可以观察到这些缓慢的任务始终在相同的数据集上发生
  • 各个阶段都有这些缓慢的任务
  • 扩大Spark集群规模没有太大的效果,有些任务依然比其他任务耗时更长。
  • 在Spark指标中,某些执行器进程读取和写入的数据比其他执行器进程大得多。
    ![IMAGE](quiver-image-url/9E3E8E3DF92833C970270CBDB95E8ADC.jpg =728x489)
    发生数据倾斜的原因:
  • 数据本身的特性,比如高斯分布中,中间数据特别多。
  • 自定义的分区策略不合理

如何定位数据倾斜

  • 使用Spark UI观察程序是否在几个task执行时间过长,而其他很短。还有一种情况,某task执行的时间可能不长,但是,它爆内存了。这也说明了数据倾斜。
  • 数据倾斜只会发生在shuffle过程中。这里给大家罗列一些常用的并且可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。
    注意上面的内存爆出,其实不能直接判定是数据倾斜,也有可能是其他原因导致内存溢出。但是,如果根据Spark UI中对各个task的执行时间和分配数据量是可以判断出是否属于数据倾斜导致的内存爆出。

查看导致数据倾斜的原因:

这个过程本质就是统计那个Shuffle操作导致了数据倾斜。就要看代码中的Shuffle是根据什么标准进行分区的。比如某shuffle操作通过key值进行分区。那么你就要去统计key的分布。比如下面的wordcont例子中,去统计key值得大概分布:

    val lines = sc.textFile(path)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    //对原始数据进行抽样
    val samplePairs = pairs.sample(false, 0.2)    //我们对原始数据进行无放回的抽取20%
    val samplePairCounts = samplePairs.countByKey()
    samplePairCounts.collect().foreach(println)

方案1:将数据倾斜转移到Hive中

具体:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

方案2:过滤少量导致数据倾斜的数据

如果99%都是某类数据,但是在实际业务中,我们不需要分析这类数据,所以直接筛选掉即可。
比如,研究犯罪集团,得到的数据99%都是好人。那么就应该删除好人,而专门去研究坏人。

方案3:提高shuffle操作的并行度

如果,我们已经不能避免数据倾斜了。比如,我们的数据是一个高斯分布,这种数据就是一个有数据倾斜的数据,但是我们需要掌握的数据就是与其有关,那么久不得不让这个数据倾斜必须存在。
在上文中使用分组和聚合的算子时,默认的分区是200,所以直接添加分区,增加shuffle的并行度有可能优化性能。

//shuffle参数更改
    spark.conf.set("spark.sql.shuffle.partitions", 700)

注意这样只能说会优化性能,比如key的数量远远高于分区数量,那么确实会带来优化。但是,假如某数据的数量绝大多数集中在某一个key上,那么无论你增大多大的分区,该数据倾斜的key始终在一个task中。

方案4:两阶段聚合(局部聚合+全局聚合)

如上问使用map-side预聚合算子。先在分区内进行聚合,然后再全局聚合。假如,某key值是导致数据倾斜的原因,那么使用该方法会完美解决这个数据倾斜。因为,该key值在进行分区内聚合后数据量会非常少。
缺点:只适合在聚合操作的shffle操作中,对join操作的shffle就不起作用。

方案5:对于join的shuffle操作对数据倾斜的优化

数据倾斜的处理是因为shuffle操作导致的,如果join操作中直接避免了shuffle操作,则就解决了这个问题。所以,将join算子转化成Broadcast变量和map操作。这种方案只适合一个大的rdd和一个小的rdd。
下面都是挖坑!

方案6:采样倾斜key并拆分join操作

方案7:使用随机前缀和扩容RDD进行join

方案8:多种方案组合使用

关于shuffle的调优概述

原文地址:https://www.cnblogs.com/ALINGMAOMAO/p/14487372.html