Spark05-RDD算子细谈

RDD算子分类

RDD 中的算子从功能上分为两大类

  • Transformation(转换) 它会在一个已经存在的 RDD 上创建一个新的 RDD, 将旧的 RDD 的数据转换为另外一种形式后放入新的 RDD,让RDD之间具有联系,只是生成RDD链条,并不会真的执行整个程序,只有在动作Action时,程序才会执行。
  • Action(动作) 执行各个分区的计算任务, 将的到的结果返回到 Driver 中

执行 RDD 的时候, 在执行到 Transformation 转换操作的时候, 并不会立刻执行, 直到遇见了 Action 操作, 才会触发真正的执行, 这个特点叫做 惰性求值 

默认情况下, 每一个 Action 运行的时候, 其所关联的所有 Transformation RDD 都会重新计算, 但是也可以使用 presist 方法将 RDD 持久化到磁盘或者内存中. 这个时候为了下次可以更快的访问, 会把数据保存到集群上.

下面对这两个方面的一些重要算子进行总结。

Transformation

转换含义
map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]
union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活
join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD
cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
coalesce(numPartitions) 减少 RDD 的分区数到指定值。
repartition(numPartitions) 重新给 RDD 分区
repartitionAndSortWithinPartitions(partitioner) 重新给 RDD 分区,并且每个分区内以记录的 key 排序

下面对其中常用的给出代码实例。

package rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

/**
 * 主要介绍的 RDD算子:
 *  1. 转换: map, mapPartitions, mapPartitionsWithIndex, mapValues
 *  2. 过滤: filter, sample
 *  3. 集合、去重: intersection, union, subtract, distinct
 *  4. 聚合: reduceByKey, groupByKey, combineByKey, foldByKey, aggregateByKey
 *  6. 排序: sortBy, sortByKey
 *  5. 重分区: repartition, coalesce
 */
class TransformationOp {
  private val conf = new SparkConf().setMaster("local[6]").setAppName("transformation_op")
  private val sc = new SparkContext(conf)

  /**
   * mapPartitions和 map算子是一样的,只不过 map是针对每一条数据进行转换, mapPartitions 针对一整个分区的数据进行转换
   * 所以:
   * 1. map的 func参数是单条数据,mapPartitions的 func参数是一个集合(一个分区整个所有的数据)
   * 2. map 的 func返回值也是单条数据,mapPartitions 的 func返回值是一个集合
   * 3.  map 针对每一条数据进行处理,它的粒度是每条数据。
   *     mapPartitions 针对每一个分区处理,它的粒度是每个分区。
   */
  @Test
  def mapPartitions(): Unit ={
    println("--------- 1 ---------")
    // 1、 数据生成
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2) // 指定分区数 2
    // 2、 算子使用
      .mapPartitions( iter => {     // 每个分区是一个集合
        iter.foreach(item => print(item + "	")) // 对集合进行foreach循环遍历打印
        iter                        // 返回值
      })
    // 3、 获取结果
      .collect()

    println("
--------- 2 ---------")
    // 进行一个小练习,将每个数据 * 10 输出结果
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitions( iter => {
        iter.map( item => item * 10) // scala 的方法,将一个集合 每个元素进行操作,返回这个集合
      }).collect().foreach(item => print(item + "	"))
  }

  /**
   * mapPartitionsWithIndex和 mapPartitions 的区别是 func中多了一个index参数,是分区号
   */
  @Test
  def mapPartitionsWithIndex(): Unit ={
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitionsWithIndex( (index, iter) => {
        println("分区编号:" + index)
        iter.foreach(item => println(item)) // 这两个分区是并行执行的,所以打印时,顺序可能发生更改
        iter
      } ).collect()
  }

  /**
   * filter 可以过滤掉数据集中一部分数据
   * 数据中接受的函数,参数是每个元素,如果这个函数返回true,则会加入新数据集,否则被过滤掉
   */
  @Test
  def filter(): Unit ={
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
      .filter( item => item % 2 == 0 ) // 过滤掉非偶数
      .collect()
      .foreach( item =>  println(item) )
  }

  /**
   * 把大数据变小,并尽可能的减少数据集规律的损失
   * withReplacement:是否有重复抽取,是否抽完放回
   * fraction:保留
   */
    @Test
  def sample(): Unit ={
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
      .sample(true,0.6)
      .collect().foreach( item=> println(item) )
  }

  /**
   * mapValues 也是 map, 只不过 map 作用于整条数据, mapValues 作用于 key-value 中的 Value
   */
    @Test
  def mapValues(): Unit ={
    sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
      .mapValues( item => item * 10 )
      .collect().foreach(println(_)) // 只有一个参数,可以用下划线指定
  }

