Spark RDD编程(3) Key-Value类型

1 partitionBypairRDD进行分区操作,如果原有的partionRDD和现有的partionRDD是一致的话就不进行分区, 否则会生成ShuffleRDD,即会产生shuffle过程。

val conf = new SparkConf().setMaster("local[*]").setAppName("word count")
val sc = new SparkContext(conf)

//----------------------- partitionBy -------------------------
val kvRDD: RDD[(Int, Char)] = sc.makeRDD(Array((1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')), 4)
val partitionByRDD: RDD[(Int, Char)] = kvRDD.partitionBy(new HashPartitioner(2))
val partitionByArray: Array[Array[(Int, Char)]] = partitionByRDD.glom().collect()
partitionByArray.foreach(data => println(data.mkString(",")))
//(2,b),(4,d)
//(1,a),(3,c)

2 groupByKey:groupByKey也是对每个key进行操作,但只生成一个sequence

val rdd = sc.makeRDD(List("java", "scala", "java", "spark"))
val mapRDD = rdd.map((_, 1))
val groupByKeyRDD: RDD[(String, Iterable[Int])] = mapRDD.groupByKey()
groupByKeyRDD.foreach(println)
//(spark,CompactBuffer(1))
//(scala,CompactBuffer(1))
//(java,CompactBuffer(1, 1))

3 reduceByKey在一个(K,V)RDD上调用,返回一个(K,V)RDD,使用指定的reduce函数,将相同key的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

//分组、所有区预聚合
val rdd2 = sc.makeRDD(List(("female", 1), ("male", 5), ("female", 5), ("male", 2)))
val reduceByKeyRDD: RDD[(String, Int)] = rdd2.reduceByKey((result, v) => result + v)
reduceByKeyRDD.collect().foreach(println)
//(female,6)
//(male,7)

reduceByKeygroupByKey的区别

1. reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。既分组又聚合。

2. groupByKey:按照key进行分组,直接进行shuffle。只分组。

3. 开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。

4 aggregateByKey(zeroValue:U,[partitioner: Partitioner]) (seqOp: (U, V) => U,combOp: (U, U) => U)

  kv对的RDD中,按keyvalue进行分组合并,合并时,将每个value和初始值作为seq函数的参数,进行计算,返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数进行计算(先将前两个value进行计算,将返回结果和下一个value传给combine函数,以此类推),将key与计算结果作为一个新的kv对输出。

参数描述:

1zeroValue给每一个分区中的每一个key一个初始值;

2seqOp函数用于在每一个分区中用初始值逐步迭代value

3combOp函数用于合并每个分区中的结果。

案例:分区内相同key找出最大值,分区间相加

val rdd3: RDD[(String, Int)] = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
rdd3.glom().collect().foreach(data => println(data.mkString(",")))
//(a,3),(a,2),(c,4)
//(b,3),(c,6),(c,8)

//区内聚合,区间聚合
val aggregateByKeyRDD: RDD[(String, Int)] = rdd3.aggregateByKey(0)(_.max(_), _ + _)
aggregateByKeyRDD.foreach(println)
//(a,3)
//(c,12)
//(b,3)

5 foldByKey:aggregateByKey的简化操作,seqopcombop相同

案例:计算相同key的value累加值。

val rdd4: RDD[(Char, Int)] = sc.parallelize(List(('a', 3), ('a', 2), ('a', 4), ('b', 3), ('c', 6), ('c', 8)), 3)
//相当于aggregateByKey的简化,区内和区间的操作一样
val foldByKeyRDD: RDD[(Char, Int)] = rdd4.foldByKey(0)(_ + _)
foldByKeyRDD.collect().foreach(println)
//(c,14)
//(a,9)
//(b,3)

6 combineByKey(createCombiner: V => C,  mergeValue: (C, V) => C,  mergeCombiners: (C, C) => C) 对相同K,把V合并成一个集合。

参数描述:

1createCombiner: combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值

2mergeValue: 如果这是一个在处理当前分区之前已经遇到的键,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并

3mergeCombiners: 由于每个分区都是独立处理的, 因此对于同一个键可以有多个累加器。如果有两个或者更多的分区都有对应同一个键的累加器, 就需要使用用户提供的 mergeCombiners() 方法将各个分区的结果进行合并。

案例:求每个key所对应值的平均值

val rdd5: RDD[(String, Int)] = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
//acc的类型是第一个函数的返回值类型,无法自动推断,因此不能用下划线简写
val combineByKeyRDD = rdd5.combineByKey(
  (_, 1), //对每一个v进行map
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1), //区内聚合
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) //区间聚合
combineByKeyRDD.collect().foreach(println)
//(b,(286,3))
//(a,(274,3))

//求平均值.map入参只有一个,模式匹配更方便取出元组的值
val mapRDD1: RDD[(String, Double)] = combineByKeyRDD.map {
  case (k, t) => (k, t._1 / t._2.toDouble)
}
mapRDD1.collect().foreach(println)
//(b,95.33333333333333)
//(a,91.33333333333333)

7 sortByKey([ascending], [numTasks])在一个(K,V)RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)RDD。

val rdd6 = sc.parallelize(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
val sortByKeyRDD: RDD[(Int, String)] = rdd6.sortByKey(true)
sortByKeyRDD.collect().foreach(println)
//(1,dd)
//(2,bb)
//(3,aa)
//(6,cc)

8 mapValues针对于(K,V)形式的类型只对V进行操作。

val rdd7 = sc.parallelize(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
val mapValuesRDD: RDD[(Int, String)] = rdd7.mapValues(_ + "|||")
mapValuesRDD.collect().foreach(println)
//(1,a|||)
//(1,d|||)
//(2,b|||)
//(3,c|||)

9 join(otherDataset, [numTasks]):在类型为(K,V)(K,W)RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))RDD。

val rdd8 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "c")))
val rdd9 = sc.parallelize(Array((1, 4), (2, 5), (3, 6)))
val rdd10 = sc.parallelize(Array((1, "x"), (2, "y"), (3, "z")))
val joinRDD1: RDD[(Int, (String, Int))] = rdd8.join(rdd9)
//相当于内连接,双方都存在此key的保留
joinRDD1.collect().foreach(println)
//(1,(a,4))
//(2,(b,5))
//(3,(c,6))

