逻辑图:
表达的是什么:
逻辑图就是数据处理和存储的过程表达
什么是RDD之间的依赖关系:
什么是关系(依赖关系) ?
-
从算子视角上来看,
splitRDD
通过map
算子得到了tupleRDD
, 所以splitRDD
和tupleRDD
之间的关系是map
但是仅仅这样说, 会不够全面, 从细节上来看,
RDD
只是数据和关于数据的计算, 而具体执行这种计算得出结果的是一个神秘的其它组件, 所以, 这两个RDD
的关系可以表示为splitRDD
的数据通过map
操作, 被传入tupleRDD
, 这是它们之间更细化的关系但是
RDD
这个概念本身并不是数据容器, 数据真正应该存放的地方是RDD
的分区, 所以如果把视角放在数据这一层面上的话, 直接讲这两个 RDD 之间有关系是不科学的, 应该从这两个 RDD 的分区之间的关系来讨论它们之间的关系那这些分区之间是什么关系?如果仅仅说splitRDD
和tupleRDD
之间的话, 那它们的分区之间就是一对一的关系 -
但是
tupleRDD
到reduceRDD
呢?tupleRDD
通过算子reduceByKey
生成reduceRDD
, 而这个算子是一个Shuffle
操作,Shuffle
操作的两个RDD
的分区之间并不是一对一,reduceByKey
的一个分区对应tupleRDD
的多个分区RDD 之间的依赖关系详解
窄依赖
对于
cartesian
来说, 依赖关系如下上述图形中清晰展示如下现象
-
rddC
中的分区数量是两个父RDD
的分区数量之乘积 -
rddA
中每个分区对应rddC
中的两个分区 (因为rddB
中有两个分区),rddB
中的每个分区对应rddC
中的三个分区 (因为rddA
有三个分区)
它们之间是窄依赖, 事实上在
cartesian
中也是NarrowDependency
这个所有窄依赖的父类的唯一一次直接使用, 为什么呢?因为所有的分区之间是拷贝关系, 并不是 Shuffle 关系
-
rddC
中的每个分区并不是依赖多个父RDD
中的多个分区 -
rddC
中每个分区的数量来自一个父RDD
分区中的所有数据, 是一个FullDependence
, 所以数据可以直接从父RDD
流动到子RDD
-
不存在一个父
RDD
中一部分数据分发过去, 另一部分分发给其它的RDD
宽依赖
在
ShuffleDependency
的类声明上如下写到Represents a dependency on the output of a shuffle stage.
上面非常清楚的说道, 宽依赖就是
Shuffle
中的依赖关系, 换句话说, 只有Shuffle
产生的地方才是宽依赖那么宽窄依赖的判断依据就非常简单明确了, 是否有 Shuffle ?
举个
reduceByKey
的例子,rddB = rddA.reduceByKey( (curr, agg) ⇒ curr + agg )
会产生如下的依赖关系-
rddB
的每个分区都几乎依赖rddA
的所有分区 -
对于
rddA
中的一个分区来说, 其将一部分分发给rddB
的p1
, 另外一部分分发给rddB
的p2
, 这不是数据流动, 而是分发 -
什么是SHUFFLE:
-
如何分辨宽窄依赖 ?
其实分辨宽窄依赖的本身就是在分辨父子
RDD
之间是否有Shuffle
, 大致有以下的方法-
如果是
Shuffle
, 两个RDD
的分区之间不是单纯的数据流动, 而是分发和复制 -
一般
Shuffle
的子RDD
的每个分区会依赖父RDD
的多个分区
但是这样判断其实不准确, 如果想分辨某个算子是否是窄依赖, 或者是否是宽依赖, 则还是要取决于具体的算子, 例如想看
cartesian
生成的是宽依赖还是窄依赖, 可以通过如下步骤-
查看
map
算子生成的RDD
-
进去
RDD
查看getDependence
方法常见的窄依赖类型:
一对一窄依赖
其实
RDD
中默认的是OneToOneDependency
, 后被不同的RDD
子类指定为其它的依赖类型, 常见的一对一依赖是map
算子所产生的依赖, 例如rddB = rddA.map(…)
-
每个分区之间一一对应, 所以叫做一对一窄依赖
Range 窄依赖
Range
窄依赖其实也是一对一窄依赖, 但是保留了中间的分隔信息, 可以通过某个分区获取其父分区, 目前只有一个算子生成这种窄依赖, 就是union
算子, 例如rddC = rddA.union(rddB)
-
rddC
其实就是rddA
拼接rddB
生成的, 所以rddC
的p5
和p6
就是rddB
的p1
和p2
-
所以需要有方式获取到
rddC
的p5
其父分区是谁, 于是就需要记录一下边界, 其它部分和一对一窄依赖一样
多对一窄依赖
多对一窄依赖其图形和
Shuffle
依赖非常相似, 所以在遇到的时候, 要注意其RDD
之间是否有Shuffle
过程, 比较容易让人困惑, 常见的多对一依赖就是重分区算子coalesce
, 例如rddB = rddA.coalesce(2, shuffle = false)
, 但同时也要注意, 如果shuffle = true
那就是完全不同的情况了-
因为没有
Shuffle
, 所以这是一个窄依赖
再谈宽窄依赖的区别
宽窄依赖的区别非常重要, 因为涉及了一件非常重要的事情: 如何计算
RDD
?宽窄以来的核心区别是: 窄依赖的
RDD
可以放在一个Task
中运行物理图:
物理图的作用:
问题一: 物理图的意义是什么?
物理图解决的其实就是
RDD
流程生成以后, 如何计算和运行的问题, 也就是如何把 RDD 放在集群中执行的问题问题二: 如果要确定如何运行的问题, 则需要先确定集群中有什么组件
-
首先集群中物理元件就是一台一台的机器
-
其次这些机器上跑的守护进程有两种:
Master
,Worker
-
每个守护进程其实就代表了一台机器, 代表这台机器的角色, 代表这台机器和外界通信
-
例如我们常说一台机器是
Master
, 其含义是这台机器中运行了一个Master
守护进程, 如果一台机器运行了Master
的同时又运行了Worker
, 则说这台机器是Master
也可以, 说它是Worker
也行
-
-
真正能运行
RDD
的组件是:Executor
, 也就是说其实RDD
最终是运行在Executor
中的, 也就是说, 无论是Master
还是Worker
其实都是用于管理Executor
和调度程序的
结论是
RDD
一定在Executor
中计算, 而Master
和Worker
负责调度和管理Executor
如何划分阶段 ?
为了减少执行任务, 减少数据暂存和交换的机会, 所以需要创建管道, 让数据沿着管道流动, 其实也就是原先每个
RDD
都有一组Task
, 现在改为所有的RDD
共用一组Task
, 但是也有问题, 问题如下就是说, 在
Shuffle
处, 必须断开管道, 进行数据交换, 交换过后, 继续流动, 所以整个流程可以变为如下样子把
Task
断开成两个部分,Task4
可以从Task 1, 2, 3
中获取数据, 后Task4
又作为管道, 继续让数据在其中流动但是还有一个问题, 说断开就直接断开吗? 不用打个招呼的呀? 这个断开即没有道理, 也没有规则, 所以可以为这个断开增加一个概念叫做阶段, 按照阶段断开, 阶段的英文叫做
Stage
, 如下所以划分阶段的本身就是设置断开点的规则, 那么该如何划分阶段呢?
-
第一步, 从最后一个
RDD
, 也就是逻辑图中最右边的RDD
开始, 向前滑动Stage
的范围, 为Stage0
-
第二步, 遇到
ShuffleDependency
断开Stage
, 从下一个RDD
开始创建新的Stage
, 为Stage1
-
第三步, 新的
Stage
按照同样的规则继续滑动, 直到包裹所有的RDD
总结来看, 就是针对于宽窄依赖来判断, 一个
Stage
中只有窄依赖, 因为只有窄依赖才能形成数据的Pipeline
.如果要进行
Shuffle
的话, 数据是流不过去的, 必须要拷贝和拉取. 所以遇到RDD
宽依赖的两个RDD
时, 要切断这两个RDD
的Stage
.这样一个 RDD 依赖的链条, 我们称之为 RDD 的血统, 其中有宽依赖也有窄依赖
数据怎么流动 ?
val sc = ... val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop")) val splitRDD = textRDD.flatMap(_.split(" ")) val tupleRDD = splitRDD.map((_, 1)) val reduceRDD = tupleRDD.reduceByKey(_ + _) val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}") strRDD.collect.foreach(item => println(item))
上述代码是这个章节我们一直使用的代码流程, 如下是其完整的逻辑执行图
如果放在集群中运行, 通过
WebUI
可以查看到如下DAG
结构Step 1: 从
ResultStage
开始执行-
最接近
Result
部分的Stage id
为 0, 这个Stage
被称之为ResultStage
由代码可以知道, 最终调用
Action
促使整个流程执行的是最后一个RDD
,strRDD.collect
, 所以当执行RDD
的计算时候, 先计算的也是这个RDD
Step 2:
RDD
之间是有关联的-
前面已经知道, 最后一个
RDD
先得到执行机会, 先从这个RDD
开始执行, 但是这个RDD
中有数据吗 ? 如果没有数据, 它的计算是什么? 它的计算是从父RDD
中获取数据, 并执行传入的算子的函数简单来说, 从产生
Result
的地方开始计算, 但是其RDD
中是没数据的, 所以会找到父RDD
来要数据, 父RDD
也没有数据, 继续向上要, 所以, 计算从Result
处调用, 但是从整个逻辑图中的最左边RDD
开始, 类似一个递归的过程
运行过程:
逻辑图
-
是什么 怎么生成 具体怎么生成
val textRDD = sc.parallelize(Seq("Hadoop Spark", "Hadoop Flume", "Spark Sqoop")) val splitRDD = textRDD.flatMap(_.split(" ")) val tupleRDD = splitRDD.map((_, 1)) val reduceRDD = tupleRDD.reduceByKey(_ + _) val strRDD = reduceRDD.map(item => s"${item._1}, ${item._2}")
逻辑图如何生成
-
上述代码在
Spark Application
的main
方法中执行, 而Spark Application
在Driver
中执行, 所以上述代码在Driver
中被执行, 那么这段代码执行的结果是什么呢?一段
Scala
代码的执行结果就是最后一行的执行结果, 所以上述的代码, 从逻辑上执行结果就是最后一个RDD
, 最后一个RDD
也可以认为就是逻辑执行图, 为什么呢?例如
rdd2 = rdd1.map(…)
中, 其实本质上rdd2
是一个类型为MapPartitionsRDD
的对象, 而创建这个对象的时候, 会通过构造函数传入当前RDD
对象, 也就是父RDD
, 也就是调用map
算子的rdd1
,rdd1
是rdd2
的父RDD
一个
RDD
依赖另外一个RDD
, 这个RDD
又依赖另外的RDD
, 一个RDD
可以通过getDependency
获得其父RDD
, 这种环环相扣的关系, 最终从最后一个RDD
就可以推演出前面所有的RDD
- 逻辑图是什么, 干啥用
-
逻辑图其实本质上描述的就是数据的计算过程, 数据从哪来, 经过什么样的计算, 得到什么样的结果, 再执行什么计算, 得到什么结果
可是数据的计算是描述好了, 这种计算该如何执行呢?
-
物理图
-
数据的计算表示好了, 该正式执行了, 但是如何执行? 如何执行更快更好更酷? 就需要为其执行做一个规划, 所以需要生成物理执行图
strRDD.collect.foreach(item => println(item))
上述代码其实就是最后的一个
RDD
调用了Action
方法, 调用Action
方法的时候, 会请求一个叫做DAGScheduler
的组件,DAGScheduler
会创建用于执行RDD
的Stage
和Task
DAGScheduler
是一个由SparkContext
创建, 运行在Driver
上的组件, 其作用就是将由RDD
构建出来的逻辑计划, 构建成为由真正在集群中运行的Task
组成的物理执行计划,DAGScheduler
主要做如下三件事-
帮助每个
Job
计算DAG
并发给TaskSheduler
调度 -
确定每个
Task
的最佳位置 -
跟踪
RDD
的缓存状态, 避免重新计算
从字面意思上来看,
DAGScheduler
是调度DAG
去运行的,DAG
被称作为有向无环图, 其实可以将DAG
理解为就是RDD
的逻辑图, 其呈现两个特点:RDD
的计算是有方向的,RDD
的计算是无环的, 所以DAGScheduler
也可以称之为RDD Scheduler
, 但是真正运行在集群中的并不是RDD
, 而是Task
和Stage
,DAGScheduler
负责这种转换 -
Job
是什么 ?-
Job
什么时候生成 ?-
当一个
RDD
调用了Action
算子的时候, 在Action
算子内部, 会使用sc.runJob()
调用SparkContext
中的runJob
方法, 这个方法又会调用DAGScheduler
中的runJob
, 后在DAGScheduler
中使用消息驱动的形式创建Job
简而言之,
Job
在RDD
调用Action
算子的时候生成, 而且调用一次Action
算子, 就会生成一个Job
, 如果一个SparkApplication
中调用了多次Action
算子, 会生成多个Job
串行执行, 每个Job
独立运作, 被独立调度, 所以RDD
的计算也会被执行多次 Job
是什么 ?-
如果要将
Spark
的程序调度到集群中运行,Job
是粒度最大的单位, 调度以Job
为最大单位, 将Job
拆分为Stage
和Task
去调度分发和运行, 一个Job
就是一个Spark
程序从读取 → 计算 → 运行
的过程一个
Spark Application
可以包含多个Job
, 这些Job
之间是串行的, 也就是第二个Job
需要等待第一个Job
的执行结束后才会开始执行
-
Job
和Stage
的关系-
Job
是一个最大的调度单位, 也就是说DAGScheduler
会首先创建一个Job
的相关信息, 后去调度Job
, 但是没办法直接调度Job
, 比如说现在要做一盘手撕包菜, 不可能直接去炒一整颗包菜, 要切好撕碎, 再去炒- 为什么
Job
需要切分 ? -
-
因为
Job
的含义是对整个RDD
血统求值, 但是RDD
之间可能会有一些宽依赖 -
如果遇到宽依赖的话, 两个
RDD
之间需要进行数据拉取和复制如果要进行拉取和复制的话, 那么一个
RDD
就必须等待它所依赖的RDD
所有分区先计算完成, 然后再进行拉取 -
由上得知, 一个
Job
是无法计算完整个RDD
血统的
-
如何切分 ?
-
创建一个
Stage
, 从后向前回溯RDD
, 遇到Shuffle
依赖就结束Stage
, 后创建新的Stage
继续回溯. 这个过程上面已经详细的讲解过, 但是问题是切分以后如何执行呢, 从后向前还是从前向后, 是串行执行多个Stage
, 还是并行执行多个Stage
问题一: 执行顺序
-
在图中,
Stage 0
的计算需要依赖Stage 1
的数据, 因为reduceRDD
中一个分区可能需要多个tupleRDD
分区的数据, 所以tupleRDD
必须先计算完, 所以, 应该在逻辑图中自左向右执行Stage
问题二: 串行还是并行
-
还是同样的原因,
Stage 0
如果想计算,Stage 1
必须先计算完, 因为Stage 0
中每个分区都依赖Stage 1
中的所有分区, 所以Stage 1
不仅需要先执行, 而且Stage 1
执行完之前Stage 0
无法执行, 它们只能串行执行
总结
-
-
一个
Stage
就是物理执行计划中的一个步骤, 一个Spark Job
就是划分到不同Stage
的计算过程 -
Stage
之间的边界由Shuffle
操作来确定-
Stage
内的RDD
之间都是窄依赖, 可以放在一个管道中执行 -
而
Shuffle
后的Stage
需要等待前面Stage
的执行
-
Stage
有两种-
ShuffMapStage
, 其中存放窄依赖的RDD
-
ResultStage
, 每个Job
只有一个, 负责计算结果, 一个ResultStage
执行完成标志着整个Job
执行完毕
-
- 为什么
Stage
和Task
的关系-
前面我们说到
Job
无法直接执行, 需要先划分为多个Stage
, 去执行Stage
, 那么Stage
可以直接执行吗?-
第一点:
Stage
中的RDD
之间是窄依赖因为
Stage
中的所有RDD
之间都是窄依赖, 窄依赖RDD
理论上是可以放在同一个Pipeline(管道, 流水线)
中执行的, 似乎可以直接调度Stage
了? 其实不行, 看第二点 -
第二点: 别忘了
RDD
还有分区一个
RDD
只是一个概念, 而真正存放和处理数据时, 都是以分区作为单位的Stage
对应的是多个整体上的RDD
, 而真正的运行是需要针对RDD
的分区来进行的 -
第三点: 一个
Task
对应一个RDD
的分区一个比
Stage
粒度更细的单元叫做Task
,Stage
是由Task
组成的, 之所以有Task
这个概念, 是因为Stage
针对整个RDD
, 而计算的时候, 要针对RDD
的分区假设一个
Stage
中有 10 个RDD
, 这些RDD
中的分区各不相同, 但是分区最多的RDD
有 30 个分区, 而且很显然, 它们之间是窄依赖关系那么, 这个
Stage
中应该有多少Task
呢? 应该有 30 个Task
, 因为一个Task
计算一个RDD
的分区. 这个Stage
至多有 30 个分区需要计算 -
总结
-
一个
Stage
就是一组并行的Task
集合 -
Task 是 Spark 中最小的独立执行单元, 其作用是处理一个 RDD 分区
-
一个 Task 只可能存在于一个 Stage 中, 并且只能计算一个 RDD 的分区
-
-
TaskSet
-
梳理一下这几个概念,
Job > Stage > Task
,Job 中包含 Stage 中包含 Task
而
Stage
中经常会有一组Task
需要同时执行, 所以针对于每一个Task
来进行调度太过繁琐, 而且没有意义, 所以每个Stage
中的Task
们会被收集起来, 放入一个TaskSet
集合中-
一个
Stage
有一个TaskSet
-
TaskSet
中Task
的个数由Stage
中的最大分区数决定
-
整体执行流程
高级特性:
闭包:
class Closure { @Test def test(): Unit = { val areaFunction = closure() val area = areaFunction(2) println(area) } def closure(): Int => Double = { val factor = 3.14 val areaFunction = (r: Int) => math.pow(r, 2) * factor areaFunction } }
什么是闭包?
val areaFunction = closure() areaFunction()
通过
closure
返回的函数areaFunction
就是一个闭包, 其函数内部的作用域并不是test
函数的作用域, 这种连带作用域一起打包的方式, 我们称之为闭包, 在 Scala 中Scala 中的闭包本质上就是一个对象, 是 FunctionX 的实例
全局累加器:
一个小问题
var count = 0 val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(count += _) println(count)
上面这段代码是一个非常错误的使用, 请不要仿照, 这段代码只是为了证明一些事情
先明确两件事,
var count = 0
是在 Driver 中定义的,foreach(count += _)
这个算子以及传递进去的闭包运行在 Executor 中这段代码整体想做的事情是累加一个变量, 但是这段代码的写法却做不到这件事, 原因也很简单, 因为具体的算子是闭包, 被分发给不同的节点运行, 所以这个闭包中累加的并不是 Driver 中的这个变量
全局累加器
Accumulators(累加器) 是一个只支持
added
(添加) 的分布式变量, 可以在分布式环境下保持一致性, 并且能够做到高效的并发.原生 Spark 支持数值型的累加器, 可以用于实现计数或者求和, 开发者也可以使用自定义累加器以实现更高级的需求
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]") val sc = new SparkContext(config) val counter = sc.longAccumulator("counter") sc.parallelize(Seq(1, 2, 3, 4, 5)) .foreach(counter.add(_)) // 运行结果: 15 println(counter.value)
注意点:
-
Accumulator 是支持并发并行的, 在任何地方都可以通过
add
来修改数值, 无论是 Driver 还是 Executor -
只能在 Driver 中才能调用
value
来获取数值
在 WebUI 中关于 Job 部分也可以看到 Accumulator 的信息, 以及其运行的情况
package cn.itcast.spark.rdd import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2 import org.junit.Test import scala.collection.mutable class Accumulator { /** * RDD -> (1, 2, 3, 4, 5) -> Set(1,2,3,4,5) */ @Test def acc(): Unit = { val config = new SparkConf().setAppName("acc").setMaster("local[6]") val sc = new SparkContext(config) val numAcc = new NumAccumulator() // 注册给 Spark sc.register(numAcc, "num") sc.parallelize(Seq("1", "2", "3")) .foreach(item => numAcc.add(item)) println(numAcc.value) sc.stop() } } class NumAccumulator extends AccumulatorV2[String, Set[String]] {//累加前后的数据类型 private val nums: mutable.Set[String] = mutable.Set()//mutable保证是可变的 /** * 告诉 Spark 框架, 这个累加器对象是否是空的 */ override def isZero: Boolean = { nums.isEmpty } /** * 提供给 Spark 框架一个拷贝的累加器 * @return */ override def copy(): AccumulatorV2[String, Set[String]] = { val newAccumulator = new NumAccumulator() nums.synchronized { newAccumulator.nums ++= this.nums//++=是将两个集合加 } newAccumulator } /** * 帮助 Spark 框架, 清理累加器的内容 */ override def reset(): Unit = { nums.clear() } /** * 外部传入要累加的内容, 在这个方法中进行累加 */ override def add(v: String): Unit = { nums += v } /** * 累加器在进行累加的时候, 可能每个分布式节点都有一个实例 * 在最后 Driver 进行一次合并, 把所有的实例的内容合并起来, 会调用这个 merge 方法进行合并 */ override def merge(other: AccumulatorV2[String, Set[String]]): Unit = { nums ++= other.value } /** * 提供给外部累加结果 * 为什么一定要给不可变的, 因为外部有可能再进行修改, 如果是可变的集合, 其外部的修改会影响内部的值 */ override def value: Set[String] = { nums.toSet } }
广播变量:
广播变量的作用
广播变量允许开发者将一个
Read-Only
的变量缓存到集群中每个节点中, 而不是传递给每一个 Task 一个副本.-
集群中每个节点, 指的是一个机器
-
每一个 Task, 一个 Task 是一个 Stage 中的最小处理单元, 一个 Executor 中可以有多个 Stage, 每个 Stage 有多个 Task
所以在需要跨多个 Stage 的多个 Task 中使用相同数据的情况下, 广播特别的有用
广播变量的API
方法名 描述 id
唯一标识
value
广播变量的值
unpersist
在 Executor 中异步的删除缓存副本
destroy
销毁所有此广播变量所关联的数据和元数据
toString
字符串表示
使用广播变量的一般套路
-
可以通过如下方式创建广播变量
val b = sc.broadcast(1)
如果 Log 级别为 DEBUG 的时候, 会打印如下信息
DEBUG BlockManager: Put block broadcast_0 locally took 430 ms DEBUG BlockManager: Putting block broadcast_0 without replication took 431 ms DEBUG BlockManager: Told master about block broadcast_0_piece0 DEBUG BlockManager: Put block broadcast_0_piece0 locally took 4 ms DEBUG BlockManager: Putting block broadcast_0_piece0 without replication took 4 ms
创建后可以使用
value
获取数据b.value
获取数据的时候会打印如下信息
DEBUG BlockManager: Getting local block broadcast_0 DEBUG BlockManager: Level for block broadcast_0 is StorageLevel(disk, memory, deserialized, 1 replicas)
广播变量使用完了以后, 可以使用
unpersist
删除数据b.unpersist
删除数据以后, 可以使用
destroy
销毁变量, 释放内存空间b.destroy
销毁以后, 会打印如下信息
DEBUG BlockManager: Removing broadcast 0 DEBUG BlockManager: Removing block broadcast_0_piece0 DEBUG BlockManager: Told master about block broadcast_0_piece0 DEBUG BlockManager: Removing block broadcast_0
使用
value
方法的注意点-
方法签名
value: T
在
value
方法内部会确保使用获取数据的时候, 变量必须是可用状态, 所以必须在变量被destroy
之前使用value
方法, 如果使用value
时变量已经失效, 则会爆出以下错误org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27) at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:69) ... 48 elided
使用
destroy
方法的注意点-
方法签名
destroy(): Unit
destroy
方法会移除广播变量, 彻底销毁掉, 但是如果你试图多次destroy
广播变量, 则会爆出以下错误org.apache.spark.SparkException: Attempted to use Broadcast(0) after it was destroyed (destroy at <console>:27) at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144) at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:107) at org.apache.spark.broadcast.Broadcast.destroy(Broadcast.scala:98) ... 48 elided
package cn.itcast.spark.rdd import org.apache.spark.{SparkConf, SparkContext} import org.junit.Test class Broadcast { /** * 资源占用比较大, 有十个对应的 value */ @Test def bc1(): Unit = { // 数据, 假装这个数据很大, 大概一百兆 val v = Map("Spark" -> "http://spark.apache.cn", "Scala" -> "http://www.scala-lang.org") val config = new SparkConf().setMaster("local[6]").setAppName("bc") val sc = new SparkContext(config) // 将其中的 Spark 和 Scala 转为对应的网址 val r = sc.parallelize(Seq("Spark", "Scala")) val result = r.map(item => v(item)).collect() println(result) } /** * 使用广播, 大幅度减少 value 的复制 */ @Test def bc2(): Unit = { // 数据, 假装这个数据很大, 大概一百兆 val v = Map("Spark" -> "http://spark.apache.cn", "Scala" -> "http://www.scala-lang.org") val config = new SparkConf().setMaster("local[6]").setAppName("bc") val sc = new SparkContext(config) // 创建广播 val bc = sc.broadcast(v) // 将其中的 Spark 和 Scala 转为对应的网址 val r = sc.parallelize(Seq("Spark", "Scala")) // 在算子中使用广播变量代替直接引用集合, 只会复制和executor一样的数量 // 在使用广播之前, 复制 map 了 task 数量份 // 在使用广播以后, 复制次数和 executor 数量一致 val result = r.map(item => bc.value(item)).collect() result.foreach(println(_)) } }
-
-
-