spark shuffle过程分析

spark shuffle流程分析

回到ShuffleMapTask.runTask函数

如今回到ShuffleMapTask.runTask函数中:

overridedef runTask(context:TaskContext): MapStatus = {

首先得到要reducetask的个数。

valnumOutputSplits= dep.partitioner.numPartitions

metrics= Some(context.taskMetrics)


valblockManager= SparkEnv.get.blockManager

valshuffleBlockManager= blockManager.shuffleBlockManager

varshuffle:ShuffleWriterGroup = null

varsuccess =false


try{

得到对数据时行serializer操作的类。

//Obtain all the block writers for shuffle blocks.

valser =SparkEnv.get.serializerManager.get(dep.serializerClass,SparkEnv.get.conf)

通过shuffleid与要进行reducetask个数,生成ShuffleBlockId

同一时候依据blockid生成ShuffleWriterGroup.shuffle的实现为DiskBlockObjectWriter

通过spark.shuffle.consolidateFiles配置是否合并文件的输入。默认的为false,

合并文件设置为true,下次再有task在本机执行时,会直接打开当前输入的文件进行输入。

shuffle= shuffleBlockManager.forMapTask(dep.shuffleId,partitionId,numOutputSplits,ser)

依据rdditerator取出数据,依据elementkey又一次进行partition,又一次写入到shuffle

//Write the map output to its associated buckets.

for(elem <-rdd.iterator(split,context)) {

valpair =elem.asInstanceOf[Product2[Any,Any]]

valbucketId =dep.partitioner.getPartition(pair._1)

每个partition都相应着一个DiskBlockObjectWriter,通过此实例的write函数,写入shuffle的数据。

也就是说,这个时候此RDD远行的task个数为core的个数,此时打开的文件个数为corenum*numpartition

shuffle.writers(bucketId).write(pair)

}


//Commit the writes. Get the size of each bucket block (total blocksize).

vartotalBytes= 0L

vartotalTime =0L

把这次打开的所有的文件所有commit,同一时候关闭文件的输入。

valcompressedSizes:Array[Byte] = shuffle.writers.map{ writer: BlockObjectWriter =>

writer.commit()

writer.close()

valsize =writer.fileSegment().length

totalBytes+= size

totalTime+= writer.timeWriting()

MapOutputTracker.compressSize(size)

}


//Update shuffle metrics.

valshuffleMetrics= newShuffleWriteMetrics

shuffleMetrics.shuffleBytesWritten= totalBytes

shuffleMetrics.shuffleWriteTime= totalTime

metrics.get.shuffleWriteMetrics= Some(shuffleMetrics)


success= true

newMapStatus(blockManager.blockManagerId,compressedSizes)

}catch{ casee:Exception =>

//If there is an exception from running the task, revert the partialwrites

//and throw the exception upstream to Spark.

if(shuffle !=null&& shuffle.writers!= null){

for(writer <-shuffle.writers){

writer.revertPartialWrites()

writer.close()

}

}

throwe

}finally{

//Release the writers back to the shuffle block manager.

if(shuffle !=null&& shuffle.writers!= null){

shuffle.releaseWriters(success)

}

//Execute the callbackson task completion.

context.executeOnCompleteCallbacks()

}

}


关于SparkEnv

ShuffleMapTask.runTask中開始就通过SparkEnv.get去获取SparkEnv里面的内容。

SparkEnv中主要通过ThreadLocal来存储此实例。

此实例中包括Akkaactor,serializer,BlockManager,shuffle使用的MapoutputTracker等。

SparkEnv实例生成包括两部分,masterworker,

master是在sparkcontext生成时生成,worker是在executor生成时生成

因此如今我们来分析下这个类定义


针对每个Worker中的executor会生成一个SparkEnv实例:

Executor实例生成时,会运行发下代码:

设置当前executor的属性env为创建一个SparkEnv实例,此实例通过当前的executorId与当前的host生成。

