MapReduce运行原理

Hadoop2.X后,MapReduce一般都是跑在Yarn上,所以说MapReduce的运行原理,更多是讲MapReduce On Yarn的运行原理,MapReduce On Yarn的运行原理图如下:
1:job的提交
1.1、向RM申请一个新的applicationId
1.2、判断job的输出路径是否已经存在,如果存在则报错退出
1.3、根据输入文件计算input splits,如果输入文件不存在则报错
1.4、将job需要依赖的资源上传到HDFS,资源包括jar包、第三步计算了的input splits等
1.5、向RM提交MR Job
2:job的初始化
2.1、RM根据提交过来的资源请求,在一个NodeManager上启动一个Container来运行ApplicationMaster(MRAppMaster)
2.2、RMAppMaster组件的初始化,这些组件都是用来管理运行的Task(mapTask和reduceTask)的
2.3、从HDFS中读取计算好的input splits信息,然后为每一个input split创建一个MapTask,
且根据mapreduce.job.reduces这个配置决定创建多少个reduceTask
2.4、说白了,MRAppMaster就是计算Master,负责管理Task的运行的
3:Task分配
MRAppMaster为每一个map和reduce task向RM申请资源(资源默认是1024M内存以及1个vcore)
4:Task的执行
4.1、申请到资源后,在数据所在的节点启动Container
4.2、MapTask和ReduceTask都是运行在YarnChild上,在运行Task之前需要从HDFS中下载依赖的jar包

inputSplit

1. Block

块是以 block size 进行划分数据。 因此,如果群集中的 block size 为 128 MB,则数据集的每个块将为128 MB,除非最后一个块小于block size(文件大小不能被 block size 完全整除)。例如下图中文件大小为513MB,513%128=1,最后一个块(e)小于block size,大小为1MB。 因此,块是以 block size的硬切割,并且块甚至可以在逻辑记录结束之前结束(blocks can end even before a logical recordends)。假设我们的集群中block size 是128 MB,每个逻辑记录大约100 MB(假设为巨大的记录)。所以第一个记录将完全在一个块中,因为记录大小为100 MB小于块大小128 MB。但是,第二个记录不能完全在一个块中,因此第二条记录将出现在两个块中,从块1开始,在块2中结束。

2. InputSplit

如果分配一个Mapper给块1,在这种情况下,Mapper不能处理第二条记录,因为块1中没有完整第二条记录。因为HDFS不知道文件块中的内容,它不知道记录会什么时候可能溢出到另一个块(becauseHDFS has no conception of what’s inside the fifile blocks, it can’t gauge when a record mightspill over into another block)。InputSplit这是解决这种跨越块边界的那些记录问题,Hadoop使用逻辑表示存储在文件块中的数据,称为输入拆分(InputSplit)。当MapReduce作业客户端计算InputSplit时,它会计算出块中第一个完整记录的开始位置和最后一个记录的结束位置。在最后一个记录不完整的情况下,InputSplit 包括下一个块的位置信息和完成该记录所需的数据的字节偏移(In cases where the last record in a block is incomplete, the input splitincludes location information for the next block and the byte offffset of the data needed tocomplete the record)。下图显示了数据块和InputSplit之间的关系:块是磁盘中的数据存储的物理块,其中InputSplit不是物理数据块。 它是一个Java类,指向块中的开始和结束位置。 因此,当Mapper尝试读取数据时,它清楚地知道从何处开始读取以及在哪里停止读取。InputSplit的开始位置可以在块中开始,在另一个块中结束。InputSplit代表了逻辑记录边界,在MapReduce执行期间,Hadoop扫描块并创建InputSplits,并且每个InputSplit将被分配给一个Mapper进行处理。

 

什么是combiner

在Hadoop中,有一种处理过程叫Combiner,与Mapper和Reducer在处于同等地位,但其执行的时间介于Mapper和Reducer之间,其实就是Mapper和Reducer的中间处理过程,Mapper的输出是Combiner的输入,Combiner的输出是Reducer的输入。例如获取历年的最高温度例子,以书中所说的1950年为例,在两个不同分区上的Mapper计算获得的结

果分别如下:

第一个Mapper结果:(1950, [0, 10, 20])

第二个Mapper结果:(1950, [25, 15])

如果不考虑Combiner,按照正常思路,这两个Mapper的结果将直接输入到Reducer中处理,如下所示:MaxTemperature:(1950, [0, 10, 20, 25, 15])最终获取的结果是25。

如果考虑Combiner,按照正常思路,这两个Mapper的结果将分别输入到两个不同的Combiner中处理,获得的结果分别如下所示:第一个Combiner结果:(1950, [20])第二个Combiner结果:(1950, [25])然后这两个Combiner的结果会输出到Reducer中处理,如下所示MaxTemperature:(1950, [20, 25])最终获取的结果是25。

由上可知:这两种方法的结果是一致的,使用Combiner最大的好处是节省网络传输的数据,这对于提高整体的效率是非常有帮助的。

但是,并非任何时候都可以使用Combiner处理机制,例如不是求历年的最高温度,而是求平均温度,则会有另一种结果。同样,过程如下,

如果不考虑Combiner,按照正常思路,这两个Mapper的结果将直接输入到Reducer中处理,如下所示:

AvgTemperature:(1950, [0, 10, 20, 25, 15])

最终获取的结果是14。

如果考虑Combiner,按照正常思路,这两个Mapper的结果将分别输入到两个不同的Combiner中处理,获得的结果分别如下所示:

第一个Combiner结果:(1950, [10])

第二个Combiner结果:(1950, [20])

然后这两个Combiner的结果会输出到Reducer中处理,如下所示

AvgTemperature:(1950, [10, 20])

最终获取的结果是15。

由上可知:这两种方法的结果是不一致的,所以在使用Combiner时,一定三思而后行,仔细思量其是否适合,否则可能造成不必要的麻烦。

原文地址:https://www.cnblogs.com/tesla-turing/p/11958492.html