Spark 创建RDD、DataFrame各种情况的默认分区数

1、前置知识:

(1)sc.defaultMinPartitions

  sc.defaultMinPartitions=min(sc.defaultParallelism,2)

  也就是sc.defaultMinPartitions只有两个值1和2,当sc.defaultParallelism>1时值为2,当sc.defaultParallelism=1时,值为1

  上面的公式是在源码里定义的(均在类SparkContext里):

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

(2)sc.defaultParallelism

a、首先可通过spark.default.parallelism设置sc.defaultParallelism的值

  •  在文件中配置

在文件spark-defaults.conf添加一行

spark.default.parallelism=20

验证:

在spark-shell里输入sc.defaultParallelism,输出结果为20。

  • 在代码中配置
val spark = SparkSession.builder()
  .appName("TestPartitionNums")
  .master("local")
  .config("spark.default.parallelism", 20)
  .getOrCreate()

val sc = spark.sparkContext
println(sc.defaultParallelism)

spark.stop

b、sc.defaultParallelism 没有配置时候,会有默认值

  • spark-shell:spark-shell里的值等于cpu的核数,比如我的windows的cpu的核数为再比如测试机的核数为8。
  • 指定master为local:在spark-shell里通过–master local和在代码里通过.master(“local”)的结果是一样的,这里以spark-shell为例当master为local时,值为1,当master为local[n]时,值为n
  • master为local[*]和不指定master一样,都为cpu核数
  • master为yarn模式时为分配的所有的Executor的cpu核数的总和或者2,两者取最大值,将2.1.2的代码的master注释掉并打包,然后用下面的脚本执行测试。

test.sh

spark-submit --num-executors $1 --executor-cores 1 --executor-memory 640M --master yarn   --class  com.dkl.leanring.spark.TestPartitionNums   spark-scala_2.11-1.0.jar

2、HDFS文件创建RDD时的默认分区数:

  这里及后面讨论的是rdd和dataframe的分区,也就是读取hdfs文件并不会改变前面讲的sc.defaultParallelism和sc.defaultMinPartitions的值。

(1)sc.textFile():  rdd的分区数 = max(hdfs文件的block数目, sc.defaultMinPartitions)

  • 测试大文件(block的数量大于2):上传了一个1.52G的txt到hdfs上用来测试,其中每个block的大小为默认的128M,那么该文件有13个分区* 1.52*1024/128=12.16。

用下面代码可以测试读取hdfs文件的分区数

val rdd = sc.textFile("hdfs://ambari.master.com/data/egaosu/txt/20180416.txt")
rdd.rdd.getNumPartitions

  这种方式无论是sc.defaultParallelism大于block数还是sc.defaultParallelism小于block数,rdd的默认分区数都为block数。

* 注:之所以说是默认分区,因为textFile可以指定分区数,sc.textFile(path, minPartitions),通过第二个参数可以指定分区数

sc.defaultMinPartitions大于block数

sc.defaultMinPartitions小于block数

当用参数指定分区数时,有两种情况,当参数大于block数时,则rdd的分区数为指定的参数值,否则分区数为block数。

  • 测试小文件(block数量等于1):默认分区数为sc.defaultMinPartitions,下面是对应的hdfs文件:

将上面的hdfs路径改为:hdfs://ambari.master.com/tmp/dkl/data.txt,结果:

当用参数指定分区数时,rdd的分区数大于等于参数值,本次测试为等于参数值或参数值+1。

(2)读取hive表创建的DataFrame的分区数:

  hdfs文件的block的数目为10(2*5)。

//切换数据库
spark.sql("use route_analysis")
//读取该数据库下的egaosu表为df
val df = spark.table("egaosu")
//打印df对应的rdd的分区数
df.rdd.getNumPartitions

  测试发现,当sc.defaultParallelism大于block时,df的分区是等于sc.defaultParallelism,当小于block时,df的分区数介于sc.defaultParallelism和block之间,至于详细的分配策略,我还没有查到~

  用spark.sql(“select * from egaosu”)这种方式得到df和上面的分区数是一样的

3、从代码里的内部数据集创建RDD时的默认分区数:

(1)sc.parallelize()创建RDD:默认分区数等于sc.defaultParallelism,指定参数时分区数值等于参数值。

(2)spark.createDataFrame(data)创建DataFrame:当data的长度小于sc.defaultParallelism,分区数等于data长度,否则分区数等于sc.defaultParallelism。

参考博客:https://blog.csdn.net/dkl12/article/details/81663018

原文地址:https://www.cnblogs.com/guoyu1/p/12300404.html