privateval env= {

if(!isLocal) {

val_env =SparkEnv.create(conf,executorId, slaveHostname, 0,

isDriver = false,isLocal = false)

SparkEnv.set(_env)

_env.metricsSystem.registerSource(executorSource)

_env

}else{

SparkEnv.get

}

}


针对master启动时生成的SparkEnv实例:

通过在生成SparkContext实例时,生成SparkEnv属性:

private[spark]val env= SparkEnv.create(

conf,

//注意:此处使用的是driver,表示这是一个driver程序(master),worker时这里传入的是详细的executorid

"<driver>",

conf.get("spark.driver.host"),

conf.get("spark.driver.port").toInt,

isDriver = true,

isLocal = isLocal)

SparkEnv.set(env)


生成的env实例,此实例是一个线程本地实例,每个线程都有自己独立的SparkEnv

private valenv = newThreadLocal[SparkEnv]

声明可变的变量,用来存储最后变化的实例,通过sparkEnv.get时假设env不存在,会拿这个值

@volatileprivatevarlastSetSparkEnv: SparkEnv = _


defset(e: SparkEnv) {

lastSetSparkEnv= e

env.set(e)

}


defget: SparkEnv = {

Option(env.get()).getOrElse(lastSetSparkEnv)

}


以下是sparkenvcreate函数:


private[spark]def create(

conf: SparkConf,

executorId: String,

hostname: String,

port: Int,

isDriver: Boolean,

isLocal: Boolean): SparkEnv = {


val(actorSystem,boundPort)= AkkaUtils.createActorSystem("spark",hostname, port,

conf = conf)


//Bit of a hack: If this is the driver and our port was 0 (meaning bindto any free port),

//figure out which port number Akkaactually bound to and set spark.driver.port to it.

if(isDriver && port == 0){

conf.set("spark.driver.port", boundPort.toString)

}


valclassLoader= Thread.currentThread.getContextClassLoader


//Create an instance of the class named by the given Java systemproperty, or by

//defaultClassName if the property is not set, and return it as a T

definstantiateClass[T](propertyName: String, defaultClassName: String):T = {

valname =conf.get(propertyName, defaultClassName)

Class.forName(name,true,classLoader).newInstance().asInstanceOf[T]

}

生成一个Serializermanager实例

valserializerManager= newSerializerManager

得到配置的Serializer实例。这个地方有部分资料建议配置为org.apache.spark.serializer.KryoSerializer.

请參见http://spark.apache.org/docs/0.9.0/tuning.html的说明。

valserializer= serializerManager.setDefault(

conf.get("spark.serializer","org.apache.spark.serializer.JavaSerializer"),conf)

闭包使用的serializer,假设闭包中函数使用了大量的对象,可改动默认的值

valclosureSerializer= serializerManager.get(

conf.get("spark.closure.serializer","org.apache.spark.serializer.JavaSerializer"),

conf)

此部分检查是否是driver(也就是是否是master)

defregisterOrLookup(name: String, newActor: => Actor): ActorRef = {

假设是master时,生成一个actor的实例。

if(isDriver) {

logInfo("Registering" + name)

actorSystem.actorOf(Props(newActor),name = name)

} else{

否则表示是worker,生成一个actor的引用。

对指定的actor进行连接,生成actorref

valdriverHost:String = conf.get("spark.driver.host","localhost")

valdriverPort:Int = conf.getInt("spark.driver.port",7077)

Utils.checkHost(driverHost,"Expected hostname")

valurl =s"akka.tcp://spark@$driverHost:$driverPort/user/$name"

valtimeout =AkkaUtils.lookupTimeout(conf)

logInfo(s"Connectingto $name:$url")

Await.result(actorSystem.actorSelection(url).resolveOne(timeout),timeout)

}

}

此处生成BlockManagerMaster实例。假设是driver时。

会生成一个名称为BlockManagerMasterBlockManagerMasterActor实例。

否则表示是worker,生成BlockManagerMaster,并创建与master中的BlockManagerMasterActoractorref引用。

BlockManagerMasterActor中通过配置spark.storage.blockManagerTimeoutIntervalMs,默认值为60000ms

定期检查上面注冊的BlockManagerId是否过期。

valblockManagerMaster= newBlockManagerMaster(registerOrLookup(

"BlockManagerMaster",

newBlockManagerMasterActor(isLocal, conf)), conf)

生成BlockManager,BlockManager中会生成ShuffleBlockManager,DiskBlockManager,memory/diskstore.

针对此BlockManager,生成一个BlockManagerId实例。

通过masteractor(BlockManagerMasterActor),master注冊此block,并定期向master发送心跳。

心跳的发送通过spark.storage.blockManagerTimeoutIntervalMs配置的值/4

valblockManager= newBlockManager(executorId,

actorSystem,blockManagerMaster,serializer,conf)


valconnectionManager= blockManager.connectionManager


valbroadcastManager= newBroadcastManager(isDriver, conf)

生成CacheManager,

valcacheManager= newCacheManager(blockManager)

生成MapOutputTracker,假设是master时,生成MapOutputTrackerMaster,否则生成MapOutputTracker

//Have to assign trackerActor after initialization asMapOutputTrackerActor

//requires the MapOutputTracker itself

valmapOutputTracker= if(isDriver) {

newMapOutputTrackerMaster(conf)

}else{

newMapOutputTracker(conf)

}

假设是master时,生成MapOutputTrackerMasterActor实例。否则生成对actor的引用。

mapOutputTracker.trackerActor= registerOrLookup(

"MapOutputTracker",

newMapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]))