val joinRDD2: RDD[(Int, ((String, Int), String))] = rdd8.join(rdd9).join(rdd10)
joinRDD2.collect().foreach(println)
//(1,((a,4),x))
//(2,((b,5),y))
//(3,((c,6),z))

10 cogroup(otherDataset, [numTasks]):在类型为(K,V)(K,W)RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD。

val rdd8 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "c")))
val rdd9 = sc.parallelize(Array((1, 4), (2, 5), (3, 6)))
//相当于全外连接,只要某个数据集中出现了此key,就组合成元祖
val cogroupRDD1: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd8.cogroup(rdd9)
cogroupRDD1.collect().foreach(println)
//(4,(CompactBuffer(c),CompactBuffer()))
//(1,(CompactBuffer(a),CompactBuffer(4)))
//(2,(CompactBuffer(b),CompactBuffer(5)))
//(3,(CompactBuffer(c),CompactBuffer(6)))

val cogroupRDD2: RDD[(Int, (Iterable[Int], Iterable[String]))] = rdd9.cogroup(rdd8)
cogroupRDD2.collect().foreach(println)
//(4,(CompactBuffer(),CompactBuffer(c)))
//(1,(CompactBuffer(4),CompactBuffer(a)))
//(2,(CompactBuffer(5),CompactBuffer(b)))
//(3,(CompactBuffer(6),CompactBuffer(c)))
原文地址:https://www.cnblogs.com/noyouth/p/13023570.html