031 广播变量与累加器

1.广播变量机制

  将传递给task的值,变成传递给executor。

  为什么可以共用,因为task是executor下的线程。

  只读的变量,在task中不允许修改

  

2.累加器介绍

  在只写的变量,在task中只允许被修改,不允许读的操作。

  但是在driver中就只能读操作。

  

 3.程序

  需求一:对应于MR中的累加器,累积计算次数

  需求二:将累加器做成共享变量来使用。避免了shuffle过程,提高了效率。

  1 package com.ibeifeng.senior.accumulator
  2 
  3 import org.apache.spark.{AccumulableParam, SparkConf, SparkContext}
  4 
  5 import scala.collection.mutable
  6 import scala.util.Random
  7 
  8 /**
  9   * Spark累加器
 10   * Created by ibf on 02/15.
 11   */
 12 object AccumulatorDemo {
 13   def main(args: Array[String]): Unit = {
 14     val conf = new SparkConf()
 15       //.setMaster("local[*]") // local模式下默认不进行失败重启机制
 16       .setMaster("local[*,4]") // 开启local模式的失败重启机制,重启次数4-1=3次
 17       .setAppName("accumulator")
 18     val sc = SparkContext.getOrCreate(conf)
 19 
 20     // ===============================
 21     val rdd = sc.parallelize(Array(
 22       "hadoop,spark,hbase",
 23       "spark,hbase,hadoop",
 24       "",
 25       "spark,hive,hue",
 26       "spark,hadoop",
 27       "spark,,hadoop,hive",
 28       "spark,hbase,hive",
 29       "hadoop,hbase,hive",
 30       "hive,hbase,spark,hadoop",
 31       "hive,hbase,hadoop,hue"
 32     ), 5)
 33 
 34     // 需求一:实现WordCount程序,同时统计输入的记录数量以及最终输出结果的数量
 35     val inputRecords = sc.accumulator(0, "Input Record Size")
 36     val outputRecords = sc.accumulator(0, "Output Record Size")
 37     rdd.flatMap(line => {
 38       // 累计数量
 39       inputRecords += 1
 40       val nline = if (line == null) "" else line
 41       // 进行数据分割、过滤、数据转换
 42       nline.split(",")
 43         .map(word => (word.trim, 1)) // 数据转换
 44         .filter(_._1.nonEmpty) // word非空,进行数据过滤
 45     })
 46       .reduceByKey(_ + _)
 47       .foreachPartition(iter => {
 48         iter.foreach(record => {
 49           // 累计数据
 50           outputRecords += 1
 51           println(record)
 52         })
 53       })
 54 
 55     println(s"Input Size:${inputRecords.value}")
 56     println(s"Ouput Size:${outputRecords.value}")
 57 



58 // 需求二:假设wordcount的最终结果可以在driver/executor节点的内存中保存下,要求不通过reduceByKey相关API实现wordcount程序 59 /** 60 * 1. 每个分区进行wordcount的统计,将结果保存到累加器中 61 * 2. 当分区全部执行完后,各个分区的累加器数据进行聚合操作 62 */ 63 val mapAccumulable = sc.accumulable(mutable.Map[String, Int]())(MapAccumulableParam)//MapAccumulableParam是强制转换 64 try 65 rdd.foreachPartition(iter => { 66 val index = Random.nextInt(2) // index的取值范围[0,1] 67 iter.foreach(line => { 68 val r = 1 / index 69 print(r) 70 val nline = if (line == null) "" else line 71 // 进行数据分割、过滤、数据转换 72 nline.split(",") 73 .filter(_.trim.nonEmpty) // 过滤空单词 74 .map(word => { 75 mapAccumulable += word // 统计word出现的次数 76 }) 77 }) 78 }) 79 catch { 80 case e: Exception => println(s"异常:${e.getMessage}") 81 } 82 println("result================") 83 mapAccumulable.value.foreach(println) 84 85 Thread.sleep(100000) 86 } 87 } 88 89 90 object MapAccumulableParam extends AccumulableParam[mutable.Map[String, Int], String] { 91 /** 92 * 添加一个string的元素到累加器中 93 * 94 * @param r 95 * @param t 96 * @return 97 */ 98 override def addAccumulator(r: mutable.Map[String, Int], t: String): mutable.Map[String, Int] = { 99 r += t -> (1 + r.getOrElse(t, 0)) 100 } 101 102 /** 103 * 合并两个数据 104 * 105 * @param r1 106 * @param r2 107 * @return 108 */ 109 override def addInPlace(r1: mutable.Map[String, Int], r2: mutable.Map[String, Int]): mutable.Map[String, Int] = { 110 r2.foldLeft(r1)((a, b) => { 111 a += b._1 -> (a.getOrElse(b._1, 0) + b._2) 112 }) 113 } 114 115 /** 116 * 返回初始值 117 * 118 * @param initialValue 119 * @return 120 */ 121 override def zero(initialValue: mutable.Map[String, Int]): mutable.Map[String, Int] = initialValue 122 }
原文地址:https://www.cnblogs.com/juncaoit/p/6542166.html