生成ShuffleFetcher的实例,通过spark.shuffle.fetcher配置,默觉得BlockStoreShuffleFetcher

valshuffleFetcher= instantiateClass[ShuffleFetcher](

"spark.shuffle.fetcher","org.apache.spark.BlockStoreShuffleFetcher")


valhttpFileServer= newHttpFileServer()

httpFileServer.initialize()

conf.set("spark.fileserver.uri", httpFileServer.serverUri)


valmetricsSystem= if(isDriver) {

MetricsSystem.createMetricsSystem("driver",conf)

}else{

MetricsSystem.createMetricsSystem("executor",conf)

}

metricsSystem.start()


//Set the sparkFiles directory, used when downloading dependencies. Inlocal mode,

//this is a temporary directory; in distributed mode, this is theexecutor's current working

//directory.

valsparkFilesDir:String = if(isDriver) {

Utils.createTempDir().getAbsolutePath

}else{

"."

}


//Warn about deprecated spark.cache.class property

if(conf.contains("spark.cache.class")){

logWarning("Thespark.cache.class property is no longer being used! Specify storage "+

"levelsusing the RDD.persist() method instead.")

}


newSparkEnv(

executorId,

actorSystem,

serializerManager,

serializer,

closureSerializer,

cacheManager,

mapOutputTracker,

shuffleFetcher,

broadcastManager,

blockManager,

connectionManager,

httpFileServer,

sparkFilesDir,

metricsSystem,

conf)

}


ShuffleBlockManager.forMapTask函数

shuffleBlockManager.forMapTask函数是shufflemaptask执行shuffle的核心函数,

此函数中会生成ShuffleWriterGroup实例,

并依据执行的task个数。一般是cpucore个数*reducepartitionshuffle个文件,每一次的执行都会生成这么多个文件。

因此这部分会同一时候打开core*reduceparitionnumfile,每个的maptask执行都会生成这么多个文件。

此部分完毕后就会产生大量的mapoutput文件个数,总文件个数为maptasknum*reducetasknum个文件。

同一时候spark中为了控制文件的生成个数,可通过spark.shuffle.consolidateFiles配置是否重用write文件。

默觉得false,

假设此值设置为true,每个worker通常仅仅生成core*reducetasknum个文件。

每个文件打开通过spark.shuffle.file.buffer.kb配置的缓存大小。默觉得100kb。也就是一次执行中

每个worker中会有core*reducetasknum*100kb的内存buffer的使用。由这部分我个人觉得,

