探究Spark算子-RDD

血统概念:

  分类:分为宽依赖和窄依赖用来解决数据容错性以及划分任务的时候起到重要作用

    1、宽依赖:同一个父RDD的分区被多个子RDD的分区依赖,引起shuffle 。比作一个家庭生了2个及以上的孩子,宠爱切分

    2、窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用。比作独生子女的家庭,独享恩宠

Spark切分Stage,stage根据什么决定task的个数?

此前要说一下Application和Job

-Application:初始化一个SparkContext 即生成一个Application

-Job:一个行动(Action)算子就会生成一个Job

  ~Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖就划分一个Stage,Stage = 宽依赖个数 + 1

  ~Task:Stage根据分区数划分成一个一个的Task。一个Stage中,最后一个RDD的分区个数就是Task数

(行动算子&转换算子)简述:

转换算子:

  1. map(fun):返回一个新的RDD,经过fun函数转换后返回新的结构,数量与原来的不变。

  2. mapPartitions(fun):类似map算子,将待处理的数据以分区为单位发送到节点进行批处理,数据可能会增多或减少。

    可以以分区为单位进行数据转换操作;但是会将整个分区的数据加载到内存进行引用

    如果处理完的数据是不会被释放,存在对象的引用。不适合内存小、数据量大的场合,造成内存溢出

  3. reduceByKey(fun, [numTask]):在一个键值对RDD上调用,返回一个(K,V)的RDD,

    将相同的key的值聚合到一起,reduce任务个数通过参数设定([numTask])。

  4. aggregateByKey(one)(two)(three):

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
}

    第一个参数one:初始值,每次分组完成之后的每个组的初始值

    第二个参数two:对每个分区内的数据按照key分别进行two函数的处理,分区内

    第三个参数three:对经过two处理的数据按照key分别进行three函数的处理,分区间

    最后将key与计算结果作为一个新的KV对输出

  5. CombineByKey:

def combineByKey[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

    -对相同的Key 把V合并成一个集合,这里说三个参数

    -第一个参数:createCombiner 将相同的key的第一个数据进行结构化的转换

    -第二个参数:mergeValue 分区内的计算规则

    -第三个参数:mergeCombiners 分区间的计算规则

  6. reduceByKey:相同的key的数据进行value数据的聚合操作

  7. groupByKey:将数据源中相同的key的数据分在一个组中,形成一个对偶元组

    第一个元素就是key,第二个元素就是相同key的value集合

    ***reduceByKey & groupByKey 的区别?

groupByKey 会导致数据打乱重组存在shuffle操作
    reduceByKey shuffle之前在分区内进行一个预处理 ==> 预聚合
        可以有效的减少shuffle时落盘的数据量 提升shuffle的性能
    从shuffle角度:
        reduceByKey 和 groupByKey 都存在shuffle阶段
        但是reduceByKey 可以在shuffle之前对分区内相同的key的数据进行预聚合功能(combine)
        而 groupByKey 只是进行按照key分组,直接shuffle,不存在数据量减少的问题 reduceByKey 性能比较好
    从功能的角度来说:
        reduceByKey 其实包含分组和聚合的功能
        groupByKey 只能分组 不能聚合 所以在分组聚合的场合下 推荐使用 reduceByKey
        如果仅仅需要分组 不要聚合 则使用 groupByKey 

  8. filter:返回一个符合布尔表达式条件的元素的新RDD

    当数据进行筛选之后,分区不变,但是分区内的数据可能不均衡,可能会产生数据倾斜

  9. flatMap:算子应用于该RDD的所有元素,然后将结果展平

val rdd = sc.makeRDD(List(
      List(1, 2), List(3, 4)
    ))
 val flatRdd = rdd.flatMap(
      list => {
        list
      }
    )
    flatRdd.collect().foreach(println)

#  结果: 1 2 3 4

  10. sample:采样子集

def sample(
     withReplacement: Boolean,  可以多次对元素进行采样
     fraction: Double,  预期样本大小是此RDD大小的一部分
     seed: Long = Utils.random.nextLong): RDD[T] = {  随机数生成器
     require(fraction >= 0,
     s"Fraction must be nonnegative, but got ${fraction}")

    第一个参数表示抽取数据之后是否将数据返回 true(返回) flase(丢弃)

     第二个参数表示数据源中每条数据被抽取的概率

     第三个参数表示随机数的种子,如果不传入第三个参数,默认使用当前系统时间

  11. coalesce: 缩减分区

    默认不会讲分区的数据打乱重新组合,这种情况下缩减分区可能会导致数据不均衡 即数据倾斜

# Repartition和Coalesce关系与区别
1)关系:
    两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
2)区别:
    repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
    一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce

行动算子:触发作业Job执行的方法,底层代码中创建 ActiveJob,并提交执行

  1. collect:返回一个包含此RDD中所有元素的数组

    将不同分区的数据按照分区顺序采集到Driver端内存中,形成数组

    因为会返回整个数据集,不推荐使用

  2. reduce:两两聚合操作

  3.count:数据源中数据的个数

  4.first:获取数据源中数据的首位

  5.take(N):获取数据源中的前N个数据

  6.takeOrdered(N):数据排序之后,获取N个数据,默认是升序

  7.saveAsTextFile(path):使用元素的字符串表示将此RDD保存为文本文件

注意:个人总结,转换算子和行动算子没有列完整,如有问题欢迎指点一二!

会引起shuffle的算子:(功能如上所述)

  1. reduceByKey

  2. groupByKey

  3. aggregateByKey

  4. coalesce

  5. repartition

  6. join

 

原文地址:https://www.cnblogs.com/joey-413/p/14090544.html