  /**
   * 集合操作:求交集,并集,差集
   * 去重
   */
  @Test
  def collection(): Unit ={
    val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5))
    val rdd2 = sc.parallelize(Seq(3, 4, 5, 6, 7))

    rdd1.intersection(rdd2).collect().foreach(println(_)) // 交集
    rdd1.union(rdd2).collect().foreach(println(_)) // 并集,可以重复
    val rdd3 = rdd1.union(rdd2) // 并集,可以重复
    rdd1.subtract(rdd2).collect().foreach(println(_)) // 差集

    rdd3.distinct().collect().foreach(println(_))

  }

  /**
   * groupByKey运算结果:(key, (v1,v2...))
   */
  @Test
  def groupByKey(): Unit ={
    sc.parallelize(Seq(("a", 1), ("a", 2), ("c", 3)))
      .groupByKey().collect().foreach(println(_))
  }

  /**
   * 作用:对数据集按照 Key 进行聚合
   * 调用:combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])
   * 参数:
   *   createCombiner 将 Value 进行初步转换
   *   mergeValue 在每个分区把上一步转换的结果聚合
   *   mergeCombiners 在所有分区上把每个分区的聚合结果聚合
   *   partitioner 可选, 分区函数
   *   mapSideCombiner 可选, 是否在 Map 端 Combine
   *   serializer 序列化器
   *
   * combineByKey 是 reduceByKey 的底层
   */
  @Test
  def combineByKey(): Unit ={
    var rdd=sc.parallelize(Seq(
      ("zhangsan", 99.0),
      ("zhangsan", 96.0),
      ("lisi", 97.0),
      ("lisi", 98.0),
      ("zhangsan", 97.0)
    ))

    //2.算子运算
    //  2.1 createCombiner 转换数据
    //  2.2 mergeValue 分区上的聚合
    //  2.3 mergeCombiners 把所有分区上的结果再次聚合,生成最终结果
    val combineResult = rdd.combineByKey(
      createCombiner = (curr: Double) => (curr, 1),
      mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1),
      mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2)
    )

    val resultRDD = combineResult.map(item => (item._1, item._2._1 / item._2._2))

    resultRDD.collect().foreach(print(_))
  }

  /**
   * foldByKey 和 spark 中的 reduceByKey 区别:可以指定初始值
   * foldByKey 和 Scala 中的foldLeft 或 foldRight 区别:这个初始值是作用于每一个数据的
   */
  @Test
  def foldByKey(): Unit ={
    sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
      .foldByKey(10)( (curr, agg) => curr + agg)
      .collect().foreach(println(_))
  }

  /**
   *
   * 作用:聚合所有 Key 相同的 Value, 换句话说, 按照 Key 聚合 Value
   * 调用:rdd.aggregateByKey(zeroValue)(seqOp, combOp)
   * 参数:
   *   zeroValue 初始值
   *   seqOp 转换每一个值的函数
   *   comboOp 将转换过的值聚合的函数
   */
  @Test
  def aggregateByKey(): Unit ={
    val rdd=sc.parallelize(Seq(("手机",10.0),("手机",15.0),("电脑",20.0)))
    rdd.aggregateByKey(0.8)(( zeroValue,item) =>item * zeroValue,(curr,agg) => curr+agg)
      .collect()
      .foreach(println(_))
    //    (手机,20.0)
    //    (电脑,16.0)
  }

  /**
   * 将两个 RDD 按照相同的 Key 进行连接
   * 有点像笛卡尔积
   */
  @Test
  def join(): Unit ={
    val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 1)))
    val rdd2 = sc.parallelize(Seq(("a", 10), ("a", 11), ("a", 12)))

    rdd1.join(rdd2).collect().foreach(println(_))
    //    (a,(1,10))
    //    (a,(1,11))
    //    (a,(1,12))
    //    (a,(2,10))
    //    (a,(2,11))
    //    (a,(2,12))
  }

  /**
   * sortBy可以指定按照哪个字段来排序, sortByKey 直接按照 Key 来排序
   */
  @Test
  def sortBy(): Unit ={
    val rdd=sc.parallelize(Seq(8,4,5,6,2,1,1,9))
    val rdd2=sc.parallelize(Seq(("a",1),("b",3),("c",2)))
    rdd.sortBy(item =>item).collect().foreach(println(_)) // 指定数据,自动排序
    rdd2.sortBy(item => item._2).collect().foreach(println(_)) // 指定按照 KV 数据的第几个数据进行排序
    rdd2.sortByKey().collect().foreach(println(_)) // 按照key排序,即 a b c
  }

  /**
   * repartition 进行重分区的时候,默认是 Shuffle 的
   * coalesce 合并,默认不是 shuffle的,即分区数只能减少,但可以在方法内指定第二个参数为true来实现增大分区数
   */
  @Test
  def partitioning(): Unit ={
    val rdd=sc.parallelize(Seq(1,2,3,4,5),2)
    //println((rdd.repartition(5)).partitions.size)

    println(rdd.coalesce(5,true).partitions.size)
  }
}
TransformationOp

Action

