Spark编程模型

=============

RDD

MapReduce的不足:
计算之间数据共享只有一个办法,写入到文件系统如hdfs,引入了磁盘IO,序列化等开销,从而占据了大部分的执行时间。

RDD:

  • 更强的容错性,如通过数据集的血统,如通过两个父集join,map,filter出子集,可以快速恢复慢节点或某个分区的数据
  • 在并行阶段高效的进行数据共享,减少IO开销

RDD类型

  • 创建操作,从内存或外部系统创建,或转换操作生成新的rdd
  • 转换操作,惰性操作,只是定义了一个新的RDD,并没有立即执行
  • 控制操作,进行RDD持久化,保存在内存或磁盘中,避免二次计算
  • 行动操作,触发Spark运行的操作,如collect,count,或将rdd保存到外部文件系统或数据库中

RDD实现

1)作业调度

  • 当对RDD进行转换操作时,调度器会根据RDD的血统来构建若干调度阶段(Stage)组成有向无环图(DAG),每个调度阶段包含尽可能多的连续窄依赖转换。

  • 延时调度机制,分配任务时根据数据存储位置来把任务分配给较佳位置的节点,如需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给该节点。

  • 以宽依赖划分Stage

    dataset1 = a group by a.biz_code 【group宽依赖】
    dataset2 = b.map(_.*) union c 【b map 操作为窄依赖, union 也是窄依赖】
    finaldataset = dataset1 join dataset2 on id 【join操作为宽依赖】

判断是宽依赖还是窄依赖:子集数据的分区仅从父集数据的某一个分区而来就是窄依赖,否则为宽依赖,如map操作,将一行数据 进行转换操作,生成一行新的数据,这就是窄依赖

上述代码可以划分为3个阶段

stage1: group by
stage2: map + union,虽然这两个操作都是窄依赖,但是结果将与stage1进行join,因此单独一个stage
stage3: join 

只要父数据集还在,就可以快速、并行的计算出丢失的分区

内存管理

3种存储策略

  • 未序列化Java对象在内存中
  • 序列化的数据存于内存中
  • 存储在磁盘中

参数解释

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-properties.html
http://ifeve.com/spark-config/

--executor-memory 5G executor内存,默认会申请Executor的10%作为堆外内存
spark.executor.memoryOverhead 直接指定executor堆外内存的大小 ,实际申请给executor的内存大小= executor-memory + memoryOverhead
spark.sql.columnVector.offheap.enabled
spark.network.timeout 所有网络交互的默认超时 120s
spark.memory.offHeap.enabled 如果true,Spark会尝试使用堆外内存
spark.memory.offHeap.size 堆外内存分配的大小(绝对值)
spark.shuffle.io.preferDirectBufs 默认 true仅netty)堆外缓存可以有效减少垃圾回收和缓存复制。对于堆外内存紧张的用户来说,可以考虑禁用这个选项,以迫使netty所有内存都分配在堆上。
spark.shuffle.file.buffer 默认32k 每个混洗输出流的内存buffer大小。这个buffer能减少混洗文件的创建和磁盘寻址。
spark.shuffle.unsafe.file.output.buffer
spark.shuffle.sort.initialBufferSize 内存块的大小,用来存储数据记录指针,最后用来排序
spark.yarn.maxAppAttempts 提交应用最大尝试次数
spark.reducer.maxBlocksInFlightPerAddress 此配置限制从给定主机端口每个reduce任务获取的远程块的数量。当在单次提取中同时从给定地址请求大量块时,这可能会使服务执行程序或节点管理器崩溃。这对于在启用外部随机播放时减少节点管理器上的负载特别有用。您可以通过将其设置为较低的值来缓解此问题。
spark.memory.fraction 0.75 堆内存中用于执行、混洗和存储(缓存)的比例。这个值越低,则执行中溢出到磁盘越频繁,同时缓存被逐出内存也更频繁。这个配置的目的,是为了留出用户自定义数据结构、内部元数据使用的内存。推荐使用默认值
spark.memory.storageFraction 0.5 不会被逐出内存的总量,表示一个相对于 spark.memory.fraction的比例。这个越高,那么执行混洗等操作用的内存就越少,从而溢出磁盘就越频繁。推荐使用默认值
spark.maxRemoteBlockSizeFetchToMem 当块的大小高于此阈值(以字节为单位)时,远程块将被提取到磁盘
spark.shuffle.spill.numElementsForceSpillThreshold
spark.shuffle.sort.bypassMergeThreshold 200 (高级)在基于排序(sort)的混洗管理器中,如果没有map端聚合的话,就会最多存在这么多个reduce分区

executor的jvm配置

--conf "spark.executor.extraJavaOptions=-XX:MaxMetaspaceSize=512m -XX:MetaspaceSize=512m -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=4096m"

Memory调整

http://spark.apache.org/docs/latest/tuning.html#data-serialization
3个需要考虑的情况

  • 你自己的objects,如你可能想把整个dataset搞到内存
  • 到达这些objects的花费
  • gc使用的内存

Java objects是快速可获取的,由于以下原因object占用的内存可能是2-5x于raw data

  • object header,约16bytes
  • Java Strings have about 40 bytes of overhead over the raw string data (since they store it in an array of Chars and keep extra data such as the length), and store each character as two bytes due to String’s internal usage of UTF-16 encoding. Thus a 10-character string can easily consume 60 bytes.
  • Common collection classes, such as HashMap and LinkedList, use linked data structures, where there is a “wrapper” object for each entry (e.g. Map.Entry). This object not only has a header, but also pointers (typically 8 bytes each) to the next object in the list.
  • Collections of primitive types often store them as “boxed” objects such as java.lang.Integer.

如果

  • 一次任务中执行了多次full gc,则应该,说明内存不足
  • 很多minor gc,较少major gc,则应该调整Young区->大
  • old快要满了,可减少 spark.memory.fraction的值,让user objects可以用更多的内存,缓存占用更细空间,或降低xmn
  • 使用G1 -XX:+UseG1GC,如果执行的内存很大,可以增加G1的 -XX:G1HeapRegionSize
  • 如果从hdfs读数据,Eden建议大小= taskNumdecompressed times(2到3) hdfs block size
  • reduce task发生OutOfMemoryError ,可以考虑提供并行度,让单个task input变小,如果executor上task个数不变,则同一时刻需要的内存也就变小了

Tungsten

off-heap 内存管理,降低对象开销,GC频率

原文地址:https://www.cnblogs.com/windliu/p/11021773.html