Spark的RDD编程实战案例

             Spark的RDD编程实战案例

                                     作者:尹正杰

版权声明:原创作品,谢绝转载!否则将追究法律责任。

  RDD体现了装饰者设计模式,将数据处理的逻辑进行封装,接下来让我们一起来体验一下吧。

一.RDD概述

1>.什么是RDD

  RDD全称为"Resilient Distributed Dataset",叫做弹性分布式数据集,是Spark中最基本的数据抽象。

  代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。

2>.RDD的属性

Internally, each RDD is characterized by five main properties:
    A list of partitions:
        一组分区(Partition),即数据集的基本组成单位;
    A function for computing each split:
        一个计算每个分区的函数,换句话说,是计算数据放在哪个分区中;
    A list of dependencies on other RDDs:
        RDD之间的依赖关系;
    Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned):
        一个Partitioner,即RDD的分片函数;
    Optionally, a list of preferred locations to compute each split on (e.g. block locations foran HDFS file):
        一个列表,存储存取每个Partition的优先位置(preferred location),即数据所存储的节点;

3>.RDD的特点 

  RDD表示只读的分区的数据集,对RDD进行改动,只能通过RDD的转换操作,由一个RDD得到一个新的RDD,新的RDD包含了从其他RDD衍生所必需的信息。

  RDDs之间存在依赖,RDD的执行是按照血缘关系延时计算的。如果血缘关系较长,可以通过持久化RDD来切断血缘关系。
  RDD有以下几个显著特点:
    分区       RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据;
      如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换;
    只读       RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD;       由一个RDD转换到另一个RDD,可以通过丰富的操作算子实现,不再像MapReduce那样只能写map和reduce了;       RDD的操作算子(Operate)包括两类:
        transformations(转换算子):
          它是用来将RDD进行转化,构建RDD的血缘关系;
        actions(行动算子):
          它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中;
    依赖       RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。如下所示,依赖包括两种:
        窄依赖:
          RDDs之间分区是一一对应的;
        宽依赖:
          下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
    缓存       如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用;    

    CheckPoint       虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。
      但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。
      为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。

二.RDD的创建

1>.编程模型

  在Spark中,RDD被表示为对象,通过对象上的方法调用来对RDD进行转换。

  经过一系列的transformations定义RDD之后,就可以调用actions触发RDD的计算,action可以是向应用程序返回结果(count, collect等),或者是向存储系统保存数据(saveAsTextFile等)。

  在Spark中,只有遇到action,才会执行RDD的计算(即延迟计算),这样在运行时可以通过管道的方式传输多个转换。
  要使用Spark,开发者需要编写一个Driver程序,它被提交到集群以调度运行Worker,

2>.RDD的创建 

package com.yinzhengjie.bigdata.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CreateRDD {

  def main(args: Array[String]): Unit = {

    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)

    /**
      *   RDD的创建:
      *     在Spark中创建RDD的创建方式可以分为三种:
      *       (1)从集合(内存)中创建RDD;
      *             从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD
      *       (2)从外部存储创建RDD;
      *             包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等
      *       (3)从其他RDD创建。
      */


    //使用SparkContext对象的parallelize方法可以在内存中创建RDD
    val arrayRDD:RDD[String] = sc.parallelize(Array("yinzhengjie","JasonYin2020"))
    arrayRDD.collect().foreach(println)

    //使用SparkContext对象的makeRDD方法也可以在内存中创建RDD,其底层实现就是parallelize方法
    val listRDD:RDD[Int] = sc.makeRDD(List(100,200,300))
    listRDD.collect().foreach(println)

    /**
      *   使用SparkContext对象的textFile方法从外部存储中创建RDD
      *
      *   温馨提示:
      *     默认情况下可以读取项目路径,也可以读取其它路径,比如HDFS,HBase对应的路径等
      *     默认从文件中读取的数据都是字符串类型
      */
    val fileRDD:RDD[String] = sc.textFile("E:\yinzhengjie\bigdata\spark\data")
    fileRDD.collect().foreach(println)
  }
}
CreateRDD.scala文件内容(RDD的创建)
3>.RDD的分区
package com.yinzhengjie.bigdata.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RDDPartition {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)

    /**
      *   使用SparkContext对象的makeRDD函数签名如下:
      *     def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism) : RDD[T] = withScope
      *
      *   温馨提示:
      *     seq:
      *       传入一个序列集合类型,比如List,Array
      *     Int = defaultParallelism:
      *       指定分区数的并行度,传入一个整形,不传也可以,即使用defaultParallelism,该值默认是您的操作系统对应的总核心数。
      *
      */
    val listRDD:RDD[String] = sc.makeRDD(List("yinzhengjie","JasonYin2020"),6)  //使用6个自定义分区

    //将RDD的数据保存到文件中
    listRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output")


    /**
      *   使用SparkContext对象的textFile函数签名如下:
      *     def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope
      *
      *   温馨提示:
      *     path:
      *       指定文件的路径,可以是本地路径,也可以是hdfs,hbase等路径
      *     minPartitions:
      *       指定最小的分区数据,但是不一定是这个分区数,取决于Hadoop读取文件时分片规则。
      *
      */
    val fileRDD:RDD[String] = sc.textFile("E:\yinzhengjie\bigdata\spark\data",2)  //自定义2个分区(但实际上可能比2要大,这取决于Hadoop的分片机制)

    //保存文件时建议不要和源文件在同一个目录,否则可能会出错哟~
    fileRDD.saveAsTextFile("E:/yinzhengjie/bigdata/spark/output2")
  }
}
RDDPartition.scala文件内容(RDD的分区)