动作含义
reduce(func) reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。
collect() 在驱动程序中,以数组的形式返回数据集的所有元素
count() 返回RDD的元素个数
first() 返回RDD的第一个元素(类似于take(1))
take(n) 返回一个由数据集的前n个元素组成的数组
takeOrdered(n, [ordering]) 返回自然顺序或者自定义顺序的前 n 个元素
saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
saveAsObjectFile(path) 将数据集的元素,以 Java 序列化的方式保存到指定的目录下
countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
foreach(func) 在数据集的每一个元素上,运行函数func
foreachPartition(func) 在数据集的每一个分区上,运行函数func
package rdd

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

/**
 * 主要介绍的 RDD Action(动作) 算子:
 * collect, reduce, foreach, countByKey, count, take, takeSample, first
 */

class ActionOp {
  private val conf = new SparkConf().setMaster("local[6]").setAppName("transformation_op")
  private val sc = new SparkContext(conf)

  /**
   * reduceByKey 是先按照 key分组,然后把每组聚合
   * reduce 是针对一整个数据集来进行集合。
   * reduceByKey 是针对 KV 类型的数据来进行计算的,reduce 可以针对所有类型的数据
   * reduce操作并没有 reducer 和 mapper, 因为 reduce 算子会作用于RDD中的每一个分区,
   *            然后在分区上求得局部结果,最终汇总到 Driver 上求得最终结果
   * 所有 reduce 算子不是Shuffle操作,KV类型的数据才可以进行shuffle,reduce算子处理的数据不仅仅是KV
   */
  @Test
  def reduce(): Unit ={
    val rdd = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
    /*
    * reduce 不是针对 KV 数据的,所有 curr是针对的每一条数据:("手机", 10.0)...
    * */
    val result = rdd.reduce((curr, agg) => ("总价", curr._2 + agg._2))
    println(result)  // (总价,45.0)
  }

  /**
   * foreach直接进行循环遍历,并且该函数没有返回值
   * 不用像之前先collect 再 foreach
   */
  @Test
  def foreach(): Unit ={
    // rdd 是并行计算,所以不会顺序打印
    sc.parallelize(Seq(1, 2, 3)).foreach(item => println(item))
  }

  /**
   * count 和 countByKey 打印结果相距很远,是每次调用 Action 都会启用一个job, job运行产生结果
   */
  @Test
  def count(): Unit ={
    val rdd = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 2), ("b", 1)))
    println(rdd.count()) // 4
    println(rdd.countByKey())  // Map(a -> 2, b -> 2)
  }

  /**
   * take 和 takeSample 都是获取数据,前者直接获取数据集里的前几条,后者是采样获取,参数 withReplacement 是否放回
   * first 获取第一个元素
   */
  @Test
  def take(): Unit ={
    val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
    println(rdd.first())  // 1
    rdd.take(3).foreach( item => println(item) )  // 1 2 3
    rdd.takeSample(withReplacement = true, num = 3).foreach( item => println(item) )  // 4 4 1
  }

  /**
   * RDD 对数字型数字的支持
   *  count 个数
   *  mean 平均值
   *  variance 方差
   *  ...
   */
  @Test
  def num(): Unit ={
    val rdd = sc.parallelize(Seq(1,2,3,4,7,9,15,36,100))
    println(rdd.max())
    println(rdd.min())
    println(rdd.mean())
    println(rdd.sum())
  }
}
ActionOp

RDD对KV类型和数字类型的支持

RDD的算子对KV类型的数据操作在Scala中都是封装在一个PairRDDFunctions的类中,而不是RDD类中。总结如下:

对数字类型的支持则是一些数学的基本操作,如平均数方差等,基本上都是Action操作,而不是转换操作。如下图。

 案例演示

数据展示

需求:求出北京PM根据日期(年,月份)的Top10 pm 排行。其中第 7 列是具体的PM,有空值,有NA值,有具体的数据值。

完成代码:

package rdd

import org.apache.commons.lang3.StringUtils
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

class BeijingPM {

  @Test
  def pmProcess(): Unit ={
    val conf = new SparkConf().setMaster("local[6]").setAppName("BeijingPM")
    val sc = new SparkContext(conf)

    // 数据读取
    val source = sc.textFile("dataset/BeijingPM20100101_20151231_noheader.csv")

    // 1、抽取数据 ((年, 月), PM值), 即把月份和年放到一个元组里,然后把这个元组作为 key
    val result = {
    source.map( item => ((item.split(",")(1), item.split(",")(2)), item.split(",")(6)))
    // 2、清洗掉 PM(item._2) 的空值 或者 为 NA 的值,此时 pm为String类型
      .filter(item => ! item._2.equalsIgnoreCase("NA") && StringUtils.isNotEmpty(item._2))
      // 清洗后,重新map,并将 pm 的 String 类型转为 Int
      .map( item => (item._1, item._2.toInt) )
    // 3、聚合 将每个key的所有的PM值相加
      .reduceByKey( (curr, agg) => curr + agg )
    // 4、排序 按照第二个 PM的总和 按照降序进行排列
      .sortBy(item => item._2, ascending = false)

    }
    // 取前十个 Top10 打印
    result.take(10).foreach(item => println(item))
  }

}
BeijingPM
原文地址:https://www.cnblogs.com/dongao/p/14282882.html