这玩意还是不合适maptask的任务太多的分析任务。Mapreduceshuffle从性能上会比这要慢一些。

可是从对大数据量的支持上还是要好一些。

函数定义

defforMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer:Serializer) = {

生成一个ShuffleWriterGroup实例

newShuffleWriterGroup {

shuffleStates.putIfAbsent(shuffleId,newShuffleState(numBuckets))

privatevalshuffleState= shuffleStates(shuffleId)

privatevarfileGroup:ShuffleFileGroup = null

假设spark.shuffle.consolidateFiles配置的值为true,检查是否有上次生成的writer文件,又一次打开这个文件。

也就是在文件里进行append操作。

valwriters:Array[BlockObjectWriter] = if(consolidateShuffleFiles){

fileGroup= getUnusedFileGroup()

Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>

valblockId =ShuffleBlockId(shuffleId, mapId, bucketId)

blockManager.getDiskWriter(blockId,fileGroup(bucketId),serializer, bufferSize)

}

} else{

否则每个task都会生成新的writer文件。

Array.tabulate[BlockObjectWriter](numBuckets){ bucketId =>

valblockId =ShuffleBlockId(shuffleId, mapId, bucketId)

此处主要是通过sparkenv中的diskBlockMangaer来在指定的路径下生成文件。

路径通过spark.local.dir配置。默觉得java.io.tmpdir

valblockFile =blockManager.diskBlockManager.getFile(blockId)

//Because of previous failures, the shuffle file may already exist onthis machine.

//If so, remove it.

if(blockFile.exists){

if(blockFile.delete()){

logInfo(s"Removedexisting shuffle file $blockFile")

} else{

logWarning(s"Failedto remove existing shuffle file $blockFile")

}

}

blockManager.getDiskWriter(blockId,blockFile,serializer, bufferSize)

}

}

这个函数在shuffleMapTask运行完毕的时候调用。假设上面提到的配置为true时,

会把writerblockfile放到一个容器中,下一次task执行时。会直接打开这个blockfile文件。

overridedefreleaseWriters(success: Boolean) {

if(consolidateShuffleFiles){

if(success) {

valoffsets =writers.map(_.fileSegment().offset)

fileGroup.recordMapOutput(mapId,offsets)

}

recycleFileGroup(fileGroup)

} else{

shuffleState.completedMapTasks.add(mapId)

}

}


privatedefgetUnusedFileGroup(): ShuffleFileGroup = {

valfileGroup =shuffleState.unusedFileGroups.poll()

if(fileGroup!= null)fileGroupelsenewFileGroup()

}


privatedefnewFileGroup(): ShuffleFileGroup = {

valfileId =shuffleState.nextFileId.getAndIncrement()

valfiles =Array.tabulate[File](numBuckets) { bucketId =>

valfilename =physicalFileName(shuffleId, bucketId, fileId)

blockManager.diskBlockManager.getFile(filename)

}

valfileGroup =newShuffleFileGroup(fileId,shuffleId, files)

shuffleState.allFileGroups.add(fileGroup)

fileGroup

}


privatedefrecycleFileGroup(group: ShuffleFileGroup) {

shuffleState.unusedFileGroups.add(group)

}

}

}


DAGShuduler中注冊shuffleidmapStatus

DAGSheduler的调度中。启动一个stage时。假设是shufflestage,会运行例如以下代码:

DAGsheduler.runjob-->submitJob-->JobSubmittedactor-->

newStage传入參数getParentStages-->getShuffleMapStage-->newOrUsedStage


privatedef newOrUsedStage(

rdd: RDD[_],

numTasks: Int,

shuffleDep:ShuffleDependency[_,_],

jobId: Int,

callSite: Option[String] = None)

:Stage =

