Spark 累加器

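出现的原因：在 Spark 中，每个 task 会被分发到不同的节点上计算运行。Driver 端定义的普通变量被闭包引用时，会被序列化并复制一份到各个 Executor，task 对这份副本的修改不会回传给 Driver。因此，如果需要将多个节点上的数据累加到同一个变量中，就要使用累加器；累加器的最终结果应在 action 执行完成后，在 Driver 端通过 `.value` 读取。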

数字累加器

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: 唐松怀
 * @Date: 2020/3/15 14:22
 */

// Numeric accumulator demo: counts the visitors whose age is greater than 20,
// summing per-task counts from every partition into one driver-side value.
object rdd01 {
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val visitors = sc.parallelize(Array(("Bob", 13), ("joe", 18), ("Jack", 24), ("Billy", 89)))
      // A named accumulator; tasks only add to it, the driver reads the merged result.
      val plusAlways = sc.longAccumulator("this is a  always_plus tool!")
      // Update inside an action (foreach): Spark applies each task's update only once,
      // whereas updates made inside transformations may be re-applied on task retry.
      visitors.foreach { case (_, age) =>
        if (age > 20) plusAlways.add(1)
      }
      // Read on the driver, after the action above has completed.
      println(plusAlways.value)
    } finally {
      // Release the context (and its local executors) even if the job fails.
      sc.stop()
    }
  }

}
View Code

集合累加器

import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: 唐松怀
 * @Date: 2020/3/15 15:59
 */
// Collection accumulator demo: gathers every user whose phone number ends in
// three identical digits into a single driver-side list.
object rdd02 {

  /** A user record. Declared as an object member rather than a local class inside
    * `main`; local case classes are a known source of TypeTag/encoder problems in Spark. */
  final case class User(name: String, telephone: String)

  /** True when `phone` has at least three characters and its last three are all equal.
    * Shorter strings return false instead of throwing StringIndexOutOfBoundsException. */
  private def endsWithTripleDigit(phone: String): Boolean = {
    val tail = phone.takeRight(3)
    tail.length == 3 && tail.forall(_ == tail.head)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val users = Array(
        User("Alice", "15837312345"),
        User("Bob", "13937312666"),
        User("Thomas", "13637312345"),
        User("Tom", "18537312777"),
        User("Boris", "13837312998")
      )
      val rdd05 = sc.parallelize(users, 2)
      val userPlusTool = sc.collectionAccumulator[User]("用户累加器")

      // Add inside an action so each task's contribution is applied exactly once.
      rdd05.foreach { user =>
        if (endsWithTripleDigit(user.telephone)) userPlusTool.add(user)
      }

      // Print the collected users (a java.util.List), not the accumulator object itself.
      // Element order follows the order in which task results are merged.
      println(userPlusTool.value)
    } finally {
      // Release the context (and its local executors) even if the job fails.
      sc.stop()
    }
  }
}
View Code

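自定义累加器：继承 `org.apache.spark.util.AccumulatorV2[IN, OUT]`，实现 `isZero`、`copy`、`reset`、`add`、`merge` 和 `value` 方法，再通过 `sc.register(累加器实例, "名称")` 注册后，即可像内置累加器一样在 action 中使用。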

RUSH B
原文地址:https://www.cnblogs.com/tangsonghuai/p/12498520.html