MAPREDUCE框架结构及核心运行机制

1.2.1 结构

一个完整的mapreduce程序在分布式运行时有三类实例进程:

1、MRAppMaster:负责整个程序的过程调度及状态协调

2、mapTask:负责map阶段的整个数据处理流程

3、ReduceTask:负责reduce阶段的整个数据处理流程

1.2.2 MR程序运行流程

1.2.2.1 流程示意图

 

1.2.2.2 流程解析

1、  一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2、  maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

a)       利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对

b)       将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存

c)        将缓存中的KV对按照K分区排序后不断溢写到磁盘文件

3、  MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4、  Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储

 

1.3 MapTask并行度决定机制

maptask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度

那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢?

1.3.1 mapTask并行度的决定机制

一个job的map阶段并行度由客户端在提交job时决定

而客户端对map阶段并行度的规划的基本逻辑为:

将待处理数据执行逻辑切片(即按照一个特定切片大小,将待处理数据划分成逻辑上的多个split),然后每一个split分配一个mapTask并行实例处理

这段逻辑及形成的切片规划描述文件,由FileInputFormat实现类的getSplits()方法完成,其过程如下图:

 

1.3.2 FileInputFormat切片机制

1、切片定义在InputFormat类中的getSplit()方法

2、FileInputFormat中默认的切片机制:

a)       简单地按照文件的内容长度进行切片

b)       切片大小,默认等于block大小

c)        切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

比如待处理数据有两个文件:

file1.txt    320M

file2.txt    10M

经过FileInputFormat的切片机制运算后,形成的切片信息如下: 

file1.txt.split1--  0~128

file1.txt.split2--  128~256

file1.txt.split3--  256~320

file2.txt.split1--  0~10M

3、FileInputFormat中切片的大小的参数配置

通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize, Math.min(maxSize, blockSize));  切片主要由这几个值来运算决定

minsize:默认值:1 

      配置参数: mapreduce.input.fileinputformat.split.minsize   

maxsize:默认值:Long.MAXValue 

    配置参数:mapreduce.input.fileinputformat.split.maxsize

blocksize

因此,默认情况下,切片大小=blocksize

maxsize(切片最大值):

参数如果调得比blocksize小,则会让切片变小,而且就等于配置的这个参数的值

minsize (切片最小值):

参数调的比blockSize大,则可以让切片变得比blocksize还大

选择并发数的影响因素:

1、运算节点的硬件配置

2、运算任务的类型:CPU密集型还是IO密集型

3、运算任务的数据量

1.4 map并行度的经验之谈

如果硬件配置为2*12core + 64G,恰当的map并行度是大约每个节点20-100个map,最好每个map的执行时间至少一分钟。

l  如果job的每个map或者 reduce task的运行时间都只有30-40秒钟,那么就减少该job的map或者reduce数,每一个task(map|reduce)的setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。

配置task的JVM重用[dht1] 可以改善该问题:

mapred.job.reuse.jvm.num.tasks默认是1,表示一个JVM上最多可以顺序执行的task

数目(属于同一个Job)是1。也就是说一个task启一个JVM

l  如果input的文件非常的大,比如1TB,可以考虑将hdfs上的每个block size设大,比如设成256MB或者512MB

1.5 ReduceTask并行度的决定

reducetask的并行度同样影响整个job的执行并发度和执行效率,但与maptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:

//默认值是1,手动设置为4

job.setNumReduceTasks(4);

如果数据分布不均匀,就有可能在reduce阶段产生数据倾斜

注意: reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个reducetask

尽量不要运行太多的reduce task。对大多数job来说,最好rduce的个数最多和集群中的reduce持平,或者比集群的 reduce slots小。这个对于小集群而言,尤其重要。

1.6 MAPREDUCE程序运行演示

Hadoop的发布包中内置了一个hadoop-mapreduce-example-2.4.1.jar,这个jar包中有各种MR示例程序,可以通过以下步骤运行:

启动hdfs,yarn

然后在集群中的任意一台服务器上启动执行程序(比如运行wordcount):

hadoop jar hadoop-mapreduce-example-2.4.1.jar wordcount  /wordcount/data /wordcount/out


JVM重用技术不是指同一Job的两个或两个以上的task可以同时运行于同一JVM上,而是排队按顺序执行。

2 map reduce数量设置:

map和reduce是hadoop的核心功能,hadoop正是通过多个map和reduce的并行运行来实现任务的分布式并行计算,从这个观点来看,如果将map和reduce的数量设置为1,那么用户的任务就没有并行执行,但是map和reduce的数量也不能过多,数量过多虽然可以提高任务并行度,但是太多的map和reduce也会导致整个hadoop框架因为过度的系统资源开销而使任务失败。所以用户在提交map/reduce作业时应该在一个合理的范围内,这样既可以增强系统负载匀衡,也可以降低任务失败的开销。

1 map的数量