{

valstage =newStage(rdd, numTasks, Some(shuffleDep), jobId, callSite)

if(mapOutputTracker.has(shuffleDep.shuffleId)){

valserLocs =mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)

vallocs =MapOutputTracker.deserializeMapStatuses(serLocs)

for(i <- 0until locs.size)stage.outputLocs(i)= List(locs(i))

stage.numAvailableOutputs= locs.size

}else{

master中注冊此shuffleid

//Kind of ugly: need to register RDDs with the cache and map outputtracker here

//since we can't do it in the RDD constructor because # of partitionsis unknown

logInfo("RegisteringRDD " + rdd.id+ " ("+ rdd.origin+ ")")

mapOutputTracker.registerShuffle(shuffleDep.shuffleId,rdd.partitions.size)

}

stage

}


回到dagsheduler的调度中。当shuffle的全部的task处理完毕后,会调用例如以下代码:

....

execBackend.statusUpdate(taskId,TaskState.FINISHED, serializedResult)

.....

casesmt: ShuffleMapTask =>

valstatus =event.result.asInstanceOf[MapStatus]

valexecId =status.location.executorId

logDebug("ShuffleMapTaskfinished on " + execId)

if(failedEpoch.contains(execId)&& smt.epoch<= failedEpoch(execId)){

logInfo("Ignoringpossibly bogus ShuffleMapTask completion from "+ execId)

} else{

第一个task完毕后。都会把map返回的MapStatus(记录有location信息)记录到stageoutputloc中。

stage.addOutputLoc(smt.partitionId,status)

}

if(running.contains(stage)&& pendingTasks(stage).isEmpty){

markStageAsFinished(stage)

logInfo("lookingfor newly runnable stages")

logInfo("running:" + running)

logInfo("waiting:" + waiting)

logInfo("failed:" + failed)

if(stage.shuffleDep!= None) {

.........................................

假设全部的shuffletask都运行完毕,把此stage相应的shuffled与全部的location注冊到mapOutputTracker

此处是通过DAGSheculer来完毕的,因此。mapoutputtracker是一个MapOutputTrackerMaster的实现。

mapOutputTracker.registerMapOutputs(

stage.shuffleDep.get.shuffleId,

stage.outputLocs.map(list=> if(list.isEmpty) nullelselist.head).toArray,

changeEpoch = true)

}


Shuffle的读取计算


此时shuffleMAPRDD运行完毕后。会通过PairRDDFunctions来做处理

回到PairRDDFunctions中的reduceByKey

reduceByKey-->combineByKey

再次来看这个函数的定义

defcombineByKey[C](createCombiner: V => C,

mergeValue: (C, V) => C,

mergeCombiners: (C, C) => C,

partitioner: Partitioner,

mapSideCombine: Boolean = true,

serializerClass: String = null):RDD[(K, C)] = {

if(getKeyClass().isArray) {

if(mapSideCombine) {

thrownewSparkException("Cannot use map-sidecombining with array keys.")

}

if(partitioner.isInstanceOf[HashPartitioner]) {

thrownewSparkException("Default partitionercannot partition array keys.")

}

}

valaggregator= newAggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)

假设当前的RDDpartitioner与传入的partitioner相等,表示是一个map,不须要进行shuffle,直接在map端合并。

if(self.partitioner== Some(partitioner)) {

self.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context, aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning = true)

}elseif(mapSideCombine) {

假设设置有在map端先进行一次合并。类似于mapreduce中的combine,先在map端运行一次合并。

并生成MapPartitionsRDD

valcombined =self.mapPartitionsWithContext((context, iter) => {

aggregator.combineValuesByKey(iter,context)

}, preservesPartitioning = true)

生成一个ShuffledRDD实例。在reduce端运行合并操作。合并的核心函数是aggregator实例中定义的相关函数。

valpartitioned= newShuffledRDD[K, C, (K, C)](combined,partitioner)

.setSerializer(serializerClass)

partitioned.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context, aggregator.combineCombinersByKey(iter,context))

}, preservesPartitioning = true)

}else{

不运行combiner操作,直接在reduce端进行shuffle操作。

//Don't apply map-side combiner.

valvalues =newShuffledRDD[K, V, (K, V)](self,partitioner).setSerializer(serializerClass)

values.mapPartitionsWithContext((context,iter) => {

newInterruptibleIterator(context, aggregator.combineValuesByKey(iter,context))

}, preservesPartitioning = true)

}

}


