TextFile分区问题

val rdd = sc.parallelize(List(1,2,3,4,5,6),第二参数)
这里的第二参数 获取方式有两种:
1.直接给定值,根据传入的值决定分区的数量
2.根据运行环境获取分区数量(core) -->例如 本地运行 设置为local 此时设置分区值默认分区就是1个

val rdd = sc.textFile(path: String, minPartitions: Int = defaultMinPartitions)
读取文件中内容算子中有两个参数 第一个参数是获取数据路径
这个理第二个参数,第二参数决定了分区的数量有两种情况
1.在不传递值的情况,使用是默认defaultMinPartitions --> 这个值时多少?
2.在传递分区数量是时候,这个分区值是多少

第一条主线 --> defaultMinParititons值时多少?
1.先从textFile这个算子入手,进入后台源码
def textFile(
path: String,
minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
}
2.在不传入分区数值的情况下,默认textFile中使用了一个值defaultMinPartitions,这个值就决定了分区数量,查看这个值
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
发现defaultMinPartitions,并不是一个值而是一个方法,在这个方法中实现是一个math比较最小值
这个比较中有一共值时固定是 2 这个值,和2比较时有一个全新的参数defaultParallelism,需要查看这个参数

3.继续拆安defaultParallelism这个值的时候发现他也是一个方法
def defaultParallelism: Int = {
assertNotStopped()
taskScheduler.defaultParallelism
}
方法最后一句是整个方法的返回一直,也就是说这个方法获取值,是最后一句产生,这个产生值还触发了一个TaskScheduler(任务调度),此时defaultParallelism
当查看这个方法的时候发现这个方法并没有实现体,这个方法是存在在特质中 def defaultParallelism(): Int
ctrl+alt+ 左右 回到之前调用或下一次调用(必须知道实现者是谁)
在触发抽方法的位置 --> ctrl+atl+鼠标左键-->就可以查看实现这个方法或触发这个方法的类
发现这个抽象方法实现类 --> TaskSchedulerImpl在这个类中有方法的实现

4.TaskSchedulerImpl在这个类中有方法的实现
override def defaultParallelism(): Int = backend.defaultParallelism()
发现原来抽象方法已经被重写了,并且有一个实现,此时只需要触发defaultParallelism就可以触发出这个值多少了
但是,点击查看后发现也是一个抽象方法defaultParallelism() --> 对这个实现在此查询实现者即可

5.查看defaultParallelism()
此时发现实现方式有两种
1.CoarseGrainedSchedulerBackend spark集群模式
2.LocalSchedulerBackend 本地模式
我们查看是集群模式,结果发现了
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))

}
这个方法中比较的是最大值,其中第一个采纳数是totalCoreCount即集群核心数 第二个参数固是2

结论: 在调用textFile算子的时候,初始默认分区数量是2,除非小于2,否则默认分区数量就是2个

第二条主线 --> 查看分区计算流程
问题:先阶段已经知道分区数量默认是2个分区,具体分区中计算方式时候什么样式?(分片逻辑)

1.还是在textFile这个算子实现中
已经知道分区数量之后,查看内部对分区数量的使用,需要查看方法的实现
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
分区参数传入到一个叫做 hadoopFile中,所以此时就需要查看hadoopFile是谁

2.查看hadoopFile
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()

// This is a hack to enforce loading hdfs-site.xml.
// See SPARK-11227 for details.
FileSystem.getLocal(hadoopConfiguration)

// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)


//核心出现在这个位置,这里创建了一个HadoopRDD对象
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}

3.查看HadoopRDD中存在哪些操作?
在这里类中
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
val inputFormat = getInputFormat(jobConf)
val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
val array = new Array[Partition](inputSplits.size)
for (i <- 0 until inputSplits.size) {
array(i) = new HadoopPartition(id, i, inputSplits(i))
}
array
}
getPartitions是切片方法的触发
val inputSplits = inputFormat.getSplits(jobConf, minPartitions) 这个方法是具体的切分
val array = new Array[Partition](inputSplits.size) 就是获取分片个数
需要查看getSplits
4.查看 getSplits方法
这个方法是接口中抽象方法,此时需要使用 ctrl+atl+鼠标左边 查看这个方法的实现
一般处理数据方式都是 FileInputFormat类中查看 getSplits方法
这个方法和MR中切片放啊其实逻辑是一样的,核心位置
long totalSize = 0; // compute total size
for (FileStatus file: files) { // check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// totalSize 获取文件的大小

Spark中和MR中切片最大的不同位置出现了,Spark会计算切片大小
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
启动numSplits就是之前minPartitions即 默认分区值

最终切片的位置依旧保留着MR中思想即 1.1冗余
long splitSize = computeSplitSize(goalSize, minSize, blockSize); //这里会计算真正切片的大小

long bytesRemaining = length; //文件大小
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) //切片逻辑
//切完一片之后 会减去切片大小
bytesRemaining -= splitSize;

总结:分区数量其实是可以影响最最终文件的个数,但是在最终输出界过之前,会执行分片处理,这个分片才是最终输出分区的个数,我们若需要影响最终输出值,此时可以在最终输出算子之前调用 repartition 来修改分区

 

原文地址:https://www.cnblogs.com/yumengfei/p/12030648.html