[Spark]-RDD详解之变量&操作

RDD的操作

  1.1 概述

      RDD整体包含两大类操作

      transformation 从现有中创建一个新的数据集

      action 在对数据集做一定程度的计算后将结果返回

    对于所有的transformation,都是Lazy的,也就是说它不会立即执行,只是单纯的记住怎么样从原来的数据集进行转换的逻辑而已,它仅在某一个计算需要的情况下,才会被真正执行.

    因为transformation 的Lazy性,RDD支持在每次计算时都进行重新计算,当然你可以将这个RDD保存下来 (persist  or cache方法)避免每次重计算

    可以通过设置不同的存储级别,将数据保存到硬盘,内存,或者选择同步多个副本到多个节点中.

    1.2 集群环境下的变量与操作

    集群环境,所有操作最终会交给executors去执行.而变量,会以数据副本的形式交给executors.很多时候,这与我们非集群环境下的开发思维有非常大的不同.

    1.2.1 集群下的闭包

        RDD是支持闭包操作的.但务必注意的是Spark不保证对闭包之外的对象引用进行的变化.

        原因是闭包的会被序列化发生给每一个executor,对于闭包的之外的对象引用会拷贝一个副本给executor.这时多个executor执行至少是跨JVM的

        这时对这个副本对象的变更没有任何意义,因为每个JVM(executor)的副本都是独立的.

    1.2.2 集群下的print

      集群环境下,print不会在driver端有任何输出.

      原因也是一样,print最终是在每个executor执行,其输出也是在每个executor的stdout上,在driver端,是不会有这些输出的.

      如果想在driver输出,一个比较简单的办法是调用collect()将结果发送到driver端在进行print,但这样可能会造成driver内存爆掉(所有executor的数据涌入).

        比较推荐的做法是rdd.take(100).foreach(println)

     1.2.3 共享变量

      因为集群下,变量只会以副本方式交给executor,而不会将变量的值变化传回driver,所以一个可读写共享的变量是非常有用的.

      Spark提供了两种共享变量 broadcast(广播变量) 和 accumulators(累加器)

      1.2.3.1 广播变量(broadcast)

        广播变量允许将一个只读变量的副本发送到每个机器上(executor机器),而不是对每一个任务发送一个副本.这样在同一机器上的多个任务,就可以反复使用这个变量了.      

        注意:

          广播变量只会对每个节点分发一次,所以一般来说,广播变量不应该再被修改了.以保证每个广播变量的副本的值都是一致的

          如果广播变量被修改,则需要将广播变量重新分发

        另:

          举个例子:Spark的action操作本身是通过一系列的stage来完成的,这些Stage是通过分布式的shuffle操作来进行切分的.而在每个Stage之间,Spark自动使用广播变量.

          这里用法说明,只有数据本身会在多个Stage的多个任务中反复使用,或者说缓存这个数据是非常重要且非常必要的情况下,使用广播变量才有意义.

        广播变量的使用如下:      

          // SparkContext.broadcast(v)进行创建,返回的是广播变量的封装器,以.value读取广播变量的的值
          val broadcastVar = sc.broadcast(Array(1, 2, 3))
          val v = broadcastVar.value

      1.2.3.2 累加器(accumulators) 

        累加器变量仅支持累加操作,因为可以在并行计算执行一些特殊的计算(比计数或者求和).并且累加器的变化是可以在UI的Task界面上看见的(注,不支持Python)

        累加器操作,依然遵循RDD的Lazy原则:

          累加器更新操作是在Action中,并且在每个任务中只会执行一次(如果任务失败重启了,累加器更新不会执行)

          而在transformation中,累加器依然不会立即执行更新,如果transformation被重新执行了,则累加器操作会重复执行

        对于累加器变量,Spark原生支持数值类型.一个使用例子如下        

          val accum = sc.longAccumulator("My Accumulator")
          sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
          println(accum.value)

         也可以创建继承AccumulatorV2的类型,来实现自定义类型的累加器,例子如下:          

          //两个泛型参数->累加的元素类型和结果类型可以不同的
          class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

            private val myVector: MyVector = MyVector.createZeroVector

            def reset(): Unit = {
              myVector.reset()
            }

            def add(v: MyVector): Unit = {
              myVector.add(v)
            }
            ...
          }

          // 创建一个自定义的累加器类型:
          val myVectorAcc = new VectorAccumulatorV2
          //将这个触发器注册到SparkContext:
          sc.register(myVectorAcc, "MyVectorAcc1")

  1.3 RDD的一些基本操作

    1.3.1 Transformations 依赖关系

      RDD是由父RDD经过转换函数形成一个个子RDD(子RDD依赖父RDD).针对不同的转换函数,以父RDD分区与子RDD分区的关系为标准,Spark将这些依赖关系分为两类.

      窄依赖

        窄依赖是指转换后,父RDD的每个分区只会被某一个子RDD分区使用.(一对一或者多对一的关系).

        所以窄依赖一般出现在map,filter等子分区沿用父分区,不会发生重分区的时候.

        宽依赖

        宽依赖是指转换后,父RDD的某个或某些分区会被几个子RDD分区使用.(某个分区数据部分在这个RDD,部分在那个RDD,一对多关系)

        宽依赖一般出现在groupByKey等子分区一定会发生重分区的时候

      两种依赖关系的对比

        一般来说,窄依赖比宽依赖对执行优化更加有利

        i).窄依赖允许集群节点上以流水线的形式直接计算所有分区

           宽依赖则需要先计算好父分区的分区信息,然后再以一个shuffle完成重分区,

          ii).某个子分区异常需要重计算时,会对这个子分区所依赖的所有分区进行计算.(这是宽窄依赖都必须的),但是针对分区数据而言

          窄依赖,一个或多个父分区完全对应一个子分区.对这些父分区的重计算,利用率是100%

          宽依赖,父分区的数据不完全对应一个子分区(一对多关系,父分区的某些部分是其它分区的),但此时依然需要重计算父分区全部数据,造成计算浪费(因为白计算其它分区的数据)

    1.3.2 Transformations 操作

      map

        对RDD中的元素执行一个指定函数,将执行结果作为新元素产生一个新的RDD.

          与其它map系函数区别,map新元素的完全是Map函数的执行结果返回,所以新RDD的数量与老RDD是一一对应的.        

        val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)
        rdd.map(rec=>rec.split(" ")>).collect().map(println(_))
        //返回结果是rec.split(" ")结果(一维数组)=>[["aa","bb"],["cc","dd"]]

      flatMap 

        与map相同,但结果会扁平化.即如果结果是迭代器容器的,会将元素从容器中取出再返回       

        val rdd = sc.parallelize(Seq("aa bb","cc"),2)
         rdd.flatMap(rec=>rec.split(" ")).collect().map(println(_));
         //返回结果["aa","bb","cc"]

        //flatMap如以下这种方式使用是不行,flatMap返回结果必须是TraversableOnce[U](可迭代一次的类型)
        //rdd.flatMap(rec=>(rec,1)).collect().map(println(_));     

      mapPartitions

        与map相同,不过是以分区为单位,所以语法要求必须为 f: Iterator[T] => Iterator[U],注意返回结果不是以分区为单位,而是所有分区执行函数的结果的合并      

        val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)
        rdd.mapPartitions(part=>part.map(rec=>rec.split(" "))).collect().map(println(_))
        //结果是 [["aa","bb"],["cc","dd"],["ee","ff"]]

      mapPartitionsWithIndex

        与mapPartitions类似,不过它带有分区的index以供使用.所以语法要求为f: (Int, Iterator[T]) => Iterator[U]        

        val rdd = sc.parallelize(Seq("aa bb","cc dd","ee ff"),2)
        rdd.mapPartitionsWithIndex((partIdx,part)=>part.map(rec=>(partIdx,rec))).collect().map(println(_))
        //返回结果 (0,aa bb),(1,cc dd),(1,ee ff)

      sample

       抽样函数.可以从数据集中按一定比例抽取部分数据,抽取之后可以选择是否返回

       语法要求 withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong         

      val rdd = sc.parallelize(1 to 50)
       rdd.sample(false,0.2,System.currentTimeMillis).map(rec=>(rec,1)).collect().map(print(_)+" ")
       //返回结果 (8,1)(21,1)(26,1)(27,1)(34,1)(43,1)(46,1)(49,1)

      union      

      将两个数据集合并(包含数据重复)                 

      val rdd = sc.parallelize(1 to 10)
      val rdd2 = sc.parallelize(11 to 20)
      rdd.union(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))
      //返回结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20

     intersection

      将两个数据集合并,取交集作为结果返回

      val rdd = sc.parallelize(1 to 10)
      val rdd2 = sc.parallelize(5 to 15)
      rdd.intersection(rdd2).map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))
     //返回结果 6 7 9 8 10 5

       distinct      

      对当前结果集去重返回            

        val rdd = sc.parallelize(1 to 10)
        val rdd2 = sc.parallelize(5 to 15)
        rdd.union(rdd2).distinct().map(rec=>rec.toString).collect().map(rec=>print(s"${rec} "))
        //返回结果 4 14 6 8 12 10 2 13 15 11 1 3 7 9 5

    groupByKey

      将一个键值对类型的结果集按照key进行分组(如果是为分组聚合,groupByKey相比reduceByKey效率更低,因为少一个map-shuffer的combine)  

       val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)
       rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).groupByKey().collect().map(rec=>print(s" ${rec._1} ${rec._2.sum} |"));
       //返回结果 aa 1 | dd 1 | bb 2 | cc 2 |

    reduceByKey

      将一个键值对类型数据集,使用指定的函数分组聚合为另一个键值对类型数据集,(相比groupByKey性能更高,因为可以在map-shuffer进行combine减少数据量)    

      val rdd = sc.parallelize(Seq("aa bb","cc dd","bb cc"),2)
      rdd.flatMap(rec=>rec.split(" ")).map(rec=>(rec,1)).reduceByKey((value1,value2)=>value1+value2).collect().map(rec=>print(s"${rec} "))
      //返回结果 (aa,1) (dd,1) (bb,2) (cc,2)

     aggregate

       给出一个默认基准值,先使用seqOp遍历分区内元素传入基准值进行聚合,再对分区间结果使用combOp聚合为最后结果.

        注意aggregate返回的结果直接是聚合结果(不是RDD),并且要与原RDD的类型一致     

      val rdd = sc.parallelize(1 to 10);
      /**
        * zeroValue:预定义一个初始值 (0,0)
        * seqOp: (U, T) => U  分区内元素聚合,遍历元素传入基准值执行函数.(类似Map-Shuffle)
        *   U:当前基准值,T:当前元素
        *   执行的逻辑是 (基准值(默认初始值), 元素No.1) 执行seqOp ,结果再作为基准值,执行(基准值(上步结果),元素No.2),以此类推
        * combOp: (U, U) => U 分区间聚合,将各分区执行seqOp函数的结果再使用combOp聚合 (类似Reduce-Shuffle)
        */
      val aggregateResult = rdd.aggregate((0,0))(
        seqOp=(sv,tv)=>(sv._1+tv,sv._2+1),
        combOp=(v1,v2)=>(v1._1+v2._1,v2._2+v2._2)
      )
      println(aggregateResult)
      //输出结果 (55,10) (1-10的总和,1-10的个数) <=非RDD结果,并且类型必须是Int

      aggregateByKey

         与aggregate类似,但针对的是key分组,aggregateBykey是以key组为单位,对分组内元素遍历使用seqOp,再使用combOp聚合分组内       

        val rdd = sc.parallelize(Seq("a b c", "b c d"));
        rdd.flatMap(rec => rec.split(" ")).map(rec => (rec, 1))
           .aggregateByKey(0)(
              seqOp = (sv, tv) => (sv + tv),
              combOp = (v1, v2) => (v1 + v2)
            )
           .collect().map(rec => print(s"${rec} |"))
        //输出结果 (d,1) |(b,2) |(a,1) |(c,2) |

      sortByKey

        将一个键值对RDD按key排序转换为另一个RDD

      join

        将两个键值对RDD((K, V),(K, W)),按Key合并为一个RDD(K, (V, W)) .(Spark同时还提供 leftOuterJoin,rightOuterJoin,fullOuterJoin)        

       val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
       val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));

       rdd.join(rdd2).collect().map(rec => print(s"${rec} |"))
          //两个RDD交集 (b,(b,b))
          rdd.leftOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))
          //leftOuterJoin左边全数据,右边Opt (b,(b,Some(b))) |(a,(a,None)) |
          rdd.rightOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))
          //rightOuterJoin右边全数据,左边Opt (b,(Some(b),b)) |(c,(None,c)) |
          rdd.fullOuterJoin(rdd2).collect().map(rec => print(s"${rec} |"))
          //笛卡尔乘积,Opt (b,(Some(b),Some(b))) |(a,(Some(a),None)) |(c,(None,Some(c))) |

      cogroup

        将多个键值对RDD按Key合并在一起.合并为全数据(没有丢失)

        与fullOuterJoin区别在与多个RDD情况下,cogroup按key合并为一个,fullOuterJoin为多个的笛卡尔积

        注意,如果某个数据集少某一个key,合并时是在这个数据集的位置上占CompactBuffer()的位置,而不是直接跳过        

        val rdd = sc.parallelize(Seq("a b")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
        val rdd2 = sc.parallelize(Seq("b c")).flatMap(rec => rec.split(" ")).map(rec => (rec, rec));
        rdd.cogroup(rdd2).collect().map(rec => print(s"${rec} |"))
        //(b,(CompactBuffer(b),CompactBuffer(b))) |(a,(CompactBuffer(a),CompactBuffer())) |(c,(CompactBuffer(),CompactBuffer(c))) |

      cartesian

        返回两个RDD的笛卡尔积结果

      pipe

        使用Shell的语法操作RDD

      coalesce

        重新调整RDD的分区后形成一个新的RDD.语法要求:numPartitions: Int, shuffle: Boolean = false.

          numPartitions表示要重新调整的分区数量,shuffle表示重新调整分区时是否允许发生shuffle过程.

        如果子分区数往下减少,则子分区数设置一定会成功.但要注意,在这种情况下会造成任务的并行度降低(分区数,任务数降了),任务内存更容易爆出(单个任务的数据增大了)

        如果子分区数往上增加,则子分区数设置必须要设置shuffle=true,才会成功,否则子分区依然等于父分区

        谨记:如果没有shuffle的参与,RDD只能减少分区(窄依赖),不能增加分区

      repartition      

        只是coalesce的shuffle等于true的快捷方式. coalesce(numPartitions, shuffle = true)

                      repartitionAndSortWithinPartitions

    1.3.2 Action 操作

      reduce

        RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素组成两个元素,再被传给输入函数,直到最后只有一个值为止

        与reduceByKey的区别是  reduceByKey是一个转换操作,返回的是RDD, reduce是一个action操作,返回的是数据结果                     

        val rdd = sc.parallelize(1 to 100,2);
        val value = rdd.reduce((v1,v2)=>v1+v2)
        println(value)
        //输出结果 5050

      collect

        将一个RDD的所有元素以数组的形式发回driver端.注意这个RDD必须是足够小的数据集,否则很容易将driver端的内存撑爆

      count

        返回一个RDD的元素的个数

      first

        返回一个RDD的第一个元素

      take(n)

        返回一个RDD的前N个元素

      takeSample(withReplacementnum, [seed])

        返回一个RDD的百分比抽样(withReplacement标识元素是否放回RDD以供多次使用)

           takeOrdered(n[ordering])

        返回一个RDD按照设定的排序规则后的前N个元素

      countByKey

        只支持键值对类型,返回一个RDD的按照Key分组后的每组计数

      saveAsTextFile(path)

        将一个RDD的全部元素写入一个文本方式的本地文件,HDFS或其它任何Hadoop支持的存储系统中.(每行等于每个元素调用toString()的结果)

      saveAsSequenceFile(path)

        将一个RDD的全部元素写入一个二进制方式的本地文件,HDFS或其它任何Hadoop支持的存储系统中.

        在Scala中,它还可以用于隐式转换为可写类型的类型(Spark包含对基本类型的转换,如Int、Double、String等)。

      saveAsObjectFile(path)

        将一个RDD的全部元素使用Java序列化以简单的格式编写数据集的元素(可以使用SparkContext.objectFile()加载这些元素)。

      foreach(func)

        在数据集的每个元素上运行函数func。这通常用于处理副作用,如更新累加器或与外部存储系统交互

        注意:不可以修改foreach()之外的累加器之外的变量,见前面集群下的变量与闭包一节

  1.4 Shuffle过程

    Spark的某些操作,会引起一个Shuffle过程.Shuffle是指不同节点上的不同分区数据整合重新分区分组的机制.

    所以Shuffle是一个代价很高的操作,因为它会导致executor和不同的机器节点之间进行数据复制.

    1.4.1 Shuffle简述

      以reduceByKey为例,将原始数据中key相同的记录聚合为一个组.这里挑战是原始数据很可能是存在不同分区不同机器的(参考MapReduce执行过程)

      Spark-Shuffle与MapReduce-Shuffle的区别

        MapReduce-Shuffle结果是分区有序,分区内再按Key排序

        Spark-Shuffle结果是分区有序,但分区内Key无序.

          要对Spark-Shuffle的分区内再排序,有以下方法:

           mapPartitions 在已有的每个分区上再使用.sort排序

           repartitionAndSortWithinPartitions  重建分区,并排序

           sortBy提前对RDD本身做一个全范围排序

    1.4.2 RDD中引起Shuffle的操作

       repartition类操作 例如:repartitioncoalesce

       _ByKey操作(除了counting相关操作)例如:groupByKeyreduceByKey

       join 例如:cogroupjoin

      1.4.3 Shuffle的性能影响

      Shuffle本身是同时高耗内存,高耗磁盘IO,高耗网络IO的昂贵操作.

        Spark会启动一系列的MapReduce(Hadoop MapReduce),产生大量的数据缓冲区与归并排序,大量的pill文件与归并Merge等等

原文地址:https://www.cnblogs.com/NightPxy/p/9245707.html