Reduce端。生成为ShuffledRDD。数据计算函数通过compute函数完毕。

ShuffledRDD中计算函数的实现

overridedef compute(split: Partition,context: TaskContext): Iterator[P] = {

valshuffledId= dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId

通过指定的shuffledid,拿到shuffle完毕的数据。

SparkEnv.get.shuffleFetcher.fetch[P](shuffledId,split.index, context,

SparkEnv.get.serializerManager.get(serializerClass,SparkEnv.get.conf))

}


SparkEnv中拿到shuffleFetcher的实例。从SparkEnv生成来看,

通过spark.shuffle.fetcher配置,默觉得BlockStoreShuffleFetcher

Sparkenv中的定义

valshuffleFetcher= instantiateClass[ShuffleFetcher](

"spark.shuffle.fetcher","org.apache.spark.BlockStoreShuffleFetcher")



BlockStoreShuffleFetcher.fetch的函数:

overridedef fetch[T](

shuffleId: Int,

reduceId: Int,

context: TaskContext,

serializer: Serializer)

:Iterator[T] =

{


logDebug("Fetchingoutputs for shuffle %d, reduce %d".format(shuffleId,reduceId))

valblockManager= SparkEnv.get.blockManager


valstartTime =System.currentTimeMillis

executor中的mapoutputtracker会通过GetMapOutputStatuses事件

mapoutputtrackermaster中的MapOutputTrackerMasterActor发起得到全部的mapStatus事件。

valstatuses =SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId,reduceId)

...........................

valsplitsByAddress= newHashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]

BlockManagerid同样的map结果进行合并,index的值就是mappartition

for(((address,size),index) <-statuses.zipWithIndex){

splitsByAddress.getOrElseUpdate(address,ArrayBuffer()) += ((index,size))

}

得到每个map的输出文件的结果集地址,地址由shuffleid,mappartitionnum,reduceparttion组成。

valblocksByAddress:Seq[(BlockManagerId, Seq[(BlockId, Long)])] =splitsByAddress.toSeq.map{

case(address,splits) =>

(address,splits.map(s=> (ShuffleBlockId(shuffleId, s._1,reduceId), s._2)))

}


defunpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) :Iterator[T] = {

valblockId =blockPair._1

valblockOption= blockPair._2

blockOptionmatch{

caseSome(block)=> {

block.asInstanceOf[Iterator[T]]

}

caseNone => {

blockIdmatch{

caseShuffleBlockId(shufId,mapId, _)=>

valaddress =statuses(mapId.toInt)._1

thrownewFetchFailedException(address,shufId.toInt,mapId.toInt,reduceId, null)

case_ =>

thrownewSparkException(

"Failedto get block " + blockId+ ", which is not a shuffle block")

}

}

}

}

通过blockManagerblockid中获取Iterator,用来得到数据

这里的blockManagerreduce进行shuffle的详细有两个实现。默觉得BasicBlockFetcherIterator

假设spark.shuffle.use.netty配置为true时。实现类为NettyBlockFetcherIterator

BasicBlockFetcherIterator中通过nio的方式使用sparkenv中的ConnectionManager来接收数据。

NettyBlockFetcherIterator通过netty的通信框架进行操作,使用netty时,

通过reducespark.shuffle.copier.threads配置的线程数来获取数据,默认的线程个数为6.

valblockFetcherItr= blockManager.getMultiple(blocksByAddress,serializer)

取出每个blockid中的values部分的iterator.

valitr =blockFetcherItr.flatMap(unpackBlock)


