Spark性能调优

1、对于读入的数据,做好清洗、转换、分区工作

rdd1 = sc.textFile("hdfs://text.txt", 15).map(_.split("|"))

.filter{//尽量严格过滤}

.map(id, money)//提取必要字段,减少数据量

.coalesce(10, True)//分片 有可能有一些partition过滤完了后只有10条,另一个剩900条 造成数据倾斜,会发生很严重问题

可以通过从源头用命令行的形式查看,数据的大小,决定申请资源的大小,也可以从监控的stage里面的Input查看每步的rdd大小,比如300个G 那你分成300个Partition那一个partition就是1个G,然后你想想你下面有几个executor,如果1个executor运行2个partition的话,相当于一个executor要运行1G再运行1G,那你的内存有多少

 

 2、RDD高效使用

相同数据不要多次IO读取

提高对同一RDD使用次数,对于多次使用的RDD考虑是否持久化(考虑要不要存在内存和磁盘中,有一个 spill size,如果有东西了,说明内存溢出了 )

重新计算的时间和缓存到磁盘重新读取那个快,如果本身重新计算不是那种机器学习复杂的模型,不需要去考虑持久化的问题

presist()

3、PairRDD状态维护

对于多次需要join的RDD提前repartition

使用PairRDD特有API,不要破坏分区信息。如果是PairRDD,并且只需要计算value相关信息不改变key,就用mapValue代替map 这样才能保留分区的信息; reduceBykey替代reduce;flatMapValues替代flatMap

val rawClassRDD = sc.makeRDD(Array( "spark", "hadoop", "hive","yarn","hbase"), 4).zipWithIndex()

val classRDD = rawClassRDD.mapValue(1=>1._1, 1._2+1))

4、join

大RDD和小RDD做join可以考虑用广播变量

对于需要多次Join的RDD提前repartition 

val rdd1 = rdd.repartition(100)

rdd1.join(rdd2)

rdd1.join(rdd3)

5、使用预聚合功能

使用reduceByKey替代groupByKey

 rawClassRDD.reduceByKey(_+_)

6、竞争资源批量处理

使用mapPartition替代map

使用foreachPartition替代foreach  

7、数据倾斜

学会如何处理数据倾斜,有时候因为一个数据倾斜问题,一整天都浪费在调试一个spark代码中了,其次,学习如何尽量减少spark任务的空间占用,同时加速spark任务运行速度

存在一些热点数据,导致某些节点数据量特别大,有些节点处理的数据特别小。热点数据过大,内存不够会发生OOM现象,程序不断的恢复又不停的OOM,最后崩溃退出。

数据倾斜往往伴随shuffle过程,相关API:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等

查看倾斜的key

rdd.sample(false, 0.1).countByKey().foreach(println(_))

解决方法:

1.让数仓人员从源头解决倾斜问题,掉更多内存解决它

2.根据业务决定是否可以直接过滤(大部分机器学习在特征工程中需要去噪点数据、裁剪边(关系网络,根据边的权重交互的次数进行过滤))

3.增加partition, 提高并行度,这个方法比较简单,就调下参数

4.利用广播变量调优

5.拆解热点key(找出热点key,给key加上前缀或后缀,最后再合并)

8、参数调优

资源相关申请

spark.driver.cores

spark.driver.memory

spark.executor.cores  与你设置的partition相关,比如申请了1000个core,那你的partition个数就不要是1000,也不要是100了,你的partition个数肯定是你core个数的整数倍,你可以设3000个partition;设置成1000个就是1000个任务并行

spark.executor.memory 与你设置的partition相关,1000G 的rrd,你切成1000份的partition,一个partition就是1G,那你的memory最少是2G以上;这个参数也会影响你做persist

压缩相关

spark.shuffle.compress

spark.shuffle.spill.compress rdd在内存中计算不够的话,会溢出多少磁盘,等里面数据处理完了后再读入

spark.broadcast.compress 

序列化相关

spark.serializer

shuffle相关

spark.shuffle.manager = sort

spark.shuffle.consolidateFiles

9、SparkSql调优

spark.sql.codegen 当设置为true时,spark sql会把每条查询的语句在运行是编写为java二进制代码,当查询的数据量大时,可以这么设置,小的时候设置为false

spark.sql.inMemoryColumnStorage.compressed自动对内存压缩

压缩数据,可以将存储在内存中的数据压缩,也是数据量比较大(看Storage中通过persist功能存储在内存的数量和总的内存是多少)

 如果任务失败会有日志

原文地址:https://www.cnblogs.com/fionacai/p/12923564.html