spark 累加器

spark累加器:作用是在task计算的时候统计某些事件的数量。

注意:累加器变量只能执行加法操作;但其支持并行操作,意味着不同任务多次对累加器执行加法操作后,累加器最后的值等于所有累加的和;只有driver驱动程序能读取累加器的值,task端只能进行累加操作,无法访问该值。

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.{CollectionAccumulator, DoubleAccumulator, LongAccumulator}

/**
 * Demonstrates the three accumulator types created through `SparkContext`:
 * `LongAccumulator`, `DoubleAccumulator` and `CollectionAccumulator`.
 */
object accumulator {

  /**
   * Runs a small local Spark job that feeds the numbers 1 to 6 into one
   * accumulator of each kind and prints the results on the driver.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkconf: SparkConf = new SparkConf().setAppName("accumulator_test").setMaster("local[2]")
    val sc: SparkContext = new SparkContext(sparkconf)

    // Always release the context, even when the job fails part-way through.
    try {
      val data: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
      // Explicit tuples keep the printed format "(label,value,...)" without
      // relying on the compiler's deprecated argument auto-tupling.
      println(("data:", data.count(), data))

      // LongAccumulator: sums integral values.
      val longAcc: LongAccumulator = sc.longAccumulator("long-account")
      // DoubleAccumulator: sums floating-point values.
      val doubleAcc: DoubleAccumulator = sc.doubleAccumulator("doubel-account")
      // CollectionAccumulator: gathers the added elements into a list.
      val collectionAcc: CollectionAccumulator[Int] = sc.collectionAccumulator("collection-account")

      // Update the accumulators inside an action (`foreach`) rather than a lazy
      // transformation (`map`): Spark only guarantees that each task's update is
      // applied exactly once for updates performed inside actions. Updates made
      // in a transformation may be applied again if the stage is re-executed.
      data.foreach { x =>
        println(("x:", x))
        longAcc.add(1L)
        doubleAcc.add(x.toDouble)
        collectionAcc.add(x)
      }

      println(("longacc:", longAcc.value, longAcc))
      println(("doubleacc:", doubleAcc.sum, doubleAcc))
      // The element order of the collected list differs from run to run,
      // because the partitions are processed in parallel.
      println(("collectionacc:", collectionAcc))

      // Sample output (accumulator ids and list order vary between runs):
      //    (longacc:,6,LongAccumulator(id: 48, name: Some(long-account), value: 6))
      //    (doubleacc:,21.0,DoubleAccumulator(id: 49, name: Some(doubel-account), value: 21.0))
      //    (collectionacc:,CollectionAccumulator(id: 50, name: Some(collection-account), value: [4, 5, 6, 1, 2, 3]))
    } finally {
      sc.stop()
    }
  }
}
原文地址:https://www.cnblogs.com/xl717/p/11911816.html