一、RDD的分区和Shuffle
-
RDD 的分区操作
-
Shuffle 的原理
- 分区的作用
-
RDD 使用分区来分布式并行处理数据, 并且要做到尽量少的在不同的 Executor 之间使用网络交换数据, 所以当使用 RDD 读取数据的时候, 会尽量的在物理上靠近数据源, 比如说在读取 Cassandra 或者 HDFS 中数据的时候, 会尽量的保持 RDD 的分区和数据源的分区数, 分区模式等一一对应
- 分区和 Shuffle 的关系
-
分区的主要作用是用来实现并行计算, 本质上和 Shuffle 没什么关系, 但是往往在进行数据处理的时候, 例如`reduceByKey`, `groupByKey`等聚合操作, 需要把 Key 相同的 Value 拉取到一起进行计算, 这个时候因为这些 Key 相同的 Value 可能会坐落于不同的分区, 于是理解分区才能理解 Shuffle 的根本原理
- Spark 中的 Shuffle 操作的特点
-
-
只有
Key-Value
型的 RDD 才会有 Shuffle 操作, 例如RDD[(K, V)]
, 但是有一个特例, 就是repartition
算子可以对任何数据类型 Shuffle -
早期版本 Spark 的 Shuffle 算法是
Hash base shuffle
, 后来改为Sort base shuffle
, 更适合大吞吐量的场景
-
1、查看RDD分区
2、指定RDD分区
-----在本地集合创建的时候指定分区数
-----通过读取文件创建的时候指定分区数
-----如何进行重分区coalese和repatitions
-----通过其他算子指定分区数(很多算子都可以指定分区数例如partitioner分区函数)
很多算子都可以指定分区数:
1、一般情况下设计shuffle操作的算子都运行指定分区数
2、一般这些算子,可以在最后一个参数的位置传入新的分区数
3、如果没有重新指定分区数,默认从父RDD中继承分区数
partitioner分区函数“:
-----shuffle过程
reduceByKey
这个算子本质上就是先按照 Key 分组, 后对每一组数据进行 reduce
, 所面临的挑战就是 Key 相同的所有数据可能分布在不同的 Partition 分区中, 甚至可能在不同的节点中, 但是它们必须被共同计算.
为了让来自相同 Key 的所有数据都在 reduceByKey
的同一个 reduce
中处理, 需要执行一个 all-to-all
的操作, 需要在不同的节点(不同的分区)之间拷贝数据, 必须跨分区聚集相同 Key 的所有数据, 这个过程叫做 Shuffle
.
--------------------shuffle的原理---------
Spark 的 Shuffle 发展大致有两个阶段: Hash base shuffle
和 Sort base shuffle
- Hash base shuffle
- Sort base shuffle
二、RDD的缓存
@Test def prepare(): Unit = { // 1. 创建 SC val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]") val sc = new SparkContext(conf) // 2. 读取文件 val source = sc.textFile("dataset/access_log_sample.txt") // 3. 取出IP, 赋予初始频率 val countRDD = source.map( item => (item.split(" ")(0), 1) ) // 4. 数据清洗 val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) ) // 5. 统计IP出现的次数(聚合) val aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg ) // 6. 统计出现次数最少的IP(得出结论) val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first() // 7. 统计出现次数最多的IP(得出结论) val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first() println((lessIp, moreIp)) }
-
缓存的意义
-
缓存相关的 API
-
缓存级别以及最佳实践
4.1. 缓存的意义
- 使用缓存的原因 - 多次使用 RDD
- 使用缓存的原因 - 容错
上述两个问题的解决方案其实都是 缓存
, 除此之外, 使用缓存的理由还有很多, 但是总结一句, 就是缓存能够帮助开发者在进行一些昂贵操作后, 将其结果保存下来, 以便下次使用无需再次执行, 缓存能够显著的 提升性能.
所以, 缓存适合在一个 RDD 需要重复多次利用, 并且还不是特别大的情况下使用, 例如迭代计算等场景.
@Test def cache(): Unit = { // 1. 创建 SC val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]") val sc = new SparkContext(conf) // 2. 读取文件 val source = sc.textFile("dataset/access_log_sample.txt") // 3. 取出IP, 赋予初始频率 val countRDD = source.map( item => (item.split(" ")(0), 1) ) // 4. 数据清洗 val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) ) // 5. 统计IP出现的次数(聚合) var aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg ) aggRDD=aggRDD.cache() //action之前进行缓存 // 6. 统计出现次数最少的IP(得出结论) val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first() // 7. 统计出现次数最多的IP(得出结论) val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first() println((lessIp, moreIp)) }
@Test def persist(): Unit = { // 1. 创建 SC val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]") val sc = new SparkContext(conf) // 2. 读取文件 val source = sc.textFile("dataset/access_log_sample.txt") // 3. 取出IP, 赋予初始频率 val countRDD = source.map( item => (item.split(" ")(0), 1) ) // 4. 数据清洗 val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) ) // 5. 统计IP出现的次数(聚合) var aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg ) aggRDD=aggRDD.persist() //action之前进行缓存 // 6. 统计出现次数最少的IP(得出结论) val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first() // 7. 统计出现次数最多的IP(得出结论) val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first() println((lessIp, moreIp)) }
@Test def persist(): Unit = { // 1. 创建 SC val conf = new SparkConf().setAppName("cache_prepare").setMaster("local[6]") val sc = new SparkContext(conf) // 2. 读取文件 val source = sc.textFile("dataset/access_log_sample.txt") // 3. 取出IP, 赋予初始频率 val countRDD = source.map( item => (item.split(" ")(0), 1) ) // 4. 数据清洗 val cleanRDD = countRDD.filter( item => StringUtils.isNotEmpty(item._1) ) // 5. 统计IP出现的次数(聚合) var aggRDD = cleanRDD.reduceByKey( (curr, agg) => curr + agg ) aggRDD=aggRDD.persist(StorageLevel.MEMORY_ONLY) //action之前进行缓存 // 6. 统计出现次数最少的IP(得出结论) val lessIp = aggRDD.sortBy(item => item._2, ascending = true).first() // 7. 统计出现次数最多的IP(得出结论) val moreIp = aggRDD.sortBy(item => item._2, ascending = false).first() println((lessIp, moreIp)) }
缓存级别
其实如何缓存是一个技术活, 有很多细节需要思考, 如下
-
是否使用磁盘缓存?
-
是否使用内存缓存?
-
是否使用堆外内存?
-
缓存前是否先序列化?
-
是否需要有副本?
如果要回答这些信息的话, 可以先查看一下 RDD 的缓存级别对象
val conf = new SparkConf().setMaster("local[6]").setAppName("debug_string")
val sc = new SparkContext(conf)
val interimRDD = sc.textFile("dataset/access_log_sample.txt")
.map(item => (item.split(" ")(0), 1))
.filter(item => StringUtils.isNotBlank(item._1))
.reduceByKey((curr, agg) => curr + agg)
.persist()
println(interimRDD.getStorageLevel)
sc.stop()
打印出来的对象是 StorageLevel
, 其中有如下几个构造参数
根据这几个参数的不同, StorageLevel
有如下几个枚举对象
缓存级别 | userDisk 是否使用磁盘 | useMemory 是否使用内存 | useOffHeap 是否使用堆外内存 | deserialized 是否以反序列化形式存储 | replication 副本数 |
---|---|---|---|---|---|
|
false |
false |
false |
false |
1 |
|
true |
false |
false |
false |
1 |
|
true |
false |
false |
false |
2 |
|
false |
true |
false |
true |
1 |
|
false |
true |
false |
true |
2 |
|
false |
true |
false |
false |
1 |
|
false |
true |
false |
false |
2 |
|
true |
true |
false |
true |
1 |
|
true |
true |
false |
true |
2 |
|
true |
true |
false |
false |
1 |
|
true |
true |
false |
false |
2 |
|
true |
true |
true |
false |
1 |