三.RDD常用的算子(Operate)

  RDD的操作算子(Operate)包括两类,即转换算子(transformations operate)和actions(行动算子)。
    
  transformations(转换算子):
    它是用来将RDD进行转化,构建RDD的血缘关系。         
  actions(行动算子):     它是用来触发RDD的计算,得到RDD的相关计算结果或者将RDD保存的文件系统中。

  温馨提示:
    转换算子只是对业务逻辑的封装并没有真正执行代码,而行动算子就会真正触发代码的执行操作。换句话说,行动算子就是用来触发RDD计算操作的,一旦使用了行动算子,那么在行动算子之前的转换算子会被触发执行。

1>.Value类型

package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(20 to 30)
    //遍历listRDD
    listRDD.collect().foreach(println)

    //使用map算子(Operate),将listRDD的所有元素乘以5得到新的RDD
    val mapRDD:RDD[Int] = listRDD.map(x => x * 5)   //该行可简写为"val mapRDD:RDD[Int] = listRDD.map(_ * 5)"

    //遍历mapRDD
    mapRDD.collect().foreach(println)
  }
}
map(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(20 to 30)

    //遍历listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用mapPartitions算子(Operate),将listRDD的所有元素乘以5得到新的RDD
      *
      *    mapPartitionsk可以对一个RDD中所有的分区进行遍历,假设有N个元素,有M个分区,那么map的函数的将被调用N次,而mapPartitions被调用M次,一个函数一次处理所有分区。
      *
      *    map()和mapPartition()的区别如下:
      *       map():
      *         每次处理一条数据。
      *       mapPartition():
      *         每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM。
      *     开发指导:
      *       当内存空间较大的时候建议使用mapPartition(),以提高处理效率。
      *
      *    温馨提示:
      *       mapPartitions效率优于map算子(Operate),减少了发送执行器(Executor)执行交互次数(mapPartitions的Operate是基于分区为单位发送一次任务调度到Executor,而map的Operate是每处理一条数据就发送一次任务调度给Executor)
      *       如果分区的数据比执行器(Executor)的内存大,则使用mapPartitions可能会出现内存溢出(OOM),比如一个分区有12G数据,但Executor仅有10G大小,就会出现OOM现象。
      *       综上所述,到底使用map还是mapPartitions算子(Operate)根据实际情况而定。
      */
    val mapPartitionsRDD:RDD[Int] = listRDD.mapPartitions(datas => {
      datas.map(_ * 5)
    })

    //遍历mapRDD
    mapPartitionsRDD.collect().foreach(println)

  }
}
mapPartitions(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object mapPartitionsWithIndexOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD并指定分区数为3
    val listRDD:RDD[Int] = sc.makeRDD(20 to 30,3)
    //遍历listRDD
    listRDD.collect().foreach(println)

    //使用mapPartitionsWithIndex算子(Operate),将listRDD的所有元素跟所在分区形成一个元组组成一个新的RDD
    val tupleRDD:RDD[(Int,String)] = listRDD.mapPartitionsWithIndex{
      case (numPartition,datas) => {
        datas.map((_,"分区编号: " + numPartition))
      }
    }

    //遍历tupleRDD
    tupleRDD.collect().foreach(println)
  }
}
mapPartitionsWithIndex(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FlatMapOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")
    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD并指定分区数为3
    val listRDD:RDD[List[Int]] = sc.makeRDD(Array(List(10,20),List(60,80)))

    //遍历listRDD
    listRDD.collect().foreach(println)

    //使用flatMap算子(Operate),将listRDD的所有元素扁平化,它类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    val flatMapRDD:RDD[Int] = listRDD.flatMap(x =>x)

    //遍历flatMapRDD
    flatMapRDD.collect().foreach(println)
  }
}
flatMap(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GlomOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(100 to 161,4)

    //遍历listRDD
    listRDD.collect().foreach(println)

    //将一个分区的数据放到一个数组中,这样我们可以对其进行操作,比如求和,求最值等。
    val glomRDD:RDD[Array[Int]] = listRDD.glom()

    //遍历glomRDD
    glomRDD.collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

  }
}
glom案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupByOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(10 to 20)

    //遍历listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用groupBy算子(Operate)进行分组,按照传入函数(指定规则)的返回值进行分组,将相同的key对应的值放入一个迭代器。
      *
      *   分组后的数据形成了对偶元组(K,V),K表示分组的key,V表示分组的数据集合。
      *
      *   下面的案例就是按照元素模以2的值进行分组。
      */
    val groupByRDD:RDD[(Int,Iterable[Int])] = listRDD.groupBy(i => i % 2)

    //遍历groupByRDD
    groupByRDD.collect().foreach(println)
  }
}
groupBy(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FilterOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(10 to 20)

    //遍历listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用filter算子(Operate)进行过滤。返回一个新的RDD,该RDD由经过func函数(按照指定的规则)计算后返回值为true的输入元素组成。
      *
      *   下面的案例就是按照元素模以2的值进行过滤,即仅保留偶数。
      */
    val filterRDD:RDD[Int] = listRDD.filter(x => x % 2 == 0)

    //遍历filterRDD
    filterRDD.collect().foreach(println)
  }
}
filter(func)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SampleOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(10 to 20)

    //遍历listRDD
    listRDD.collect().foreach(println)

    /**
      *   sample算子(Operate)用以指定的随机种子随机抽样出数量为fraction的数据。
      *
      *   sample的函数签名如下:
      *     def sample( withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]
      *
      *   以下是相关参数说明:
      *     withReplacement:
      *       表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样,
      *     fraction:
      *       表示sample的打分,是一个Double类型。
      *     seed:
      *       用于指定随机数生成器种子。
      *
      */
    val sampleRDD:RDD[Int] = listRDD.sample(false,0.7,1)
    
    //遍历sampleRDD
    sampleRDD.collect().foreach(println)

  }
}
sample(withReplacement, fraction, seed)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object DistinctOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(List(6,6,8,1,2,1,6,9,5,6,1,8,2,7,0,7,6,3,5,4,6,0,7,1))

    //遍历listRDD
    listRDD.collect().foreach(println)

    /**
      *   使用distinct算子(Operate)对数据去重,但是因为去重后会导致数据减少,所以可以自定义分区数量,默认分区数是你操作系统的真实core数量。
      *
      */
    //    val distinctRDD:RDD[Int] = listRDD.distinct()
    val distinctRDD:RDD[Int] = listRDD.distinct(3)

    //为了了看到试验效果,建议将结果以文件的形式保存,直接打印到控制台终端可能看不出效果哟~
    // distinctRDD.collect().foreach(println)
    distinctRDD.saveAsTextFile("E:\yinzhengjie\bigdata\spark\output")
  }
}
distinct([numPartitions])) 案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CoalesceOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD,指定分区数切片为4
    val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4)

    //遍历listRDD
