Spark(RDD)

RDD

1.所谓的RDD,其实就是一个数据结构,类似于链表中的Node

2.RDD中有适合并行计算的分区操作

3.RDD中封装了最小的计算单元,目的是更适合重复使用

4.Spark的计算主要就是通过组合RDD的操作,完成业务需求

1.从集合(内存)中创建RDD

从集合中创建RDD,Spark主要提供了两个方法:parallelize和makeRDD

val conf = new SparkConf().setMaster("local[*]").setAppName("RDD")
        conf.set("spark.default.parallelism", "4")
        val sc = new SparkContext(conf)
      val rdd: RDD[Int] = sc.makeRdd(List(1,2,3,4),2)
      val rdd: RDD[Int] = sc.textFile("input",2)
      sc.stop()

2.RDD转换算子

RDD根据数据处理方式的不同将算子整体上分为Value类型、双Value类型和Key-Value类型

value类型

1)map

将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val dataRDD: RDD[Int] = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD1: RDD[Int] = dataRDD.map(
    num => {
        num * 2
    }
)
val dataRDD2: RDD[String] = dataRDD1.map(
    num => {
        "" + num
    }
)

2)mapPartitions

将待处理的数据以分区为单位发送到计算节点进行处理,map 是一个数据一个数据的处理,mapPartitions是一个分区为处理单位
// TODO 算子 - 转换 -
        val rdd = sc.makeRDD(List(1,2,3,4), 2)
        // 获取每个数据分区的最大值
        // 【1,2】【3,4】
        val rdd1 = rdd.mapPartitions(
            list => {
                val max = list.max
                List(max).iterator
            }
        )
        rdd1.collect.foreach(println)

3)mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)

    val rdd2: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex {
      case (index, datas) => {

        datas.map((_,index))
      }
    }

4)flatMap

将处理的数据进行扁平化后再进行映射处理,
val rdd = sc.makeRDD(
            List(
                "Hello Scala", "Hello Spark"
            )
        )
        val rdd1 = sc.makeRDD(
            List(
                List(1,2), List(3,4)
            )
        )

        // 整体 => 个体
        //val rdd2 = rdd.flatMap(_.split(" "))
        val rdd2 = rdd.flatMap(
            str => { // 整体(1)
                // 容器(个体(N))
                str.split(" ")
            }
        )

        val rdd3 = rdd1.flatMap(
            list => {
                list
            }
        )

val rdd = sc.makeRDD(
            List(List(1,2),3,List(4,5))
        )

        val rdd1 = rdd.flatMap {
            case list : List[_] => list
            case other => List(other)
        }

        rdd1.collect.foreach(println)

5)glom

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变 
相当于聚合,将相同分区的数据放到一个数组中
val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6), 2)

        val rdd1: RDD[Array[Int]] = rdd.glom()

        rdd1.collect().foreach(a => println(a.mkString(",")))

 val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6),2)
    val rdd2: RDD[Array[Int]] = rdd.glom()
     val rdd3: RDD[Int] = rdd2.map(_.max)
    println(rdd3.collect().sum)

6)groupBy 可以实现wordcount

默认情况下,数据处理后,所在分区不会发生改变
Spark要求,一个组的数据必须在一个分区中
一个分区的数据被打乱重新和其他分区的数据组合在一起,这个操作称之为shuffle
shuffle操作不允许在内存中等待,必须落盘
从服务器日志数据apache.log中获取每个时间段访问量

val rdd: RDD[String] = sc.textFile("data/apache.log")
   //(10,1),(11,1),(10,1)
    val rdd2: RDD[(String, Int)] = rdd.map(line => {
      val strings: Array[String] = line.split(" ")
      val str: String = strings(3)
      val strings1: Array[String] = str.split(":")
      (strings1(1), 1)
    })
    val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(_._1)
    //(10,list((10,1),(10,1)))
    val rdd4: RDD[(String, Int)] = rdd3.mapValues(_.size)
    rdd4.collect().foreach(println)

