RDD 编程(一)

1. RDD 的创建

创建 RDD 有三种方式:

  • 从集合中创建
  • 从外部存储创建
  • 从其他 RDD 转换得到新的 RDD

1.1 从集合中创建

1、使用 parallelize 函数:

import org.apache.spark.sql.SparkSession

object CreateRdd {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = session.sparkContext
    val arr = Array(10, 20, 30, 40, 50, 60)
    val rdd1 = sc.parallelize(arr)

    rdd1.collect().foreach(println)

    sc.stop()
  }
}

注意:parallelize 函数第二个参数可以指定分区数目

2、使用 makeRDD 函数:

val rdd1 = sc.makeRDD(arr)

1.2 从外部存储创建

外部存储包括:

  • 任意 Hadoop 支持的存储数据源来创建分布式数据集
  • 本地文件系统, HDFS, Cassandra, HVase, Amazon S3 等等
val rdd1 = sc.textFile(URL)
  • URL 可以是本地系统文件,但是必须每个节点都要存在这个路径
  • 可以是 hdfs 路径:hdfs://...
  • 所有基于文件的方法,支持目录、压缩文件、通配符(*):textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")
  • textFile 第二个参数,表示分区数,默认情况下每个块对应一个分区(对 HDFS 来说,块大小默认是 128M,HDFS 文件有多少个就有多少个分区),可以传递一个大于块数的分区数,但是不能传递一个比块数小的分区数

1.3 从其他 RDD 转换为新的 RDD

1.4 RDD 操作

RDD 支持两种操作:transformation、action 操作:

  • 转换操作 transformation:从一个已知的 rdd 中创建处理一个新的 rdd,转换操作是惰性的,只有遇到行动操作才会触发计算
  • 行动操作 action:数据集计算结束后,给驱动程序返回一个值

RDD 类型

根据数据类型不同,整体分为:

  • value 类型
  • key-value 类型(二维数组)

注意:每次 action 前面的 transformation 都会被重新计算,但可以通过缓存来加快访问速度

2. value 类型

2.1 map(func)

返回一个新的 RDD, 该 RDD 是由原 RDD 的每个元素经过函数转换后的值而组成. 就是对 RDD 中的数据做转换,用来做数据结构的调整

val arr = Array(10, 20, 30, 40, 50, 60)

//    val rdd1 = sc.parallelize(arr).map(x => x * 2)
val rdd1 = sc.parallelize(arr).map(_ * 2)

2.2 mapPartitions(func)

功能类似于 map,但是独立在每个分区上运行,假设有N个元素,有M个分区,那么 map 的函数的将被调用N次,而 mapPartitions 被调用M次,一个函数一次处理所有分区:

// mapPartitions, x.map() 用的是 scala 的 map 函数,而非 spark 的 map
//    val rdd2 = sc.parallelize(arr, 4).mapPartitions(x => x.map(_ * 2))

val rdd2 = sc.parallelize(arr, 4).mapPartitions(x => {
    println("called one times!!!!!!")

    x.map(_ * 3)
})

运行结果:

called one times!!!!!! 会输出 4 次,因为有 4 个分区:

called one times!!!!!!
called one times!!!!!!
called one times!!!!!!
called one times!!!!!!
30
60
90
120
150
180

注意:当每个分区的数据很大时,使用 mapPartitions x.toList() 可能会把内存撑爆,从而导致 OOM;当内存足够大的时候,使用 mapPartitions 执行效率要比 map

2.3 mapPartitionsWithIndex(func)

mapPartitions(func)类似. 但是会给 func 多提供一个 Int 值来表示分区的索引. 所以func的类型是:(Int, Iterator<T>) => Iterator<U>

val rdd3 = sc.parallelize(arr, 2).mapPartitionsWithIndex((index, it) => it.map(x => (index, x * 2)))
rdd3.collect().foreach(println)
sc.stop()

运行结果:

(0,20)
(0,40)
(0,60)
(1,80)
(1,100)
(1,120)

2.4 flatMap(func)

flatMap 返回一个序列,输入一个可能会返回 0 个或多个,map 是一一对应的:

// list1 中每个元素都是一个集合,flatMap 会将集合中的元素拍散,放在一个新的集合中
val list1 = List(1 to 5, 6 to 11, 12 to 18, 18 to 25)
val rdd1 = sc.parallelize(list1, 2)

val rdd2 = rdd1.flatMap(x => x)

运行结果:

1
2
3
4
.
.
.
24
25

其他用法:

val list2 = List(30, 5, 70, 6, 1, 20)
val rdd3 = sc.parallelize(list2).flatMap(x => List(x, x * x, x * x * x))

// 结果:2、4、8,else 返回空列表
val list3 = List(1, 2, 3)
val rdd4 = sc.parallelize(list3, 2).flatMap(x => if (x % 2 == 0) List(x, x * x, x * x * x) else List[Int]())

2.5 glom()

将每个分区元素合并成一个数组,形成新的 rdd,类型是 RDD[Array[T]]

val list1 = List(1, 2, 3, 4, 5, 6)
val rdd = sc.parallelize(list1, 2).glom().map(x => x.toList)
//    val rdd = sc.parallelize(list1, 2).glom().map(_.toList)

运行结果:

List(1, 2, 3)
List(4, 5, 6)

2.6 groupBy(func)

按照 func 的返回值进行分组,func 的返回值作为 key, 对应值放入一个迭代器中,返回:RDD[(K, Iterable[T])],但是顺序不能保证(不推荐使用, groupByKey 用的比较多)

按照元素的奇偶性进行分组:

val rdd = sc.parallelize(List(30, 50, 7, 6, 1, 20), 2)
val rdd2 = rdd.groupBy(x => x % 2)
/*
奇数、偶数聚合
rdd2 =
  (0,CompactBuffer(30, 50, 6, 20))
  (1,CompactBuffer(7, 1))
  
  rdd3
  (0,106)
  (1,8)
  */
val rdd3 = rdd2.map {
  case (k, it) => (k, it.sum)
}
rdd3.collect().foreach(println)

2.7 filter(func)

作用: 过滤. 返回一个新的 RDD 是由 func 的返回值为 true 的那些元素组成

// 过滤出大于 20 的元素
val rdd = sc.parallelize(List(30, 50, 7, 6, 1, 20), 2).filter(x => x > 20)

2.8 sample(withReplacement, fraction, seed)

作用:

  • 以指定的随机种子随机抽样出比例为fraction的数据,(抽取到的数量是: size * fraction). 需要注意的是得到的结果并不能保证准确的比例.
  • withReplacement 表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样. 放回表示数据有可能会被重复抽取到, false 则不可能重复抽取到. 如果是false, 则fraction必须是:[0,1], 是 true 则大于等于0就可以了.
  • seed 用于指定随机数生成器种子。 一般用默认的, 或者传入当前的时间戳
val rdd = sc.parallelize(1 to 26, 2)

//    不放回抽样,比例[0, 1]
//    val sample_rdd = rdd.sample(false, 0.1)

// 放回抽样
val sample_rdd = rdd.sample(true, 1)

2.9 distinct([numTasks]))

RDD 中元素执行去重操作. 参数表示任务的数量,默认值和分区数保持一致:

val list1 = List(1, 2, 3, 4, 5, 6, 6, 4, 3)
val rdd = sc.parallelize(list1, 2).distinct()

模板匹配对象去重:

import org.apache.spark.sql.SparkSession

object Distinct {
  def main(args: Array[String]): Unit = {
    val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate()
    val sc = session.sparkContext

    val list1 = List(1, 2, 3, 4, 5, 6, 6, 4, 3)
//    val rdd = sc.parallelize(list1, 2).distinct()

    val rdd = sc.parallelize(List(User(10, "lisi"), User(20, "zs"), User(10, "ab")), 2).distinct(2)
    rdd.collect().foreach(println)
    sc.stop()

  }
}

case class User(age: Int, name: String) {
  override def hashCode(): Int = this.age

  override def equals(obj: Any): Boolean = obj match {
    case User(age, _) => this.age == age
    case _ => false
  }
}

注意:distinctshuffle 操作

2.10 coalesce(numPartitions)

作用:减少分区到指定数目,用于大数据集过滤后,提高小数据集的执行效率,不常用,常用 reparation

由原来 5 个分区减少到 2 个分区:

val rdd = sc.parallelize(List(30, 50, 70, 60, 10, 20), 5)
println("分区前:" + rdd.getNumPartitions)

val rdd2 = rdd.coalesce(2)
println("分区后:" + rdd2.getNumPartitions)

rdd.collect().foreach(println)

coalesce 一般用来减少分区,也可以增加分区,增加分区时一定需要 shuffle,减少分区一般不会 shuffle

// 增加分区,第二个参数为 true,表示增加分区
rdd.coalesce(6, true)

2.11 repartition(numPartitions)

作用: 根据新的分区数, 重新 shuffle 所有的数据, 这个操作总会通过网络,新的分区数相比以前可多,也可以少:

rdd.repartition(6)

coalasce 和 reparation 的区别

  • coalasce:重新分区可选择是否进行 shuffle,由第二个参数 Boolean = false/true 决定
  • reparation:实质调用的 coalasce 进行 shuffle
  • 若减少分区,尽量减少 shuffle 使用 coalasce

2.12 sortBy(func,[ascending], [numTasks])

作用:使用 func 先对数据进行处理,而后对结果进行排序,默认为正序

val list1 = List("aaa", "ccc", "bbb", "ddd", "eee")
val rdd1 = sc.parallelize(list1, 2)
val rdd = rdd1.sortBy(x => x)
//    val rdd = rdd1.sortBy(x => x, ascending = false)
//    val rdd = rdd1.sortBy(x => x.length, ascending = false)

2.13 pipe(command, [envVars])

作用:管道,针对每个分区,把 RDD 中的每个数据通过管道传递给shell命令或脚本,返回输出的 RDD。一个分区执行一次这个命令. 如果只有一个分区, 则执行一次命令,shell 脚本必须在 Worker 节点

1、创建脚本 test_pipe.sh

#!/bin/bash

echo Hello
while read line;do
    echo ">>>>>>" $line
done

# 记得添加权限,否则第二步调用会报错,没有权限 chmod +x test_pipe.sh

2、spark-shell 中调用 test_pipe.sh

// 一个分区
scala> val rdd1 = sc.parallelize(List(1, 2, 3, 4, 5, 6))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> rdd1.pipe("/home/hadoop/apps/test_pipe.sh").collect()
res5: Array[String] = Array(Hello, >>>>>> 1, >>>>>> 2, >>>>>> 3, >>>>>> 4, >>>>>> 5, >>>>>> 6)

// 二个分区 每个分区执行一次脚本, 但是每个元素算是标准输入中的一行
scala> val rdd2 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> rdd2.pipe("/home/hadoop/apps/test_pipe.sh").collect()
res6: Array[String] = Array(Hello, >>>>>> 1, >>>>>> 2, >>>>>> 3, Hello, >>>>>> 4, >>>>>> 5, >>>>>> 6)

3. 双 Value 类型

所谓双 value 类型即两个 rdd 进行交互,常用的有:union、subtract、zip

3.1 union(otherDataset) 并集

  • union:并集
  • intersection:交集
  • subtract:差集
  • cartesian:笛卡尔积

作用:求并集,返回一个新的 rdd

val list1 = 1 to 15
val list2 = 13 to 26

val rdd1 = sc.parallelize(list1, 2)
val rdd2 = sc.parallelize(list2, 2)

// 并集
val union_rdd = rdd1.union(rdd2)

// 交集
val intersection_rdd = rdd1.intersection(rdd2)

// 差集
val subtract_rdd = rdd1.subtract(rdd2)

// 笛卡尔积
val cartesian_rdd = rdd1.cartesian(rdd2)

3.2 zip(otherDataset) 并集

作用:拉链操作,两个 rddd元素和分区数必须一致,否则异常(scala 中元素个数可以不一致)

1、元素个数、分区数一致:

val list1 = 1 to 5
val list2 = List("A", "B", "C", "D", "E")
val rdd1 = sc.parallelize(list1, 2)
val rdd2 = sc.parallelize(list2, 2)
val rdd = rdd1.zip(rdd2)

运行结果:

(1,A)
(2,B)
(3,C)
(4,D)
(5,E)

2、分区数不一致:

val list1 = 1 to 5
val list2 = List("A", "B", "C", "D", "E")
val rdd1 = sc.parallelize(list1, 2)
val rdd2 = sc.parallelize(list2, 3)
val rdd = rdd1.zip(rdd2)

运行结果:

Can't zip RDDs with unequal numbers of partitions: List(2, 3)

3、其他函数:

// 和元素索引组成一个元组 (元素,索引),如:(A,0)、(B,1)
//    val rdd = rdd2.zipWithIndex()

// 将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求
val rdd = rdd1.zipPartitions(rdd2)((it1, it2) => {
  it1.zipAll(it2, 100, 200)
})

运行结果:

(1,A)
(2,B)
(3,C)
(4,D)
(5,E)

4. key-value(PairRDD) 类型

key-value 键值对类型,大多数有 shuffle 操作,其结构类似于:(key, value),常用的有:groupByKey、reduceByKey、partitionBy 等。

作用: 对 pairRDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原 pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle 过程

原文地址:https://www.cnblogs.com/midworld/p/15391273.html