//    listRDD.collect().foreach(println)

    println("缩减分区前分区数量: " + listRDD.partitions.size)

    //使用coalesce算子(Operate)缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。可以简单理解为合并分区
    val coalesceRDD:RDD[Int] = listRDD.coalesce(3)

    println("缩减分区后分区数量: " + coalesceRDD.partitions.size)

    coalesceRDD.saveAsTextFile("E:\yinzhengjie\bigdata\spark\output")
  }
}
coalesce(numPartitions)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object RepartitionsOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD,指定分区数切片为4
    val listRDD:RDD[Int] = sc.makeRDD(1 to 16,4)

    //遍历listRDD,发现数据是有序的
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    println("Rebalance前分区数量: " + listRDD.partitions.size)

    /**
      *   使用repartition算子(Operate)是根据分区数,重新通过网络随机洗牌所有数据。
      *
      *   coalesce和repartition的区别
      *     1>.coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。
      *     2>.repartition实际上是调用的coalesce,默认是进行shuffle的。
      *
      *   下面的案例就是对listRDD进行重新分区(将listRDD的4个分区数重新分区为2个),生成一个新的RDD对象rebalanceRDD。
      */

    val rebalanceRDD:RDD[Int] = listRDD.repartition(2)

    println("Rebalance后分区数量:" + rebalanceRDD.partitions.size)

    //遍历rebalanceRDD,此时你会发现数据并不是有序的,而是被打乱啦~
    rebalanceRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )


  }
}
repartition(numPartitions)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SortByOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val listRDD:RDD[Int] = sc.parallelize(List(2,1,7,6,9,3,8,5))

    //遍历listRDD
    listRDD.collect().foreach(println)

    /**
      *   sortBy算子的函数参数列表签名如下:
      *     def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)
      *
      *   通过函数签名可以知道我们使用时只需要传入一个参数即可, 其它2个参数均有默认值,
      *
      *   使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为升序(ascending: Boolean = true)。
      *
      *   下面的案例按照自身大小进行排序,默认是升序。
      */
    val sortByRDD:RDD[Int] = listRDD.sortBy(x => x)

    //遍历sortByRDD
    sortByRDD.collect().foreach(
      x =>{
        println(x)
      }
    )

    //下面的案例按照自身大小进行排序,我们指定ascending的值为false,排序则为降序。
    val sortByRDD2:RDD[Int] = listRDD.sortBy(x => x,false)

    //遍历sortByRDD2
    sortByRDD2.collect().foreach(println)

  }
}
sortBy(func,[ascending], [numTasks])案例