7)filter

将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。

当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出现数据倾斜。

 val rdd : RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6))

        // filter算子可以按照指定的规则对每一条数据进行筛选过滤
        // 数据处理结果为true,表示数据保留,如果为false,数据就丢弃
        val rdd1 = rdd.filter(
            num => num % 2 == 1    //等于1的留下  其他的舍弃
        )

        rdd1.collect.foreach(println)

8)sample

// 抽取数据,采样数据
        // 第一个参数表示抽取数据的方式:true. 抽取放回,false. 抽取不放回
        // 第二个参数和第一个参数有关系
        //     如果抽取不放回的场合:参数表示每条数据被抽取的几率
        //     如果抽取放回的场合:参数表示每条数据希望被重复抽取的次数
        // 第三个参数是【随机数】种子
        //     随机数不随机,所谓的随机数,其实是通过随机算法获取的一个数
        //     3 = xxxxx(10)
        //     7 = xxxxx(3)
        //val rdd1: RDD[Int] = rdd.sample(false, 0.5)
        //val rdd1: RDD[Int] = rdd.sample(true, 2)
        val rdd1: RDD[Int] = rdd.sample(false, 0.5, 2)
        rdd1.collect.foreach(println)

9) coalesce

 // TODO 算子 - 转换 - 缩减分区
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4,5,6), 3
        )

        // 缩减 (合并), 默认情况下,缩减分区不会shuffle
        //val rdd1: RDD[Int] = rdd.coalesce(2)
        // 这种方式在某些情况下,无法解决数据倾斜问题,所以还可以在缩减分区的同时,进行数据的shuffle操作
        val rdd2: RDD[Int] = rdd.coalesce(2, true)   //true代表数据的shuffle操作  将数据打乱和其他分区的数据组合在一起

        rdd.saveAsTextFile("output")
        rdd2.saveAsTextFile("output1")

10) distinct

底层有shuffle

 // TODO 算子 - 转换
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,1,1)
        )

        // map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
        // 【1,1,1】
        // 【(1, null),(1, null),(1, null)】
        // 【null, null, null】
        // 【null, null】
        // 【(1, null)】
        // 【1】
        val rdd1: RDD[Int] = rdd.distinct()
        rdd1.collect.foreach(println)

        //List(1,1,1,1,1).distinct

11) repartition

该操作内部其实执行的是coalesce操作,参数shuffle的默认值为true。无论是将分区数多的RDD转换为分区数少的RDD,还是将分区数少的RDD转换为分区数多的RDD,
repartition操作都可以完成,因为无论如何都会经shuffle过程。
 // TODO 算子 - 转换 - 缩减分区
        val rdd : RDD[Int] = sc.makeRDD(
            List(1,2,3,4,5,6), 2
        )
        // 扩大分区 - repartition
        // 在不shuffle的情况下,coalesce算子扩大分区是没有意义的。
        //val rdd1: RDD[Int] = rdd.coalesce(3, true)

        val rdd1: RDD[Int] = rdd.repartition(3)  

12) sortBy

排序 默认升序,第二个参数改为flase即为降序
该操作用于排序数据。在排序之前,可以将数据通过f函数进行处理,之后按照f函数处理的结果进行排序,
默认为升序排列。排序后新产生的RDD的分区数与原RDD的分区数一致。中间存在shuffle的过程
    val rdd: RDD[Int] = sc.makeRDD(List(1,3,6,2,1,7,5),2)
   val rdd1: RDD[Int] = rdd.sortBy(num=>num,false,3)    //参数1:排序规则 参数2:默认升序  flase为降序,参数3:3为分区数,默认分区不变
    rdd1.collect().foreach(println)
    sc.stop()

双Value类型

1)intersection(交集)union(并集)subtract(差集)zip(拉链)

intersection:对源RDD和参数RDD求交集后返回一个新的RDD 如果数据类型不一致的话报错