valcompletionIter= CompletionIterator[T, Iterator[T]](itr,{

valshuffleMetrics= newShuffleReadMetrics

shuffleMetrics.shuffleFinishTime= System.currentTimeMillis

shuffleMetrics.remoteFetchTime= blockFetcherItr.remoteFetchTime

shuffleMetrics.fetchWaitTime= blockFetcherItr.fetchWaitTime

shuffleMetrics.remoteBytesRead= blockFetcherItr.remoteBytesRead

shuffleMetrics.totalBlocksFetched= blockFetcherItr.totalBlocks

shuffleMetrics.localBlocksFetched= blockFetcherItr.numLocalBlocks

shuffleMetrics.remoteBlocksFetched= blockFetcherItr.numRemoteBlocks

context.taskMetrics.shuffleReadMetrics= Some(shuffleMetrics)

})


newInterruptibleIterator[T](context, completionIter)

}


通过MapOutputTracker得到shufflestagemap完毕的mapstatus

上面得到MapStatus的容器的函数定义

defgetServerStatuses(shuffleId: Int, reduceId: Int):Array[(BlockManagerId, Long)] = {

检查executor本地是否有此shuffleidmapstatuses信息。

valstatuses =mapStatuses.get(shuffleId).orNull

假设本地还没有shuffle的状态数据(全部的shuffle完毕的状态都须要从master中同步过来)

if(statuses== null){

logInfo("Don'thave map outputs for shuffle " + shuffleId + ",fetching them")

varfetchedStatuses:Array[MapStatus] = null

出于线程安全考虑,

fetching.synchronized{

假设shuffleid已经在fetching中存在,等待shufflemaster获取MapStatus完毕。

这里主要是为了多个task同一时候来获取数据。第一个task已经向master发起申请,

第二个就不须要在发起仅仅须要等待第一个完毕申请并得到数据存储到fetchedStatuses中。

if(fetching.contains(shuffleId)){

//Someone else is fetching it; wait for them to be done

while(fetching.contains(shuffleId)){

try{

fetching.wait()

} catch{

casee:InterruptedException =>

}

}

}


if(fetchedStatuses== null){

//We wonthe race to fetch the output locs;do so

logInfo("Doingthe fetch; tracker actor = " +trackerActor)

//This try-finally prevents hangs due to timeouts:

try{

通过askTracker函数。通过actorrefMapoutputTrackerMasterActor发起GetMapOutputStatuses事件。

得到此stage完毕的全部的taskMapStatus信息

valfetchedBytes=

askTracker(GetMapOutputStatuses(shuffleId)).asInstanceOf[Array[Byte]]

解析成fetchedStatuses数据。

fetchedStatuses= MapOutputTracker.deserializeMapStatuses(fetchedBytes)

logInfo("Gotthe output locations")

加入到executor中的MapStatuses容器中。缓存起来,共下一个task实例。

mapStatuses.put(shuffleId,fetchedStatuses)

} finally{

master中获取数据完毕,把fetching中的shuffleid移出。

fetching.synchronized{

fetching-= shuffleId

fetching.notifyAll()

}

}

}

if(fetchedStatuses!= null){

fetchedStatuses.synchronized{

通过指定的shuffleidreduceid的值。得到此reduceblockid中要获取数据的大小。

returnMapOutputTracker.convertMapStatuses(shuffleId, reduceId,fetchedStatuses)

}

}

else{

thrownewFetchFailedException(null,shuffleId, -1,reduceId,

newException("Missing all outputlocations for shuffle " +shuffleId))

}

}else{

通过指定的shuffleidreduceid的值,得到此reduceblockid中要获取数据的大小。localcache模式

statuses.synchronized{

returnMapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)

}

}

}


MapOutputTracker.convertMapStatuses函数

privatedef convertMapStatuses(

shuffleId: Int,

reduceId: Int,

statuses: Array[MapStatus]):Array[(BlockManagerId,Long)] = {

assert (statuses != null)

statuses.map {

status =>

if(status == null){

thrownewFetchFailedException(null,shuffleId, -1,reduceId,

newException("Missing an outputlocation for shuffle " +shuffleId))

} else{

取出MapStatus中。针对此reducepartition中的shuffle的内容大小。

(status.location,decompressSize(status.compressedSizes(reduceId)))

}

}

}


........


原文地址:https://www.cnblogs.com/gccbuaa/p/6707110.html