RDD关键性能考量之 并行度

《Spark快速大数据分析》 

8.4 关键性能考量

并行度

RDD的逻辑表示其实是一个对象的集合。在物理执行期间,RDD会被分为一系列的分区,

每个分区都是整个数据的子集。当Spark调度并运行任务时,Spark会为每个分区中的数据

创建出一个任务,该任务在默认情况下会需要集群中的一个计算节点来执行。

Spark也会针对RDD直接自动推断出合适的并行度,这对于大多数用例来说已经足够了。

输入RDD一般会根据其底层的存储系统选择并行度。例如,从HDFS上读数据的输入RDD

会为数据在HDFS上的每个文件区块创建一个分区。从数据混洗后的RDD派生下来的RDD

则会采用与其父RDD相同的并行度。

并行度会从两个方面影响程序的性能。

首先,当并行度过低时,Spark集群会出现资源闲置的情况

比如,假设你的应用有1000个可使用的计算节点,但所运行的步骤只有30个任务,你就应该提高并行度

来充分利用更多的计算节点。

并行度过高时,每个分区产生的间接开销累计起来会更大。评判并行度是否过高的标准包括

任务是否是几乎在瞬间(毫秒级)完成的,或者是否观察到任务没有读写任务数据

Spark提供了两种方法来对操作的并行度进行调优。

第一种方法是在数据混洗操作时,使用参数的方式为混洗后的RDD指定并行度

第二种方法是对于任何已有的RDD,可以进行重新分区来获取更多或者更少的分区数。

重新分区操作通过repartition()实现,该操作会把RDD随机打乱并分成设定的分区数目。

如果你确定要减少分区数,可以使用coalesce()操作。由于没有打乱数据,该操作比repartition()更为高效。

如果你认为当前的并行度过高或者过低,可以利用这些方法对分区重新调整。

举个栗子,假设我们从S3上读取了大量数据,然后马上进行fileter()操作筛选调数据集合中的

大部分数据。默认情况下,filter()返回的RDD的分区数和其父RDD一样,这样会产生很多的空分区

或者只有少量数据的分区。这时,可以通过 合并得到分区数更少的RDD来提高应用的性能。

 
def testCoalesce = {

    val conf = new SparkConf().setMaster("local").setAppName("testCoalesce")

    val sc = new SparkContext(conf)

    val input = sc.parallelize(1 to 9999, 1000)

    logger.warn(s"RDD[input] partitionCount[${input.partitions.length}]")

 

    val test = input.filter { x => x % 2015 == 0  }

    logger.warn(s"RDD[test]  partitionCount[${test.partitions.length}]")

 

    val test2 = test.coalesce(2, true).cache()

    logger.warn(s"RDD[test2] partitionCount[${test2.partitions.length}]")

 

    val result = test2.collect()

 

    logger.warn(s"result [${result.mkString(",")}]")

 

    Thread.sleep(Int.MaxValue)

  }

  

执行结果

00:47:21 831 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:19): RDD[input] partitionCount[1000]
00:47:22 009 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:22): RDD[test]  partitionCount[1000]
00:47:22 122 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:25): RDD[test2] partitionCount[2]
 
[Stage 0:===>                                                   (58 + 1) / 1000]
[Stage 0:=====>                                                 (95 + 1) / 1000]
[Stage 0:=======>                                              (131 + 1) / 1000]
[Stage 0:============>                                        (238 + 24) / 1000]
[Stage 0:============>                                        (243 + 19) / 1000]
[Stage 0:================>                                     (314 + 1) / 1000]
[Stage 0:=================>                                    (330 + 1) / 1000]
[Stage 0:=====================>                                (390 + 1) / 1000]
[Stage 0:=======================>                              (443 + 1) / 1000]
[Stage 0:===========================>                          (500 + 2) / 1000]
[Stage 0:==============================>                       (557 + 1) / 1000]
[Stage 0:=================================>                    (618 + 1) / 1000]
[Stage 0:===================================>                  (662 + 1) / 1000]
[Stage 0:=======================================>              (724 + 1) / 1000]
[Stage 0:==========================================>           (791 + 1) / 1000]
[Stage 0:==============================================>       (855 + 1) / 1000]
[Stage 0:================================================>     (895 + 1) / 1000]
[Stage 0:===================================================>  (953 + 1) / 1000]
 
00:47:30 466 [main] WARN test.scala.spark.TestSpark2$.testCoalesce(TestSpark2.scala:29): result [2015,4030,6045,8060]
打开http://localhost:4040/jobs/,可以看到任务执行统计。
 
原文地址:https://www.cnblogs.com/ihongyan/p/4976414.html