Spark Source Code Walkthrough 1: Operators

Part 1: Transformation operators. These are lazy and return a new RDD.

Map operator

  /**
   * (Note: operates on each element of the RDD and returns a new RDD.)
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }
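To make the "lazy" part concrete, here is a minimal sketch that uses a plain Scala Iterator in place of a partition's data (an assumption for illustration; no Spark classes are involved). The closure has the same shape as the one handed to MapPartitionsRDD, and the hypothetical helper lazyMapDemo counts how many times the mapping function has run before and after the iterator is consumed.

```scala
object LazyMapSketch {
  // Returns (calls before consuming, calls after consuming, results).
  def lazyMapDemo(data: Seq[Int]): (Int, Int, List[Int]) = {
    var calls = 0
    val f = (x: Int) => { calls += 1; x * 2 }
    // Same shape as the closure given to MapPartitionsRDD: Iterator => Iterator.
    val compute = (iter: Iterator[Int]) => iter.map(f)
    val mapped = compute(data.iterator) // only wraps the iterator, f has not run
    val before = calls
    val results = mapped.toList         // consuming the iterator runs f
    (before, calls, results)
  }
}
```

In this sketch the function runs only when the result is pulled through the iterator, which is the role an action plays for a real RDD.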

FlatMap operator

  /**
   * (Note: map first, then flatten.)
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }
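The "map first, then flatten" reading can be checked on ordinary collections. The sketch below is plain Scala with hypothetical names (split, viaFlatMap, viaMapThenFlatten); it only illustrates the equivalence and is not how Spark executes the operator.

```scala
object FlatMapSketch {
  // Example function returning several outputs per input element.
  val split: String => List[String] = line => line.split(" ").toList

  // One pass, as flatMap does it.
  def viaFlatMap(lines: List[String]): List[String] =
    lines.flatMap(split)

  // The same result as two explicit steps: map, then flatten.
  def viaMapThenFlatten(lines: List[String]): List[String] =
    lines.map(split).flatten
}
```

Both functions return the same list for the same input; flatMap simply avoids building the intermediate list of lists.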

Filter operator

  /**
   * (Note: keeps the elements for which the predicate returns true.)
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }
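Unlike map and flatMap, filter passes preservesPartitioning = true. A rough sketch of why that is safe, assuming records laid out by a hash of their key (all names here are hypothetical and plain Scala collections stand in for partitions): filtering never changes a key, so each surviving record is still in the partition its key maps to, whereas an arbitrary map may rewrite keys.

```scala
object PreservesPartitioningSketch {
  type Layout = Map[Int, Seq[(Int, String)]]
  val numPartitions = 4

  // Stand-in for a hash partitioner: non-negative hashCode modulo the partition count.
  def partitionOf(key: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  // Place every record in the partition its key hashes to.
  def partitionByKey(records: Seq[(Int, String)]): Layout =
    records.groupBy { case (k, _) => partitionOf(k) }

  // True if every record still sits in the partition its key maps to.
  def isConsistent(layout: Layout): Boolean =
    layout.forall { case (pid, recs) => recs.forall { case (k, _) => partitionOf(k) == pid } }

  // Filtering inside each partition drops records but never changes a key.
  def filterKeys(layout: Layout)(keep: Int => Boolean): Layout =
    layout.map { case (pid, recs) => pid -> recs.filter { case (k, _) => keep(k) } }

  // Mapping inside each partition may rewrite keys without moving the records.
  def mapKeys(layout: Layout)(f: Int => Int): Layout =
    layout.map { case (pid, recs) => pid -> recs.map { case (k, v) => (f(k), v) } }
}
```

With keys 1, 2, 5 and 6, filterKeys leaves the layout consistent, while mapKeys with `_ + 1` does not, because key 1 becomes 2 but stays in partition 1.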

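Distinct operator (with a parameter)

  /**
   * (Note: takes a partition count. Deduplicates by mapping each element to a
   * (value, null) pair, calling reduceByKey so that one pair survives per key,
   * then taking the first component of each pair, which is the original value.)
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
  }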
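The three steps can be traced with plain Scala collections. In this sketch reduceByKey is a hypothetical local helper built on groupBy; it imitates the semantics of the Spark call but involves no partitions and no shuffle.

```scala
object DistinctSketch {
  // Local stand-in for reduceByKey: merge all values that share a key.
  def reduceByKey[K, V](pairs: Seq[(K, V)])(merge: (V, V) => V): Seq[(K, V)] =
    pairs.groupBy(_._1).toSeq.map { case (k, kvs) => (k, kvs.map(_._2).reduce(merge)) }

  def distinct[T](xs: Seq[T]): Seq[T] = {
    // Step 1: the element becomes the key, null is a placeholder value.
    val paired: Seq[(T, Null)] = xs.map(x => (x, null))
    // Step 2: equal keys collapse into a single pair.
    val reduced = reduceByKey(paired)((x, _) => x)
    // Step 3: drop the placeholder and keep the key.
    reduced.map(_._1)
  }
}
```

The output order is not guaranteed here, just as distinct on an RDD gives no ordering guarantee.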

Distinct operator (no parameter)

  /**
   * (Note: this simply calls the distinct method that takes a parameter,
   * passing the current number of partitions.)
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(): RDD[T] = withScope {
    distinct(partitions.length)
  }

Coalesce operator

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   * (Note: used to reduce the number of partitions; shuffle defaults to false.)
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
   * (Note: reducing the partition count this way is a narrow dependency, so there is no shuffle.)
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   * (Note: if the partition count drops too far, the computation runs on very few
   * nodes and cluster resources sit idle. With shuffle = true, the map side of the
   * shuffle still runs in parallel over all of the original partitions.)
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   * (Note: increasing the partition count requires redistributing the data,
   * which is done by hashing each record to a new partition.)
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
      : RDD[T] = withScope {
    if (shuffle) {
      /** Distributes elements evenly across output partitions, starting from a random partition. */
      val distributePartition = (index: Int, items: Iterator[T]) => {
        var position = (new Random(index)).nextInt(numPartitions)
        items.map { t =>
          // Note that the hash code of the key will just be the key itself. The HashPartitioner
          // will mod it with the number of total partitions.
          position = position + 1
          (position, t)
        }
      } : Iterator[(Int, T)]

      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
        new HashPartitioner(numPartitions)),
        numPartitions).values
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }
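The shuffle branch relies on distributePartition giving each item a consecutive integer key, so that taking the key modulo numPartitions spreads one input partition round-robin over the outputs. A minimal sketch of that idea in plain Scala follows; assignKeys mirrors the listing above, while targetPartition and outputSizes are hypothetical helpers, with targetPartition standing in for HashPartitioner on non-negative Int keys.

```scala
import java.util.Random

object CoalesceShuffleSketch {
  // Mirrors distributePartition: a random start per input partition, then +1 per item.
  def assignKeys[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
    var position = new Random(index).nextInt(numPartitions)
    items.map { t =>
      position = position + 1
      (position, t)
    }
  }

  // Stand-in for HashPartitioner on non-negative Int keys: key modulo partition count.
  def targetPartition(key: Int, numPartitions: Int): Int = key % numPartitions

  // How many items each output partition receives from one input partition.
  def outputSizes(index: Int, itemCount: Int, numPartitions: Int): Seq[Int] = {
    val targets = assignKeys(index, Iterator.range(0, itemCount), numPartitions)
      .map { case (key, _) => targetPartition(key, numPartitions) }
      .toList
    (0 until numPartitions).map(p => targets.count(_ == p))
  }
}
```

Because the keys from one input partition are consecutive, the output sizes it contributes differ by at most one, whatever the random starting position is.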

Repartition operator

  /**
   * (Note: calls coalesce internally with shuffle = true; numPartitions may be
   * larger or smaller than the current partition count.)
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    coalesce(numPartitions, shuffle = true)
  }

Sample operator

  /**
   * Return a sampled subset of this RDD.
   * (Note: the sampling operator.)
   * @param withReplacement can elements be sampled multiple times (replaced when sampled out),
   *  i.e. whether to sample with replacement
   * @param fraction expected size of the sample as a fraction of this RDD's size (the sampling ratio)
   *  without replacement: probability that each element is chosen; fraction must be [0, 1]
   *  with replacement: expected number of times each element is chosen; fraction must be >= 0
   * @param seed seed for the random number generator
   */
  def sample(
      withReplacement: Boolean,
      fraction: Double,
      seed: Long = Utils.random.nextLong): RDD[T] = withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
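The two branches differ in what fraction means. The sketch below illustrates both meanings on a plain Scala Seq; it is not Spark's sampler code. The names bernoulliSample, poissonSample and poissonDraw are hypothetical, and the Poisson draw uses Knuth's multiplication method as a simple stand-in that is only practical for small means.

```scala
import scala.util.Random

object SamplingSketch {
  // Without replacement: keep each element independently with probability `fraction`.
  def bernoulliSample[T](items: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    items.filter(_ => rng.nextDouble() < fraction)
  }

  // Draw from Poisson(mean) with Knuth's multiplication method.
  private def poissonDraw(mean: Double, rng: Random): Int = {
    val limit = math.exp(-mean)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) { k += 1; p *= rng.nextDouble() }
    k
  }

  // With replacement: emit each element a Poisson(fraction)-distributed number of times.
  def poissonSample[T](items: Seq[T], fraction: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    items.flatMap(x => Seq.fill(poissonDraw(fraction, rng))(x))
  }
}
```

With bernoulliSample an element appears at most once, so fraction is a probability; with poissonSample an element can appear several times, so fraction is an expected count and may exceed 1.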

RandomSplit operator

  /**
   * Randomly splits this RDD with the provided weights.
   * (Note: splits the RDD randomly in proportion to the weights. If the weights
   * do not sum to 1, they are normalized to sum to 1 before splitting.)
   * @param weights weights for splits, will be normalized if they don't sum to 1
   * @param seed random seed
   *
   * @return split RDDs in an array
   */
  def randomSplit(
      weights: Array[Double],
      seed: Long = Utils.random.nextLong): Array[RDD[T]] = withScope {
    val sum = weights.sum
    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
    normalizedCumWeights.sliding(2).map { x =>
      randomSampleWithRange(x(0), x(1), seed)
    }.toArray
  }
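The normalization and the sliding window are easier to follow with numbers. This sketch copies the first three lines of the method body into a hypothetical helper named ranges, which returns the (lower, upper) bounds that would be passed to randomSampleWithRange.

```scala
object RandomSplitRangesSketch {
  // Turn raw weights into consecutive [lower, upper) ranges covering [0, 1].
  def ranges(weights: Array[Double]): Array[(Double, Double)] = {
    val sum = weights.sum
    // Normalize, then accumulate: Array(3.0, 1.0) gives 0.0, 0.75, 1.0.
    val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
    // Each adjacent pair of cumulative weights is one split's range.
    normalizedCumWeights.sliding(2).map(x => (x(0), x(1))).toArray
  }
}
```

For weights Array(3.0, 1.0) the ranges are (0.0, 0.75) and (0.75, 1.0), so the first split receives about three quarters of the data.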

RandomSampleWithRange operator

  /**
   * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability
   * range. (Note: random sampling restricted to a range.)
   * @param lb lower bound to use for the Bernoulli sampler
   * @param ub upper bound to use for the Bernoulli sampler
   * @param seed the seed for the Random number generator
   * @return A random sub-sample of the RDD without replacement.
   */
  private[spark] def randomSampleWithRange(lb: Double, ub: Double, seed: Long): RDD[T] = {
    this.mapPartitionsWithIndex( { (index, partition) =>
      val sampler = new BernoulliCellSampler[T](lb, ub)
      sampler.setSeed(seed + index)
      sampler.sample(partition)
    }, preservesPartitioning = true)
  }
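randomSplit passes the same seed to every range, and each partition uses seed + index, so every split replays the same sequence of random draws over a partition. The sketch below shows why that makes the splits disjoint and complete. It assumes a sampler that keeps an element when its draw falls in [lb, ub); sampleRange is a hypothetical stand-in, and scala.util.Random replaces whatever generator Spark uses internally.

```scala
import scala.util.Random

object CellSamplerSketch {
  // Keep the elements whose random draw falls in [lb, ub). The draw sequence depends
  // only on the seed, so every range sees the same draw for the same element.
  def sampleRange[T](partition: Seq[T], lb: Double, ub: Double, seed: Long): Seq[T] = {
    val rng = new Random(seed)
    partition.filter { _ =>
      val x = rng.nextDouble()
      x >= lb && x < ub
    }
  }
}
```

Sampling the same data with ranges (0.0, 0.75) and (0.75, 1.0) and one shared seed puts every element in exactly one of the two results. The elements must be visited in the same order each time for this to hold.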


Original post: https://www.cnblogs.com/loveling-0239/p/5882607.html