经典的MapReduce1解析

MapReduce1任务图解

最顶层包含4个独立的实体
客户端,提交MapReduce作业
jobtracker,协调作业的运行。Jobtracker是一个Java应用程序,它的主类是JobTracker
tasktracker,运行作业划分后的任务。tasktracker是Java应用程序,它的主类是TaskTracker
分布式文件系统(一般为HDFS),用来在其它实体间共享作业。

一、作业的提交

1. Job的submit方法创建一个内部的JobSummiter实例,并且调用其submitJobInternal方法。提交作业后,waitForCompletion方法每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成功就显示作业计数器;如果失败则导致作业失败的错误被记录到控制台。
JobSummiter所实现的作业提交过程如下
2. 向Jobtracker请求一个新的作业ID(通过调用JobTracker的getNewJobId方法获取)
检查作业的输出说明。
计算作业的输入分片
3. 将作业所需要的资源(包括作业JAR文件、配置文件和计算所得到的输入分片)复制到一个以作业ID命名的目录下jobtracker的文件系统中。作业JAR的副本越多(由mapred.submit.replication属性控制,默认值为10),因此在运行作业的任务时,集群中有很多歌副本可供tasktracker访问。
4. 告知jobtracker作业准备执行(通过调用JobTracker的submitJob方法实现)

二、作业的初始化

5. 当JobTracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度器(job scheduler)进行调度,并对其进行初始化。初始化包括创建一个表示正在运行作业的对象,用于封装任务和记录信息,以便跟踪任务的状态和进程。
6. 为了创建任务运行列表,作业调度器首先从共享文件系统中获取客户端已经计算好的输入分片。然后为每一个分片创建一个map任务。创建的reduce任务的数量由job的mapred.reduce.tasks属性决定,它是用setNumReduceTasks方法来设置的,然后调度器创建相应数量的要运行的reduce任务。任务在此时被指定ID
除了Map任务和Reduce任务,还会创建两个任务:作业创建和作业清理。这两个任务在TaskTracker中执行,在map任务运行之前运行代码来创建作业,并且在所有reduce任务完成之后完成清理工作。配置项OutputCommiter属性能设置运行的代码,默认值是FileOutputCommiter。作业创建为作业创建输出路径和临时工作空间。作业清理清除作业运行过程中的临时目录.

三、任务的分配

7. tasktracker运行一个简单的循环来定期发送心跳(heartbeat)给jobtracker。心跳向jobtracker表明tasktracker是否还存活,同时也充当两者之间的消息通道。作为心跳的一部分,tasktracker会指明它是否已经准备好运行新的任务,如果是,jobtracker会为它分配一个任务,并使用心跳的返回值与tasktracker进行通信。
对于Map任务和reduce任务,tasktracker有固定数量的任务槽,两者是独立设置的。例如,一个tasktracker可能可以同时运行两个map任务和两个reduce任务。准确数量由tasktracker核的数量和内存大小来决定。默认调度器在处理reduce槽之前,会填满空闲的map任务槽,因此,如果tasktracker至少有一个闲置的map任务槽,jobtracker会为它选择一个map任务,否则选择一个reduce任务。
为了选择reduce任务,jobtracker从待运行的reduce任务列表中选取下一个来执行,用不着考虑数据的本地化。然而,对于map任务,jobtracker会考虑tasktracker的网络位置,并选取一个距离其输入分片文件最近的tasktracker。在最理想的情况下,任务是数据本地化的(data-local),也就是任务运行在输入分片所在的节点上。同样,任务也可能是机架本地化的(rack-local):任务和输入分片是在同一个机架,但不在同一个节点上。一些任务既不是数据本地化的,也不是机架本地化的,而是从与它们自身运行的不同机架上检索数据。

四、任务的执行

8. 第一步,通过从共享文件系统把作业的JAR文件复制到tasktracker所在的文件系统,从而实现作业的JAR文件本地化。同时,tasktracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。第二步,tasktracker为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件夹下。第三步,tasktracker新建一个TaskRunner实例来运行该任务。
9. TaskRunner启动一个新的JVM,以便用户定义的map和reduce函数的任何软件问题都不会影响到tasktracker(例如导致崩溃或挂起等),在不同的任务之间重用JVM还是可能的
10. 在新的JVM中运行每个任务。
子进程通过umbilical接口与父进程进行通信。任务的子进程每隔几秒便告知父进程它的进度,直到任务完成。
每个任务都能够执行搭建(setup)和清理(cleanup)动作,它们和任务本身在同一个JVM中运行,并由作业的OutputCommiter确定。清理动作用于提交任务,这在基于文件的作业中意味着它的输出写到该任务的最终文职。提交协议确保当推理执行(speculative execution)可用时,只有一个任务副本被提交,其他的都将取消。
至于Streaming和Pipes,它们都运行特殊的map任务和reduce任务,目的是运行用户提供的可执行程序。如图

五、进度和状态的更新

MapReduce作业是长时间运行的批量作业,运行时间范围从数分钟到数小时。一个作业和它的每个任务都有一个状态(status),包括:作业或任务的状态(比如,运行状态,成功完成,失败状态)、map和reduce的进度、作业计数器的值、状态消息或描述。
作业在运行时,对其进度(progress, 即任务完成百分比)保持追踪。对map任务,任务进度是已处理输入所占的比例。对reduce任务,情况稍微复杂,但系统仍然会估计已处理reduce输入的比例。
任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,要么由用户自己定义。
如果任务报告了进度,就设置一个标志以表明状态变化将被发送到tasktracker。有一个独立的线程每隔3秒就检查一次次标志,如果已设置,则告知tasktracker当前任务状态。同时,tasktracker每隔5秒钟就发送心跳到jobtracker(5秒钟这个间隔是最小值,因为心跳间隔实际上由集群的大小来决定的:对于一个更大的集群,间隔会更长一些),并且由tasktracker运行的所有任务的状态都会在调用中被发送至jobtracker。计数器的发送间隔通常大于5秒,因为计数器占用的带宽相对较高。
Jobtracker将这些更新合并起来,产生一个表明所有运行作业及其所包含任务状态的全局视图。最后,JobClient通过每秒查询jobtracker来接收最新状态。客户端也可以使用Job的getStatus方法来得到一个JobStatus的实例,后者包含作业的所有状态信息。
状态更新在MapReduce1中的传递流程

六、作业的完成

当jobtracker收到作业最后一个任务已完成的通知后(这是一个特定的任务清理任务),便把作业的状态设置为成功。然后,在job查询状态时,便知道任务已经成功完成,于是job打印一条消息告知用户,然后从waitForCompletion方法返回。Job的统计信息和计数值也在这时输出到控制台。
如果jobtracker有相应的设置,也会发送一个HTTP作业通知。希望收到回调指令的客户端可以通过job.end.notification.url属性来进行这项设置。
最后,jobtracker清空作业的工作状态,只是tasktracker也清空作业的工作状态(如删除中间输出)。

原文地址:https://www.cnblogs.com/EnzoDin/p/9420571.html