combineByKey

 def test66: Unit = {
    val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))
    val conf = new SparkConf().setAppName("wc").setMaster("local[2]")
    val sc = new SparkContext(conf)
/*
val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
     */
    val resrdd0 = sc.makeRDD(initialScores).map(t=>t).combineByKey(List(_),(x:List[Double],y:Double)=>y::x,(x:List[Double],y:List[Double])=>x:::y)
      .map(t=>(t._1,t._2.sum/t._2.size))
    println(resrdd0.collect().toBuffer)

type MVType = (Int, Double)
val resrdd = sc.makeRDD(initialScores)
  .combineByKey(score => (1, score),(x:MVType,y:Double)=>{(x._1+1,x._2+y)},(x:MVType,y:MVType)=>{(x._1+y._1,x._2+y._2)})
  .map(t=>(t._1,t._2._2/t._2._1))
    println(resrdd.collect().toBuffer)

    //reducebykey 的缺陷可以用前后两次map来规避
    val resrdd2 = sc.makeRDD(initialScores).map(t=>(t._1,(List(t._2)))).reduceByKey(_:::_).map(t=>(t._1,t._2.sum/t._2.size))
    println(resrdd2.collect().toBuffer)
  }

}
Spark函数讲解:combineByKey
 Spark  2015-03-19 08:03:47 16623  0评论 下载为PDF
  使用用户设置好的聚合函数对每个Key中的Value进行组合(combine)。可以将输入类型为RDD[(K, V)]转成成RDD[(K, C)]。

函数原型

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
    mergeCombiners: (C, C) => C) : RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
    mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, 
    mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine:
    Boolean = true, serializer: Serializer = null): RDD[(K, C)]
  第一个和第二个函数都是基于第三个函数实现的,使用的是HashPartitioner,Serializer为null。而第三个函数我们可以指定分区,
如果需要使用Serializer的话也可以指定。combineByKey函数比较重要,我们熟悉地诸如aggregateByKey、foldByKey、reduceByKey等函数都是基于该函数实现的。默认情况会在Map端进行组合操作。
原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7496180.html