广播变量与累加器

广播变量与累加器

  • 在spark程序中,当一个传递给Spark操作(例如map和reduce)的函数在远程节点上面运行时,Spark操作实际上操作的是这个函数所用变量的一个独立副本。这些变量会被复制到每台机器上,并且这些变量在远程机器上的所有更新都不会传递回驱动程序。通常跨任务的读写变量是低效的,但是,Spark还是为两种常见的使用模式提供了两种有限的共享变量:广播变量(broadcast variable)和累加器(accumulator)

广播变量(broadcast variable)

  • 不使用广播变量(图一)

  • 使用广播变量的情况(图二)

  • 广播变量是将变量复制到每一台机器上而不是普通变量那样复制到每个task上,从图二可以看出变量先是从远程的driver上复制到一台机器上,然后这台机器的task就从这台机器上获取变量,不需要再从远程节点上获取,减少了IO,加快了速率。广播变量只能读取,并不能修改。经典应用是大表与小表的join中通过广播变量小表来实现以brodcast join 取代reduce join
  • 广播是driver进行的操作,必须是有结果的变量,故不可能直接广播RDD,常常的是将通过collectAsMap算子将需要广播的RDD转换成Map集合然后广播出去
  • 变量一旦被定义为一个广播变量,那么这个变量只能读,不能修改

 注意事项

  1. 能不能将一个RDD使用广播变量广播出去?    不能 ,因为RDD是不存数据的。可以将RDD的结果广播出去。
  2. 广播变量只能在Driver端定义,不能在Executor端定义。
  3. 在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。
  4. 如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
  5. 如果Executo端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本

 如何定义一个广播变量

  • brodcast join代码实现
/**
* share variables:BroadcastApp
* 广播大变量实现以map join取代reduce join
*/
object BroadcastApp {

def main(args: Array[String]): Unit = {
// commomJoin()

broadcastJoin()
}
def commomJoin(): Unit = {

val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastApp")
val sc = new SparkContext(conf)

val wideInfo = sc.parallelize(List(("01","阿呆"),("02","sk"),("03","shjqi")))
val baseInfo = sc.parallelize(List(("01",("北京","22")),("05",("上海","22"))))

wideInfo.join(baseInfo).map(x =>(x._1,x._2._1,x._2._2._1,x._2._2._2))
.collect().foreach(println)

Thread.sleep(2000000)
sc.stop()
}

def broadcastJoin(): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("BroadcastApp")
val sc = new SparkContext(conf)

val wideInfo = sc.parallelize(List(("01","阿呆"),("02","sk"),("03","shjqi")))
val baseInfo = sc.parallelize(List(("01",("北京","22")),("05",("上海","22"))))
.collectAsMap()
val broadcastBaseInfo = sc.broadcast(baseInfo)

wideInfo.mapPartitions(x =>{
val value = broadcastBaseInfo.value
for((k,v)<-x if(value.contains(k))) 
yield (k,v,value.get(k).getOrElse().asInstanceOf[(String,String)]._1,value.get(k).getOrElse().asInstanceOf[(String,String)]._2)
}).foreach(println)

Thread.sleep(2000000)
sc.stop()
}
}

Accumulators(累加器)

  • accumulators是一个并行且高效只能进行add的计数器,spark本地只支持的是数值类型的计数器,其他 类型需要开发人员自定义

 累加器的意义

  • 在spark应用程序中,我们经常会有这样的需求,如一场监控,调试,记录符合某特性的数据数目,这种需求都需要用到计数器,如果一个变量不被声明 为一个累加器,那么他将在被改变时 不会在driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式计数的功能
  • 错误图解

  •  正确图解

使用accumulators

object AccumulatorsApp {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("AccumulatorsApp")
    val sc = new SparkContext(conf)

    val wskAccum = sc.longAccumulator("wsktets")//定义一个累加器
    val nums = sc.parallelize(List(1,2,3,4),2)
    println("driver端:计数器值:"+wskAccum.value)

    nums.map(x=>{
      println("计数前值:"+wskAccum.value)//还原一个累加器
      wskAccum.add(x)
      println("计数后值:"+wskAccum.value)
      x
    }).collect()
    println("driver端:最终计数器值:"+wskAccum.value)

    Thread.sleep(2000000)
    sc.stop()
  }
}

 自定义累加器

package Mapreduce



import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable

class CustomerAcc extends AccumulatorV2[String,mutable.HashMap[String,Int]] {

  private val _hash = new mutable.HashMap[String,Int]()

//  检测是否为空
  override def isZero: Boolean = {
    _hash.isEmpty
  }
//  拷贝一个新的累加器
  override def copy(): AccumulatorV2[String,mutable.HashMap[String,Int]] = {
    val copyHash = new CustomerAcc()
//    创造一个copy的累加器,然后用synchronized方法设置同步操作
    _hash.synchronized{
      copyHash._hash++=_hash
    }
    copyHash
  }
//  重置累加器
  override def reset(): Unit = {
    _hash.clear()
  }
//  每一个分区中用于添加数据的方法 小Sum
  override def add(v: String) ={

    _hash.get(v) match {
      case None=>_hash+=((v,1))
      case Some(x)=>_hash+=((v,x+1))
    }

  }
//  合并每一个分区的输出 总Sum
  override def merge(other: AccumulatorV2[String,mutable.HashMap[String,Int]]) = {
    other match{
      case o:AccumulatorV2[String,mutable.HashMap[String,Int]]=>{
        for ((k,v)<- o.value){
        _hash.get(k) match {
          case None=>_hash+=((k,v))
          case Some(x)=>_hash+=((k,v+x))
        }
    }
    }
    }
  }
//  输出值
  override def value(): mutable.HashMap[String,Int] = {
    _hash
  }
}

object CustomerAcc {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("Partition1").setMaster("local[*]")

    val sc = new SparkContext(sparkConf)

    val hash = new CustomerAcc()
    sc.register(hash)

    val rdd = sc.makeRDD(Array("a","b","c","a","b","c","d"))

    rdd.foreach(hash.add(_))

    for((k,v)<-hash.value){
      println("["+k+":"+v+"]")
    }

    sc.stop()
  }
}
  • 注意事项

  1. 累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新
  2. 累加器不是一个调优的操作,因为如果不这样做,结果是错的

参考博客:https://www.cnblogs.com/frankdeng/p/9301653.html 

                  https://blog.csdn.net/qq_32641659/article/details/90183882#23_REST_API_64

                  https://blog.csdn.net/qq_41936805/article/details/85527417#5RDDcheckpoint_151

原文地址:https://www.cnblogs.com/xuziyu/p/11052862.html