Spark RDD编程(1) Value类型

RDDResilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象。代码中是一个抽象类,它代表一个不可变、可分区、里面的元素可并行计算的集合。

 

RDD的创建

1.从集合中创建

从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD

val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5))
listRDD.collect().foreach(println)
val arrayRDD: RDD[String] = sc.parallelize(Array("abc", "xyz"))
arrayRDD.collect().foreach(println)

2.由外部存储系统的数据集创建

//从外部存储创建RDD。默认最小并行度2,可能大于2.取决于hadoop读取文件的分片规则
val fileRDD: RDD[String] = sc.textFile("input")
fileRDD.collect().foreach(println)

RDD的转换

1.1 map:返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

val mapPartitionsRDD = listRDD.map(_ * 2)

mapPartitionsRDD.collect().foreach(println)

1.2  mapPartitions:类似于map,但独立地在RDD的每一个分片上运行,因此在类型为TRDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]假设有N个元素M个分区,那么map的函数的将被调用N,mapPartitions被调用M,一个函数一次处理所有分区。

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

val mapPartitionsRDD = listRDD.mapPartitions(data => data.map(_ * 2))

mapPartitionsRDD.collect().foreach(println)

1.3 mapPartitionsWithIndex:类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为TRDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

val mapPartitionsRDD = listRDD.mapPartitionsWithIndex((index,data) => data.map((_,"分区:" + index)))

mapPartitionsRDD.collect().foreach(println)
//(1,分区:0)
//(2,分区:1)
//(3,分区:2)
//(4,分区:3)
//(5,分区:3)
//(6,分区:4)
//(7,分区:5)
//(8,分区:6)
//(9,分区:7)
//(10,分区:7)

map()mapPartition()的区别

① map():每次处理一条数据。

② mapPartition():每次处理一个分区的数据,这个分区的数据处理完后,原RDD中分区的数据才能释放,可能导致OOM

③ 开发指导:当内存空间较大的时候建议使用mapPartition(),以提高处理效率。

2 flatMap:类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)

val listRDD: RDD[String] = sc.makeRDD(List("abc", "xyz"))

val flatMapRDD: RDD[Char] = listRDD.flatMap(_.toCharArray)

flatMapRDD.collect().foreach(println)
//a
//b
//c
//x
//y
//z

3 glom:将每一个分区形成一个数组,形成新的RDD类型是RDD[Array[T]]

val listRDD: RDD[Int] = sc.makeRDD(1 to 18, 4)

val glomRDD: RDD[Array[Int]] = listRDD.glom()

glomRDD.collect().foreach(data => println(data.mkString(",")))
//1,2,3,4
//5,6,7,8,9
//10,11,12,13
//14,15,16,17,18

4 groupBy:分组,按照传入函数的返回值进行分组。将相同的key对应的值放入一个迭代器。

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

val groupByRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(_ % 2)

groupByRDD.collect().foreach(println)
//(0,CompactBuffer(2, 4, 6, 8, 10))
//(1,CompactBuffer(1, 3, 5, 7, 9))

5 filter:过滤。返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

val filterRDD: RDD[Int] = listRDD.filter(_ % 2 == 0)
filterRDD.collect().foreach(println)
//2
//4
//6
//8
//10

6 sample(withReplacement, fraction, seed):以指定的随机种子随机抽样出数量为fraction的数据,withReplacement表示是抽出的数据是否放回,true为有放回的抽样,false为无放回的抽样seed用于指定随机数生成器种子。

val listRDD: RDD[Int] = sc.makeRDD(1 to 10)

val sampleRDD: RDD[Int] = listRDD.sample(false, 0.5, 3)

sampleRDD.collect().foreach(println)
//2
//3
//6
//7
//8
//9
//10

7 distinct:对源RDD进行去重后返回一个新的RDD默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数改变它。

val listRDD: RDD[Int] = sc.makeRDD(List(1, 1, 2, 2, 3, 3, 4, 4, 5, 5))

//默认cpu核数分区
val distinctRDD: RDD[Int] = listRDD.distinct()
distinctRDD.glom().collect().foreach(data => println(data.mkString(",")))
//4
//1,5
//2
//3

//指定分区
val distinctRDD2 = listRDD.distinct(2)
distinctRDD2.glom().collect().foreach(data => println(data.mkString(",")))
//4,2
//1,3,5

8 coalesce:缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。没有shuffle过程,只是合并其中的某些分区,不会打乱。

val listRDD: RDD[Int] = sc.makeRDD(1 to 16, 4)
listRDD.glom().collect().foreach(data => println(data.mkString(",")))
//1,2,3,4
//5,6,7,8
//9,10,11,12
//13,14,15,16

//合并为2个分区,默认没有shuffle过程,将1、2区合并,3、4区合并
listRDD.coalesce(2).glom().collect().foreach(data => println(data.mkString(",")))
//1,2,3,4,5,6,7,8
//9,10,11,12,13,14,15,16

9 repartition:根据分区数,重新通过网络随机洗牌所有数据。数据打乱重分区,有shuffle过程。

val listRDD: RDD[Int] = sc.makeRDD(1 to 16, 4)
listRDD.glom().collect().foreach(data => println(data.mkString(",")))
//1,2,3,4
//5,6,7,8
//9,10,11,12
//13,14,15,16

//重新分为2个区,调用的也是coalesce方法,只不过默认进行shuffle,将分区打乱重组
listRDD.repartition(2).glom().collect().foreach(data => println(data.mkString(",")))
//1,3,5,7,9,11,13,15
//2,4,6,8,10,12,14,16

coalescerepartition的区别

① coalesce重新分区,可以选择是否进行shuffle过程。由参数shuffle: Boolean = false/true决定。

② repartition实际上是调用的coalesce,默认是进行shuffle的。

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}

10 sortBy(func,[ascending], [numTasks]):使用func先对数据进行处理,按照处理后的数据比较结果排序,默认为正序。

val listRDD: RDD[Int] = sc.makeRDD(List(2, 4, 1, 5, 3))

//按照自身大小排序,默认为升序
val asc: Array[Int] = listRDD.sortBy(x => x).collect()
println(asc.mkString(","))
//1,2,3,4,5

//降序
val desc = listRDD.sortBy(x => x, false).collect()
println(desc.mkString(","))
//5,4,3,2,1

 

原文地址:https://www.cnblogs.com/noyouth/p/12961665.html