2>.双Value类型交互

package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UnionOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)
    //创建rdd1
    val rdd1:RDD[Int] = sc.parallelize(List(1,3,5,7,9))

    //创建rdd2
    val rdd2:RDD[Int] = sc.makeRDD(List(2,4,6,8,10))

    /**
      *   union算子(Operate)可以对源RDD和参数RDD求并集后返回一个新的RDD。
      *
      *   下面的案例就是将rdd1和rdd2进行合并为sumrdd,
      */
    val sumRDD:RDD[Int] =rdd1.union(rdd2)

    //遍历sumRDD
    sumRDD.collect().foreach(println)
  }

}
union(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubtractOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建rdd1
    val rdd1:RDD[Int] = sc.parallelize(10 to 20)

    //创建rdd2
    val rdd2:RDD[Int] = sc.makeRDD(15 to 30)

    /**
      *   subtract算子是用来计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来。
      *
      *   下面的案例就是计算第一个RDD与第二个RDD的差集并打印
      */
    val subtractRDD:RDD[Int] =rdd1.subtract(rdd2)

    //遍历subtractRDD
    subtractRDD.collect().foreach(println)
  }
}
subtract(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object IntersectionOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建rdd1
    val rdd1:RDD[Int] = sc.parallelize(10 to 20)

    //创建rdd2
    val rdd2:RDD[Int] = sc.makeRDD(15 to 30)

    //使用计算两个RDD的交集
    val intersectionRDD:RDD[Int] = rdd1.intersection(rdd2)

    //遍历intersectionRDD
    intersectionRDD.collect().foreach(println)
  }
}
intersection(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CartesianOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建rdd1
    val rdd1:RDD[Int] = sc.parallelize(10 to 20)

    //创建rdd2
    val rdd2:RDD[Int] = sc.makeRDD(15 to 30)

    //计算两个RDD的笛卡尔积并打印,生产环境中应该尽量避免使用!
    val cartesian:RDD[(Int,Int)]  = rdd1.cartesian(rdd2)

    //遍历cartesian
    cartesian.collect().foreach(println)
  }
}
cartesian(otherDataset)案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ZipOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建rdd1
    val rdd1:RDD[Int] = sc.parallelize(Array(100,200,300),3)

    //创建rdd2
    val rdd2:RDD[String] = sc.makeRDD(Array("storm","spark","flink"),3)

    //zip算子可以将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。
    val zipRDD:RDD[(Int,String)] = rdd1.zip(rdd2)

    //遍历zipRDD
    zipRDD.collect().foreach(println)
  }
}
zip(otherDataset)案例

3>.Key-Value类型

