RDD算子分类
RDD 中的算子从功能上分为两大类
- Transformation(转换) 它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD,让RDD之间具有联系,只是生成RDD链条,并不会真的执行整个程序,只有在动作Action时,程序才会执行。
- Action(动作) 执行各个分区的计算任务, 将的到的结果返回到 Driver 中
执行 RDD 的时候, 在执行到 Transformation 转换操作的时候, 并不会立刻执行, 直到遇见了 Action 操作, 才会触发真正的执行, 这个特点叫做 惰性求值
默认情况下, 每一个 Action 运行的时候, 其所关联的所有 Transformation RDD 都会重新计算, 但是也可以使用 presist 方法将 RDD 持久化到磁盘或者内存中. 这个时候为了下次可以更快的访问, 会把数据保存到集群上.
下面对这两个方面的一些重要算子进行总结。
Transformation
转换 | 含义 |
---|---|
map(func) | 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成 |
filter(func) | 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成 |
flatMap(func) | 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素) |
mapPartitions(func) | 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U] |
mapPartitionsWithIndex(func) | 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U] |
union(otherDataset) | 对源RDD和参数RDD求并集后返回一个新的RDD |
intersection(otherDataset) | 对源RDD和参数RDD求交集后返回一个新的RDD |
groupByKey([numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD |
reduceByKey(func, [numTasks]) | 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置 |
sortByKey([ascending], [numTasks]) | 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD |
sortBy(func,[ascending], [numTasks]) | 与sortByKey类似,但是更灵活 |
join(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD |
cogroup(otherDataset, [numTasks]) | 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD |
coalesce(numPartitions) | 减少 RDD 的分区数到指定值。 |
repartition(numPartitions) | 重新给 RDD 分区 |
repartitionAndSortWithinPartitions(partitioner) | 重新给 RDD 分区,并且每个分区内以记录的 key 排序 |
下面对其中常用的给出代码实例。
package rdd import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test /** * 主要介绍的 RDD算子: * 1. 转换: map, mapPartitions, mapPartitionsWithIndex, mapValues * 2. 过滤: filter, sample * 3. 集合、去重: intersection, union, subtract, distinct * 4. 聚合: reduceByKey, groupByKey, combineByKey, foldByKey, aggregateByKey * 6. 排序: sortBy, sortByKey * 5. 重分区: repartition, coalesce */ class TransformationOp { private val conf = new SparkConf().setMaster("local[6]").setAppName("transformation_op") private val sc = new SparkContext(conf) /** * mapPartitions和 map算子是一样的,只不过 map是针对每一条数据进行转换, mapPartitions 针对一整个分区的数据进行转换 * 所以: * 1. map的 func参数是单条数据,mapPartitions的 func参数是一个集合(一个分区整个所有的数据) * 2. map 的 func返回值也是单条数据,mapPartitions 的 func返回值是一个集合 * 3. map 针对每一条数据进行处理,它的粒度是每条数据。 * mapPartitions 针对每一个分区处理,它的粒度是每个分区。 */ @Test def mapPartitions(): Unit ={ println("--------- 1 ---------") // 1、 数据生成 sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2) // 指定分区数 2 // 2、 算子使用 .mapPartitions( iter => { // 每个分区是一个集合 iter.foreach(item => print(item + " ")) // 对集合进行foreach循环遍历打印 iter // 返回值 }) // 3、 获取结果 .collect() println(" --------- 2 ---------") // 进行一个小练习,将每个数据 * 10 输出结果 sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2) .mapPartitions( iter => { iter.map( item => item * 10) // scala 的方法,将一个集合 每个元素进行操作,返回这个集合 }).collect().foreach(item => print(item + " ")) } /** * mapPartitionsWithIndex和 mapPartitions 的区别是 func中多了一个index参数,是分区号 */ @Test def mapPartitionsWithIndex(): Unit ={ sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2) .mapPartitionsWithIndex( (index, iter) => { println("分区编号:" + index) iter.foreach(item => println(item)) // 这两个分区是并行执行的,所以打印时,顺序可能发生更改 iter } ).collect() } /** * filter 可以过滤掉数据集中一部分数据 * 数据中接受的函数,参数是每个元素,如果这个函数返回true,则会加入新数据集,否则被过滤掉 */ @Test def filter(): Unit ={ sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) .filter( item => item % 2 == 0 ) // 过滤掉非偶数 .collect() .foreach( item => println(item) ) } /** * 把大数据变小,并尽可能的减少数据集规律的损失 * withReplacement:是否有重复抽取,是否抽完放回 * fraction:保留 */ @Test def sample(): Unit ={ sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) .sample(true,0.6) .collect().foreach( item=> println(item) ) } /** * mapValues 也是 map, 只不过 map 作用于整条数据, mapValues 作用于 key-value 中的 Value */ @Test def mapValues(): Unit ={ sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3))) .mapValues( item => item * 10 ) .collect().foreach(println(_)) // 只有一个参数,可以用下划线指定 } /** * 集合操作:求交集,并集,差集 * 去重 */ @Test def collection(): Unit ={ val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5)) val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7)) rdd1.intersection(rdd2).collect().foreach(println(_)) // 交集 rdd1.union(rdd2).collect().foreach(println(_)) // 并集,可以重复 val rdd3 = rdd1.union(rdd2) // 并集,可以重复 rdd1.subtract(rdd2).collect().foreach(println(_)) // 差集 rdd3.distinct().collect().foreach(println(_)) } /** * groupByKey运算结果:(key, (v1,v2...)) */ @Test def groupByKey(): Unit ={ sc.parallelize(Seq(("a", 1), ("a", 2), ("c", 3))) .groupByKey().collect().foreach(println(_)) } /** * 作用:对数据集按照 Key 进行聚合 * 调用:combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer]) * 参数: * createCombiner 将 Value 进行初步转换 * mergeValue 在每个分区把上一步转换的结果聚合 * mergeCombiners 在所有分区上把每个分区的聚合结果聚合 * partitioner 可选, 分区函数 * mapSideCombiner 可选, 是否在 Map 端 Combine * serializer 序列化器 * * combineByKey 是 reduceByKey 的底层 */ @Test def combineByKey(): Unit ={ var rdd=sc.parallelize(Seq( ("zhangsan", 99.0), ("zhangsan", 96.0), ("lisi", 97.0), ("lisi", 98.0), ("zhangsan", 97.0) )) //2.算子运算 // 2.1 createCombiner 转换数据 // 2.2 mergeValue 分区上的聚合 // 2.3 mergeCombiners 把所有分区上的结果再次聚合,生成最终结果 val combineResult = rdd.combineByKey( createCombiner = (curr: Double) => (curr, 1), mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1), mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2) ) val resultRDD = combineResult.map(item => (item._1, item._2._1 / item._2._2)) resultRDD.collect().foreach(print(_)) } /** * foldByKey 和 spark 中的 reduceByKey 区别:可以指定初始值 * foldByKey 和 Scala 中的foldLeft 或 foldRight 区别:这个初始值是作用于每一个数据的 */ @Test def foldByKey(): Unit ={ sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1))) .foldByKey(10)( (curr, agg) => curr + agg) .collect().foreach(println(_)) } /** * * 作用:聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value * 调用:rdd.aggregateByKey(zeroValue)(seqOp, combOp) * 参数: * zeroValue 初始值 * seqOp 转换每一个值的函数 * comboOp 将转换过的值聚合的函数 */ @Test def aggregateByKey(): Unit ={ val rdd=sc.parallelize(Seq(("手机",10.0),("手机",15.0),("电脑",20.0))) rdd.aggregateByKey(0.8)(( zeroValue,item) =>item * zeroValue,(curr,agg) => curr+agg) .collect() .foreach(println(_)) // (手机,20.0) // (电脑,16.0) } /** * 将两个 RDD 按照相同的 Key 进行连接 * 有点像笛卡尔积 */ @Test def join(): Unit ={ val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1))) val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12))) rdd1.join(rdd2).collect().foreach(println(_)) // (a,(1,10)) // (a,(1,11)) // (a,(1,12)) // (a,(2,10)) // (a,(2,11)) // (a,(2,12)) } /** * sortBy可以指定按照哪个字段来排序, sortByKey 直接按照 Key 来排序 */ @Test def sortBy(): Unit ={ val rdd=sc.parallelize(Seq(8,4,5,6,2,1,1,9)) val rdd2=sc.parallelize(Seq(("a",1),("b",3),("c",2))) rdd.sortBy(item =>item).collect().foreach(println(_)) // 指定数据,自动排序 rdd2.sortBy(item => item._2).collect().foreach(println(_)) // 指定按照 KV 数据的第几个数据进行排序 rdd2.sortByKey().collect().foreach(println(_)) // 按照key排序,即 a b c } /** * repartition 进行重分区的时候,默认是 Shuffle 的 * coalesce 合并,默认不是 shuffle的,即分区数只能减少,但可以在方法内指定第二个参数为true来实现增大分区数 */ @Test def partitioning(): Unit ={ val rdd=sc.parallelize(Seq(1,2,3,4,5),2) //println((rdd.repartition(5)).partitions.size) println(rdd.coalesce(5,true).partitions.size) } }
Action
动作 | 含义 |
---|---|
reduce(func) | reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 |
collect() | 在驱动程序中,以数组的形式返回数据集的所有元素 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
package rdd import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test /** * 主要介绍的 RDD Action(动作) 算子: * collect, reduce, foreach, countByKey, count, take, takeSample, first */ class ActionOp { private val conf = new SparkConf().setMaster("local[6]").setAppName("transformation_op") private val sc = new SparkContext(conf) /** * reduceByKey 是先按照 key分组,然后把每组聚合 * reduce 是针对一整个数据集来进行集合。 * reduceByKey 是针对 KV 类型的数据来进行计算的,reduce 可以针对所有类型的数据 * reduce操作并没有 reducer 和 mapper, 因为 reduce 算子会作用于RDD中的每一个分区, * 然后在分区上求得局部结果,最终汇总到 Driver 上求得最终结果 * 所有 reduce 算子不是Shuffle操作,KV类型的数据才可以进行shuffle,reduce算子处理的数据不仅仅是KV */ @Test def reduce(): Unit ={ val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0))) /* * reduce 不是针对 KV 数据的,所有 curr是针对的每一条数据:("手机", 10.0)... * */ val result = rdd.reduce((curr, agg) => ("总价", curr._2 + agg._2)) println(result) // (总价,45.0) } /** * foreach直接进行循环遍历,并且该函数没有返回值 * 不用像之前先collect 再 foreach */ @Test def foreach(): Unit ={ // rdd 是并行计算,所以不会顺序打印 sc.parallelize(Seq(1, 2, 3)).foreach(item => println(item)) } /** * count 和 countByKey 打印结果相距很远,是每次调用 Action 都会启用一个job, job运行产生结果 */ @Test def count(): Unit ={ val rdd = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2), ("b", 1))) println(rdd.count()) // 4 println(rdd.countByKey()) // Map(a -> 2, b -> 2) } /** * take 和 takeSample 都是获取数据,前者直接获取数据集里的前几条,后者是采样获取,参数 withReplacement 是否放回 * first 获取第一个元素 */ @Test def take(): Unit ={ val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6)) println(rdd.first()) // 1 rdd.take(3).foreach( item => println(item) ) // 1 2 3 rdd.takeSample(withReplacement = true, num = 3).foreach( item => println(item) ) // 4 4 1 } /** * RDD 对数字型数字的支持 * count 个数 * mean 平均值 * variance 方差 * ... */ @Test def num(): Unit ={ val rdd = sc.parallelize(Seq(1,2,3,4,7,9,15,36,100)) println(rdd.max()) println(rdd.min()) println(rdd.mean()) println(rdd.sum()) } }
RDD对KV类型和数字类型的支持
RDD的算子对KV类型的数据操作在Scala中都是封装在一个PairRDDFunctions的类中,而不是RDD类中。总结如下:
对数字类型的支持则是一些数学的基本操作,如平均数方差等,基本上都是Action操作,而不是转换操作。如下图。
案例演示
数据展示
需求:求出北京PM根据日期(年,月份)的Top10 pm 排行。其中第 7 列是具体的PM,有空值,有NA值,有具体的数据值。
完成代码:
package rdd import org.apache.commons.lang3.StringUtils import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test class BeijingPM { @Test def pmProcess(): Unit ={ val conf = new SparkConf().setMaster("local[6]").setAppName("BeijingPM") val sc = new SparkContext(conf) // 数据读取 val source = sc.textFile("dataset/BeijingPM20100101_20151231_noheader.csv") // 1、抽取数据 ((年, 月), PM值), 即把月份和年放到一个元组里,然后把这个元组作为 key val result = { source.map( item => ((item.split(",")(1), item.split(",")(2)), item.split(",")(6))) // 2、清洗掉 PM(item._2) 的空值 或者 为 NA 的值,此时 pm为String类型 .filter(item => ! item._2.equalsIgnoreCase("NA") && StringUtils.isNotEmpty(item._2)) // 清洗后,重新map,并将 pm 的 String 类型转为 Int .map( item => (item._1, item._2.toInt) ) // 3、聚合 将每个key的所有的PM值相加 .reduceByKey( (curr, agg) => curr + agg ) // 4、排序 按照第二个 PM的总和 按照降序进行排列 .sortBy(item => item._2, ascending = false) } // 取前十个 Top10 打印 result.take(10).foreach(item => println(item)) } }