union:对源RDD和参数RDD求并集后返回一个新的RDD,保留重复元素

subtract:以一个RDD元素为主,去除两个RDD中重复元素,将其他元素保留下来。求差集

zip:将两个RDD中的元素,以键值对的形式进行合并。其中,键值对中的Key为第1个RDD中的元素,Value为第2个RDD中的相同位置的元素。每个分区元素个数相同,分区数相同

// TODO 算子 - 转换 - 排序
    val rdd : RDD[Int] = sc.makeRDD(
      List(1,2,3,4),2
    )

    val rdd1 : RDD[Int] = sc.makeRDD(
      List(3,4,5,6),2
    )
    val rdd2 : RDD[String] = sc.makeRDD(
      List("3","4","5", "6"),2
    )
    // 交集
    println(rdd.intersection(rdd1).collect().mkString(","))
    // 并集
    println(rdd.union(rdd1).collect().mkString(","))
    // 差集
    println(rdd.subtract(rdd1).collect().mkString(","))

    // 拉链
    // 英文翻译:
    // Can only zip RDDs with same number of elements in each partition  每个分区元素个数相同
    // Can't zip RDDs with unequal numbers of partitions: List(2, 3) 分区个数相同
    println(rdd.zip(rdd1).collect().mkString(","))
    println(rdd.zip(rdd2).collect().mkString(","))
打印结果
4,3
1,2,3,4,3,4,5,6
2,1
(1,3),(2,4),(3,5),(4,6)
(1,3),(2,4),(3,5),(4,6)

Key-Value类型

1)partitionBy

将数据按照指定Partitioner重新进行分区。Spark默认的分区器是HashPartitioner

val rdd : RDD[Int] = sc.makeRDD(
      List(1,2,3,4),2
    )
    val rdd1: RDD[(Int, Int)] = rdd.map((_,1))
    val rdd2: RDD[(Int, Int)] = rdd1.partitionBy(new HashPartitioner(2))
  rdd2.collect().foreach(println)

2)reduceByKey (可以实现wordcount)

可以将数据按照相同的Key对Value进行聚合 对key分组然后进行对value聚合 相当于groupByKey+mapValues

 // TODO 算子 - 转换 - 排序
    val rdd  = sc.makeRDD(
     List(("a",1),("b",1),("a",2))
    )
    val rdd1: RDD[(String, Int)] = rdd.reduceByKey(_+_)
    rdd1.collect().foreach(println)
    sc.stop()

3)groupByKey 有shuffle流程

// TODO groupByKey & groupBy
        // 1. groupBy不需要考虑数据类型,groupByKey必须保证数据kv类型
        // 2. groupBy按照指定的规则进行分组,groupByKey必须根据key对value分组
        // 3. 返回结果类型
        //    groupByKey => (String, Iterable[Int])
        //    groupBy    => (String, Iterable[(String, Int)])
val rdd : RDD[(String, Int)] = sc.makeRDD(
            List(
                ("a", 1),
                ("a", 1),
                ("a", 1)
            )
        )
val rdd1: RDD[(String, Iterable[Int])] = rdd.groupByKey()

        val rdd2: RDD[(String, Int)] = rdd1.mapValues(_.size)

        rdd2.collect.foreach(println)

reduceBykey 和groupByKey的区别
从shuffle的角度:reduceByKey和groupByKey都存在shuffle的操作,但是reduceByKey可以在shuffle前对分区内相同key的数据进行预聚合(combine)功能,
这样会减少落盘的数据量,从而减少了读取数据量的时间。而groupByKey只是进行分组,不存在数据量减少的问题,reduceByKey性能比较高。
从功能的角度:reduceByKey其实包含分组和聚合的功能。groupByKey只能分组,不能聚合,所以在分组聚合的场合下,
推荐使用reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用groupByKey

4)aggregateByKey