package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object PartitionByOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val rdd1:RDD[(Int,String)] = sc.makeRDD(Array((1,"hdfs"),(2,"yarn"),(3,"mapreduce"),(4,"spark")),4)

    //遍历rdd2
    rdd1.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
    println("rdd1分区数是: " + rdd1.partitions.size)

    /**
      *   对rdd1进行重分区(对rdd1进行分区操作,如果原有的rdd1和现有的rdd2分区数是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。)
      *
      *   需要注意的是,partitionBy算子属于PairRDDFunctions类,因此这里设计到了隐式转换哟~
      *
      */
    val rdd2:RDD[(Int,String)] = rdd1.partitionBy(new org.apache.spark.HashPartitioner(2))

    println("rdd2分区数是: " + rdd2.partitions.size)

    //遍历rdd2
    rdd2.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
  }
}
partitionBy案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object GroupByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建一个数组
    val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS")

    //创建RDD并将上面的words映射为二元组便于后面使用grooupByKey算子
    val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1))

    //groupByKey也是对每个key进行操作,但只生成一个sequence。
    val groupByKeyRDD:RDD[(String,Iterable[Int])] = mapRDD.groupByKey()

    //遍历groupByKeyRDD
    groupByKeyRDD.collect().foreach(println)

    //对每个单词进行统计
    groupByKeyRDD.map(word => (word._1, word._2.sum)).collect().foreach(println)
  }
}
groupByKey案例 
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建一个数组
    val words = Array("HDFS","YARN","HDFS","STORM","HDFS","SPARK","YARN","FLLINK","HDFS")

    //创建RDD并将上面的words映射为二元组便于后面使用reduceByKey算子
    val mapRDD:RDD[(String,Int)] = sc.makeRDD(words).map(word => (word,1))

    /**
      *   在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。
      *
      *   reduceByKey和groupByKey的区别如下:
      *     reduceByKey:
      *       按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v].
      *     groupByKey:
      *       按照key进行分组,直接进行shuffle。
      *     开发指导:
      *       reduceByKey比groupByKey建议使用,因为预聚合操作会节省带宽传输,但是需要注意是否会影响业务逻辑。
      */
    val reduceByKeyRDD:RDD[(String,Int)] = mapRDD.reduceByKey(_+_)

    //遍历reduceByKeyRDD
    reduceByKeyRDD.collect().foreach(println)


  }
}
reduceByKey(func, [numTasks])案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2)

    //遍历listRDD各个分区的元素
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )


    /**
      *     aggregateByKey的函数签名如下:
      *       def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope
      *
      *     作用:
      *       在kv对的RDD中,按key将value进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并;
      *       最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。
      *       参数描述:
      *         zeroValue:
      *           给每一个分区中的每一个key一个初始值;
      *         seqOp:
      *           函数用于在每一个分区中用初始值逐步迭代value;
      *         combOp:
      *           函数用于合并每个分区中的结果。
      *
      *     下面的案例为创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加
      */
    val aggregateByKeyRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(math.max(_,_),_+_)

    //遍历aggregateByKeyRDD各个分区的元素
    aggregateByKeyRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    //使用aggregateByKey也可以实现类似于WordCount的功能
    val wcRDD:RDD[(String, Int)] = listRDD.aggregateByKey(0)(_+_,_+_)

    //遍历wcRDD
    wcRDD.collect().foreach(println)

  }
}
aggregateByKey案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FoldByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",30),("A",21),("C",40),("B",13),("C",61),("C",18)),2)

    //遍历listRDD各个分区的元素
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    /**
      *   aggregateByKey与foldByKey的区别:
      *     aggregateByKey的简化操作,seqop和combop相同。
      *     我们会发现aggregateByKey需要传递2个参数,分别用于分区内和分区间的操作;
      *     而foldByKey只需要传递一个参数,因为分区内和分区间的操作相同,因此只需要传递一个参数即可.
      *
      *   下面的案例是计算相同key对应值的相加结果
      */
    val foldByKeyRDD:RDD[(String,Int)] = listRDD.foldByKey(0)(_+_)

    //遍历foldByKeyRDD各个分区的元素
    foldByKeyRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
  }
}
foldByKey案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CombineByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

    //遍历listRDD各个分区的元素
    listRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )


    /**
      *   参数:
      *       (createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C)
      *   作用:
      *       对相同K,把V合并成一个集合。
      *   参数描述:
      *     createCombiner:
      *       combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。
      *       如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值
      *     mergeValue:
      *       如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并
      *     mergeCombiners:
      *       由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。
      *       如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。
      *
      *
      *    下面的案例就是根据key计算每种key的均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果)
      */
    val combineByKeyRDD:RDD[(String,(Int,Int))] =  listRDD.combineByKey(
      (_,1),                                    //转换结构,一个key第一次出现对其计数为1
      (acc:(Int,Int),v)=>(acc._1+v,acc._2+1),   //定义分区内的计算规则,即相同key的vlaue相加,并将计数器加1
      (acc1:(Int,Int),acc2:(Int,Int))=>(acc1._1+acc2._1,acc1._2+acc2._2)  //定义分区间的计算规则,即将各个分区相同key的计算结果进行累加操作。
    )

    //遍历combineByKeyRDD各个分区的元素
    combineByKeyRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )

    //计算平均值
    val averageValueRDD:RDD[(String,Double)] = combineByKeyRDD.map{case (key,value) => (key,value._1/value._2.toDouble)}

    //遍历averageValueRDD各个分区的元素(可以查看对应key的平均值)
    averageValueRDD.glom().collect().foreach(
      array =>{
        println(array.mkString(","))
      }
    )
  }
}
combineByKey[C]案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SortByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

   val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink")))

    /**
      *   sortByKey算子在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD,ascending的值默认为true
      */
    val positiveSequenceRDD:RDD[(Int,String)] = arrayRDD.sortByKey()

    positiveSequenceRDD.collect().foreach(println)

    /**
      *   sortByKey算子在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD,ascending的值为false时,顺序为倒叙。
      */
    val ReverseOrderRDD:RDD[(Int,String)] = arrayRDD.sortByKey(false)

    ReverseOrderRDD.collect().foreach(println)
  }
}
sortByKey([ascending], [numTasks])案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object MapValuesOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val arrayRDD:RDD[(Int,String)] = sc.parallelize(Array((3,"Hadoop"),(8,"storm"),(2,"spark"),(6,"flink"),(1,"mapreduce")))
    arrayRDD.collect().foreach(println)

    /**
      *   针对于(K,V)形式的类型只对V进行操作
      *
      *   下面的案例就是对value添加字符串"*****"
      */
    val mapValuesRDD:RDD[(Int,String)] = arrayRDD.mapValues(_ + "*****")

    mapValuesRDD.collect().foreach(println)
  }
}
mapValues案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object JoinOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink")))

    val rdd2:RDD[(Int,Int)] = sc.parallelize(Array((1,30),(2,60),(3,90)))


    /**
      *   在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD.
      */
    val rdd3:RDD[(Int,(String,Int))] = rdd1.join(rdd2)

    rdd3.collect().foreach(println)

  }
}
join(otherDataset, [numTasks])案例
package com.yinzhengjie.bigdata.spark.transformations

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CogroupOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val rdd1:RDD[(Int,String)] = sc.parallelize(Array((1,"MapReduce"),(2,"Spark"),(3,"Flink")))

    val rdd2:RDD[(Int,Int)] = sc.makeRDD(Array((1,30),(2,60),(3,90)))


    /**
      *   在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD..
      */
    val rdd3:RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd1.cogroup(rdd2)

    rdd3.collect().foreach(println)

  }

}
cogroup(otherDataset, [numTasks])案例