map的数量通常是由hadoop集群的DFS块大小确定的,也就是输入文件的总块数,正常的map数量的并行规模大致是每一个Node是10~100个,对于CPU消耗较小的作业可以设置Map数量为300个左右,但是由于hadoop的没一个任务在初始化时需要一定的时间,因此比较合理的情况是每个map执行的时间至少超过1分钟。具体的数据分片是这样的,InputFormat在默认情况下会根据hadoop集群的DFS块大小进行分片,每一个分片会由一个map任务来进行处理,当然用户还是可以通过参数mapred.min.split.size参数在作业提交客户端进行自定义设置。还有一个重要参数就是mapred.map.tasks,这个参数设置的map数量仅仅是一个提示,只有当InputFormat 决定了map任务的个数比mapred.map.tasks值小时才起作用。同样,Map任务的个数也能通过使用JobConf 的conf.setNumMapTasks(int num)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。当然为了提高集群的并发效率,可以设置一个默认的map数量,当用户的map数量较小或者比本身自动分割的值还小时可以使用一个相对交大的默认值,从而提高整体hadoop集群的效率。

2 reduece的数量

reduce在运行时往往需要从相关map端复制数据到reduce节点来处理,因此相比于map任务。reduce节点资源是相对比较缺少的,同时相对运行较慢,正确的reduce任务的个数应该是0.95或者1.75 *(节点数 ×mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在 map任务的输出传输结束后同时开始运行。如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批 reduce任务,这样的情况更有利于负载均衡。同时需要注意增加reduce的数量虽然会增加系统的资源开销,但是可以改善负载匀衡,降低任务失败带来的负面影响。同样,Reduce任务也能够与 map任务一样,通过设定JobConf 的conf.setNumReduceTasks(int num)方法来增加任务个数。

3 reduce数量为0

有些作业不需要进行归约进行处理,那么就可以设置reduce的数量为0来进行处理,这种情况下用户的作业运行速度相对较高,map的输出会直接写入到 SetOutputPath(path)设置的输出目录,而不是作为中间结果写到本地。同时Hadoop框架在写入文件系统前并不对之进行排序。

map red.tasktracker.map.tasks.maximum 这个是一个task tracker中可同时执行的map的最大个数,默认值为2,看《pro hadoop》:it is common to set this value to the effective number of CPUs on the node 
把ob分割成map和reduce,合理地选择Job中 Tasks数的大小能显著的改善Hadoop执行的性能。增加task的个数会增加系统框架的开销,但同时也会增强负载均衡并降低任务失败的开销。一个极端是1个map、1个reduce的情况,这样没有任务并行。另一个极端是1,000,000个map、1,000,000个reduce的情况,会由于框架的开销过大而使得系统资源耗尽。 
Map任务的数量 
Map的数量经常是由输入数据中的DFS块的数量来决定的。这还经常会导致用户通过调整DFS块大小来调整map的数量。正确的map任务的并行度似乎应该是10-100 maps/节点,尽管我们对于处理cpu运算量小的任务曾经把这个数字调正到300maps每节点。Task的初始化会花费一些时间,因此最好控制每个 map任务的执行超过一分钟。 
实际上控制map任务的个数是很 精妙的。mapred.map.tasks参数对于InputFormat设定map执行的个数来说仅仅是一个提示。InputFormat的行为应该把输入数据总的字节值分割成合适数量的片段。但是默认的情况是DFS的块大小会成为对输入数据分割片段大小的上界。一个分割大小的下界可以通过一个mapred.min.split.size参数来设置。因此,如果你有一个大小是10TB的输入数据,并设置DFS块大小为 128M,你必须设置至少82K个map任务,除非你设置的mapred.map.tasks参数比这个数还要大。最终InputFormat 决定了map任务的个数。 
Map任务的个数也能通过使用JobConf 的 conf.setNumMapTasks(int num)方法来手动地设置。这个方法能够用来增加map任务的个数,但是不能设定任务的个数小于Hadoop系统通过分割输入数据得到的值。 
Reduce任务的个数 
正确的reduce任务的 个数应该是0.95或者1.75 ×(节点数 ×mapred.tasktracker.tasks.maximum参数值)。如果任务数是节点个数的0.95倍,那么所有的reduce任务能够在 map任务的输出传输结束后同时开始运行。如果任务数是节点个数的1.75倍,那么高速的节点会在完成他们第一批reduce任务计算之后开始计算第二批 reduce任务,这样的情况更有利于负载均衡。 
目前reduce任务的数量 由于输出文件缓冲区大小(io.buffer.size × 2 ×reduce任务个数 << 堆大小),被限制在大约1000个左右。直到能够指定一个固定的上限后,这个问题最终会被解决。 
Reduce任务的数量同时也控制着输出目录下输出文件的数量,但是通常情况下这并不重要,因为下一阶段的 map/reduce任务会把他们分割成更加小的片段。 
Reduce任务也能够与 map任务一样,通过设定JobConf 的conf.setNumReduceTasks(int num)方法来增加任务个数。

原文地址:https://www.cnblogs.com/jiang-it/p/7698021.html