spark序列化及MapOutputTracker解析

本文主要打算对spark内部的序列化机制以及在shuffle map中起衔接作用的MapOutputTracker做一下剖析。主要涉及具体实现原理以及宏观设计的一些思路。

1,spark序列化

任何一个分布式框架,序列化都是其必不可少并且很重要一部分,spark也不例外。spark设计序列化的主要类以及之间的依赖包含关系如下图:

上图中,虚线表示依赖关系,而实线表示继承关系。各个类的解释如下:

a,SerializerManager可以认为是入口类,提供相关实现供外部序列化和反序列化时调用。

b,Serializer主要用来创建SerializerInstance,此类存在的作用主要是处理序列化过程中的多线程问题。

c,SerializationStream/DeSerializationStream具体做序列化,反序列化的抽象类。

此外,还包括以上抽象类的具体的两种序列化方式,java默认的序列化方式以及第三方library的kyro方式。spark默认采用java内生的序列化方式,但是kyro序列化方式能够提供更高的性能(有测试说10倍),所以建议修改序列化方式以优化性能。

2,MapOutputTracker解析

至于为什么要分析这个类,主要是因为在调研一个spark shuffledRDD的的prefer location的问题时碰到了,所以就决定深究一下。

在driver和executor端,MapOutputTracker对应的实现分别是MapOutputTrackerMaster和MapOutputTrackerWorker。总的来说,MapOutputTracker在spark shuffle过程中的map和reduce起着衔接作用。具体点就是:在shuffle map过程中,executor端MapOutputTrackerWorker会将task结束后产生的map状态上报给Driver端的MapOutputTrackerMaster,所以在MapOutputTrackerMaster端保存中spark在shuffle map过程中所有block的相关的详细(包括位置,block大小等信息)。在shuffle reduce的时候,通过读取MapOutputTrackerMaster中的这些位置大小信息,从而决定去远程或者本地fetch相关block数据。

下面就以上过程跟踪一下对应的源码。

a,在executor的任务结束以后,driver端的DAGSchuduler会进行相关处理,对应正常成功结束任务,其中要做的一个事情就是调用MapOutputTrackerMaster的registerMapOutputs方法,将当前shuffleId对应所有MapStatus保存到mapStatuses中。

b,在ShuffledRDD中,调用getPreferredLocations时,会调用MapOutputTracker中的getLocationsWithLargestOutputs函数,在函数中根据每个partition所在位置以及大小信息以及相关参数来决定当前shuffled RDD的perfer location。

此外,在shuffle reduce fetch数据时,也需要想MapOutputTrackerMaster发送GetMapOutputMessage消息,获取当前shuffle对应的map信息,这时driver端会将这些信息序列化以后发送给executor端。

3,小结

本文主要分析了一下spark中序列化的相关设计及主要类的作用。此外,还多shuffle过程中进行信息交互的MapOutTracker做了一下简要分析。尤其是序列化的设计,值得思考和借鉴。

原文地址:https://www.cnblogs.com/superhedantou/p/9172274.html