4>.Actions

package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ReduceOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val rdd1:RDD[Int] = sc.makeRDD(1 to 100,2)
    val rdd2 = sc.parallelize(Array(("Hadoop",100),("Spark",300),("Flink",500),("MapReduce",700)))

    /**
      *   通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。
      */
    val res1:Int = rdd1.reduce(_+_)
    val res2:(String,Int) = rdd2.reduce((x,y)=>(x._1 + "-" + y._1,x._2 + y._2))

    println(res1)
    println(res2)
  }
}
reduce(func)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CollectOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val rdd1:RDD[Int] = sc.parallelize(1 to 100)

    /**
      *   在驱动程序中,以数组的形式返回数据集的所有元素。
      */
    val res:Array[Int] = rdd1.collect()

    res.foreach(println)
  }
}
collect()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CountOperate {

  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val rdd1:RDD[Int] = sc.makeRDD(1 to 100)

    /**
      *   返回RDD中元素的个数
      */
    val count:Long = rdd1.count

    println(count)
  }
}
count()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FirstOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val rdd1:RDD[Int] = sc.makeRDD(50 to 100)

    /**
      *   返回RDD中的第一个元素
      */
    val res1:Int = rdd1.first()

    println(res1)

  }
}
first()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TakeOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val rdd1:RDD[Int] = sc.makeRDD(50 to 100)

    /**
      *   返回一个由RDD的前n个元素组成的数组
      */
    val res1:Array[Int] = rdd1.take(5)

    res1.foreach(println)
  }
}
take(n)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object TakeOrderedOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    //创建RDD
    val listRDD:RDD[Int] = sc.makeRDD(List(9,5,20,7,10,4,8,30,6))

    /**
      *   返回该RDD排序后的前n个元素组成的数组
      */
    val res:Array[Int] = listRDD.takeOrdered(5)

    res.foreach(println)

  }
}
takeOrdered(n)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object AggregateOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val rdd1:RDD[Int] = sc.parallelize(1 to 100,2)

    /**
      *   aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。
      *
      *   需要注意的是,aggregate算子在计算时,各分区内部计算需要加上初始值(zeroValue),分区间计算也会加上该初始值(zeroValue)
      */
    val res1:Int = rdd1.aggregate(0)(_+_,_+_)
    val res2:Int = rdd1.aggregate(100)(_+_,_+_)

    println(res1)
    println(res2)
  }
}
aggregate案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object FoldOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val rdd1:RDD[Int] = sc.parallelize(1 to 100,2)

    /**
      *   折叠操作,aggregate的简化操作,seqop和combop一样。
      */
    val res1:Int =  rdd1.fold(0)(_+_)
    val res2:Int =  rdd1.fold(100)(_+_)

    println(res1)
    println(res2)
  }
}
fold(num)(func)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsTextFileOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val  config:SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

    /**
      *   将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
      */
    listRDD.saveAsTextFile("E:\yinzhengjie\bigdata\spark\text")
  }
}
saveAsTextFile(path)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsSequenceFileOperate {
  def main(args: Array[String]): Unit = {
      //创建SparkConf对象
      val  config:SparkConf = new SparkConf()
      config.setMaster("local[*]")
      config.setAppName("WordCount")

      //创建Spark上下文对象
      val sc = new SparkContext(config)


      val listRDD:RDD[(String,Int)] = sc.parallelize(List(("A",130),("B",121),("A",140),("B",113),("A",127)),2)

      /**
        *   将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
        */
      listRDD.saveAsSequenceFile("E:\yinzhengjie\bigdata\spark\sequence")
  }
}
saveAsSequenceFile(path)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SaveAsObjectFileOperate {
  def main(args: Array[String]): Unit = {
      //创建SparkConf对象
      val config: SparkConf = new SparkConf()
      config.setMaster("local[*]")
      config.setAppName("WordCount")

      //创建Spark上下文对象
      val sc = new SparkContext(config)


      val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2)

      /**
        * 用于将RDD中的元素序列化成对象,存储到文件中。
        */
      listRDD.saveAsObjectFile("E:\yinzhengjie\bigdata\spark\object")
  }
}
saveAsObjectFile(path)案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.Map

object CountByKeyOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val config: SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)


    val listRDD: RDD[(String, Int)] = sc.parallelize(List(("A", 130), ("B", 121), ("A", 140), ("B", 113), ("A", 127)), 2)

    /**
      *   针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
      */
    val res:Map[String,Long] = listRDD.countByKey

    println(res)
  }
}
countByKey()案例
package com.yinzhengjie.bigdata.spark.transformations.action

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ForeachOperate {
  def main(args: Array[String]): Unit = {
    //创建SparkConf对象
    val config: SparkConf = new SparkConf()
    config.setMaster("local[*]")
    config.setAppName("WordCount")

    //创建Spark上下文对象
    val sc = new SparkContext(config)

    val listRDD: RDD[Int] = sc.makeRDD(20 to 30,2)

    /**
      *   在数据集的每一个元素上,运行函数func进行更新。
      */
    listRDD.foreach(println)
  }
}
foreach(func)案例

5>.RDD的函数传递

  在实际开发中我们往往需要自己定义一些对于RDD的操作,那么此时需要主要的是,初始化工作是在Driver端进行的,而实际运行程序是在Executor端进行的,这就涉及到了跨进程通信,是需要序列化的。

  接下来我们看下面2个案例操作。
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import java.io.Serializable

/**
  *   传递Search对象时,必须得先序列化后才能在网络传递,否则无法在Exector端进行反序列化。
  *
  */
class Search(query:String) extends Serializable {
  //过滤出包含字符串的数据
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  //过滤出包含字符串的RDD
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  //过滤出包含字符串的RDD
  def getMatche2(rdd: RDD[String]): RDD[String] = {
    rdd.filter(x => x.contains(query))
  }
}

object SerializableableMethod {
  def main(args: Array[String]): Unit = {

      //1.初始化配置信息及SparkContext
      val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
      val sc = new SparkContext(sparkConf)

      //2.创建一个RDD
      val rdd: RDD[String] = sc.parallelize(Array("Hadoop", "Spark", "Hive", "Storm"))

      //3.创建一个Search对象,该过程是在Driver端执行的
      val search = new Search("S")

      //4.运用第一个过滤函数并打印结果,该过程是在Exector端执行的,因此需要将Driver端的Search对象传递给Exector,这意味着Search对象必须是序列化的,否则就会报错哟(Caused by: java.io.NotSerializableException: com.yinzhengjie.bigdata.spark.rdd.functionTransfer.Search)
      val match1: RDD[String] = search.getMatch1(rdd)
      match1.collect().foreach(println)

      //5.释放资源
      sc.stop()
  }
}
传递一个方案案例
package com.yinzhengjie.bigdata.spark.rdd.functionTransfer

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD


/**
  *   仅传递字符串时,无需进行序列化操作哟~
  */
class Search(query:String) {
  //过滤出包含字符串的数据
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  //过滤出包含字符串的RDD
  def getMatch1 (rdd: RDD[String]): RDD[String] = {
    rdd.filter(isMatch)
  }

  //过滤出包含字符串的RDD
  def getMatche2(rdd: RDD[String]): RDD[String] = {
//    rdd.filter(x => x.contains(query))
    val query_ : String = this.query    //将类变量赋值给局部变量,该代码是在Driver端执行
    rdd.filter(x => x.contains(query_))  //该代码在Exector端执行,因此query_这个成员变量属性需要传递过来,而query_本身就是字符串,因此无需序列化。
  }
}

object SerializableableAttribute {
  def main(args: Array[String]): Unit = {
    //1.初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //2.创建一个RDD
    val rdd: RDD[String] = sc.makeRDD(Array("Hadoop", "Spark", "Hive", "Storm"))

    //3.创建一个Search对象
    val search = new Search("o")

    //4.运用第一个过滤函数并打印结果
    val match1: RDD[String] = search.getMatche2(rdd)
    match1.collect().foreach(println)

    //5.释放资源
    sc.stop()
  }
}
传递一个属性(局部变量)案例

四.RDD依赖关系

1>.Lineage(血统)

package com.yinzhengjie.bigdata.spark.dependent

import org.apache.spark.rdd.RDD
import org.apache.spark.{Dependency, SparkConf, SparkContext}

/**
  *   RDD只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。
  *
  *   RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
  */
object Lineage {
  def main(args: Array[String]): Unit = {
          //1.初始化配置信息及SparkContext
          val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
          val sc = new SparkContext(sparkConf)

          //2.创建一个RDD
          val listRDD: RDD[Int] = sc.parallelize(1 to 10)

          //3.将listRDD射成一个个元组
          val mapRDD: RDD[(Int,Int)] = listRDD.map((_,1))

          //4.统计每一种key对应的个数
          val reduceRDD:RDD[(Int,Int)] = mapRDD.reduceByKey(_+_)

          /**
          *    5 >.查看reduceRDD的Lineage(血统)
          *
          *    RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
          *      窄依赖
          *        窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用,窄依赖我们形象的比喻为独生子女。
          *      宽依赖
          *        宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle操作,宽依赖我们形象的比喻为超生。
          */
          val lineage:String = reduceRDD.toDebugString
          println(lineage)

          //6>.查看依赖类型
          val dependencies:Seq[Dependency[_]] = reduceRDD.dependencies
          println(dependencies)

          //5.释放资源
          sc.stop()
  }
}