aggregateByKey算子存在函数柯里化
第一个参数列表中有一个参数
参数为零值,表示计算初始值 zero, z, 用于数据进行分区内计算
第二个参数列表中有两个参数
第一个参数表示 分区内计算规则
第二个参数表示 分区间计算规则
 val rdd: RDD[(String, Int)] = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("c", 3),
      ("b", 4), ("c", 5), ("c", 6)
    ), 2)
    val rdd1: RDD[(String, Int)] = rdd.aggregateByKey(0)(
      (x, y) => math.max(x, y),
      (x, y) => x + y
    )
    rdd1.collect().foreach(println)
(b,4)
(a,2)
(c,9)

5)foldByKey

如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化

// TODO aggregateByKey也可以实现WordCount ( 4 / 10 )
        val rdd2 = rdd.aggregateByKey(0)(_+_, _+_)

        // TODO foldByKey也可以实现WordCount ( 5 / 10 )
        // TODO 如果aggregateByKey算子的分区内计算逻辑和分区间计算逻辑相同,那么可以使用foldByKey算子简化
        val rdd3 = rdd.foldByKey(0)(_+_)

6)combineByKey

 // combineByKey算子有三个参数
        // 第一个参数表示 当第一个数据不符合我们的规则时,用于进行转换的操作
        // 第二个参数表示 分区内计算规则
        // 第三个参数表示 分区间计算规则
val rdd = sc.makeRDD(
            List(
                ("a", 1), ("a", 2), ("b", 3),
                ("b", 4), ("b", 5), ("a", 6)
            ),
            2
        )
val rdd2 = rdd.combineByKey(
            num => (num, 1),
            (x : (Int, Int), y) => {
                (x._1 + y, x._2 + 1)
            },
            ( x : (Int, Int), y:(Int, Int) ) => {
                (x._1 + y._1, x._2 + y._2)
            }
        )
        rdd2.collect.foreach(println)

7)sortByKey

返回一个按照key进行排序

    val dataRDD1 = sc.makeRDD(List(("a",1),("b",2),("c",3)))
    val value: RDD[(String, Int)] = dataRDD1.sortByKey()
    value.collect().foreach(println)

8)join

spark中join操作主要针对于两个数据集中相同的key的数据连接
join操作可能会产生笛卡尔乘积,可能会出现shuffle,性能比较差
所以如果能使用其他方式实现同样的功能,不推荐使用join
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
    val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
    rdd.join(rdd1).collect().foreach(println)

行动算子

1)aggregate

// aggregate & aggregateByKey的区别?
        // 1. 数据格式
        // 2. aggregateByKey是一个转换算子,所以执行后会产生新的RDD
        //    aggregate是一个行动算子,所以执行后会得到结果
        // 3. aggregateByKey执行计算时,初始值只会参与分区内计算
        //    aggregate执行计算时,初始值会参与分区内计算,也会参与分区间的计算
        //  【1,4】,【3,2】
        //  【5,1,4】,【5,3,2】
        // 【10】【10】
        // 【5, 10, 10】
val rdd = sc.makeRDD(List(1,4,3,2),2)
        val i: Int = rdd.aggregate(5)(_ + _, _ + _)
        val j: Int = rdd.fold(5)(_ + _)
        val k: Int = rdd.reduce(_ + _)
25

2)collect

在驱动程序(Driver)中,以数组Array的形式返回数据集的所有元素
将数据从Executor端采集回到Driver端
// collect会将数据全部拉取到Driver端的内存中,形成数据集合,可能会导致内存溢出
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))

// 收集数据到Driver
rdd.collect().foreach(println)

3)foreach

collect是按照分区号进行采集,先采集0号分区,依次采集

        // TODO 算子 - 行动
        val rdd = sc.makeRDD(
            List(1,4,3,2),2
        )

        // collect是按照分区号码进行采集
        rdd.collect.foreach(println)
        println("****************************")
        rdd.foreach(println)
1
4
3
2
****************************
1
3
4
2
原文地址:https://www.cnblogs.com/xiao-bu/p/14843007.html