2>.DAG

  DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage;
  
  对于窄依赖,partition的转换处理在Stage中完成计算;

  对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据;

  如下图所示,是某个job分为3个阶段(stage),窄依赖可以放在同一个阶段(stage),而宽依赖由于shuffle的存在因此不能放在同一个阶段(state)中:
    A和B:
      groupBy操作是宽依赖,存在shuffle操作。  
    F和G:
      join操作是宽依赖,存在shuffle操作。
    B和G:
      是窄依赖,以为B的各个分区和G的分区唯一对应。
    E和F,D和F,C和D:
      map和union均没有shuffle操作,因此均是宅依赖,因此他们可以在同一个阶段(stage)。

  温馨提示:
    宽依赖有shufle操作,窄依赖没有shuffle操作,因此我们可以将宅依赖放在同一个阶段执行,而宽依赖则需要分开不同的阶段操作,因为宽依赖要做shuffle的前提是需要依赖上一个阶段的执行结果。
    由于窄依赖不需要等待,就可以利用并行的概念来执行数据,从而提升效率。

 

3>.任务规划

  RDD任务切分中间分为:Application、Job、Stage和Task。

  Application:
    初始化一个SparkContext即生成一个Application
  
  Job:
    一个Action算子就会生成一个Job

  Stage:
    根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。
 
  Task:
    Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task。

  温馨提示:
    Application->Job->Stage-> Task每一层都是1对n的关系。

五.RDD缓存

1>.RDD缓存概述

  RDD通过persist方法或cache方法可以将前面的计算结果缓存,默认情况下 persist()会把数据以序列化的形式缓存在JVM的堆空间中。

  但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

  通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。
  
  如下图所示,在存储级别的末尾加上"_2"来把持久化数据存为两份。
  
  缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

 

2>.缓存代码实现案例

package com.yinzhengjie.bigdata.spark.cache

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object CacheDemo {
  def main(args: Array[String]): Unit = {

    //1.初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //2.创建一个RDD
    val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020"))

    //3.将RDD转换为携带当前时间戳不做缓存
    val nocache = listRDD.map(_.toString+System.currentTimeMillis)

    //4>.查看nocache的Lineage(血统)关系
    System.out.println(nocache.toDebugString)

    //5>.多次打印无缓存结果
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)
    nocache.collect.foreach(println)

    //6>.将RDD转换为携带当前时间戳并做缓存
    val cache =  listRDD.map(_.toString+System.currentTimeMillis).cache

    //7>.查看cache的Lineage(血统)关系
    System.out.println(cache.toDebugString)

    //8>.多次打印缓存结果
    cache.collect.foreach(println)
    cache.collect.foreach(println)
    cache.collect.foreach(println)
    cache.collect.foreach(println)
    cache.collect.foreach(println)

    //9.释放资源
    sc.stop()
  }
}

六.RDD 检查点(CheckPoint)

1>.检查点概述

  Spark中对于数据的保存除了持久化操作之外,还提供了一种检查点的机制,检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。

  检查点通过将数据写入到HDFS文件系统实现了RDD的检查点功能。为当前RDD设置检查点,该函数将会创建一个二进制的文件,并存储到checkpoint目录中,该目录是用SparkContext.setCheckpointDir()设置的。

  在checkpoint的过程中,该RDD的所有依赖于父RDD中的信息将全部被移除。对RDD进行checkpoint操作并不会马上被执行,必须执行Action操作才能触发。

2>.检查点代码实现案例

package com.yinzhengjie.bigdata.spark.cache

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object CheckpointDemo {
  def main(args: Array[String]): Unit = {
    //初始化配置信息及SparkContext
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    //设置检查点的保存目录,实际工作中应该使用hdfs路径,本地目录一般用于测试使用
    sc.setCheckpointDir("E:\yinzhengjie\bigdata\spark\checkpoint")

    //创建一个RDD
    val listRDD: RDD[String] = sc.parallelize(Array("yinzhengjie2020"))

    //将RDD转换为携带当前时间戳
    val nocache = listRDD.map(_.toString+System.currentTimeMillis)

    //设置检查点,数据会被持久化到sc上定义的检查点保存目录
    nocache.checkpoint()

    //使用行动算子多次打印结果
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)
    nocache.collect().foreach(println)

    //查看Lineage(血统)
    println(nocache.toDebugString)

    //释放资源
    sc.stop()
  }
}
原文地址:https://www.cnblogs.com/yinzhengjie2020/p/13155362.html