MapReduce工作机制

MapReduce任务执行总流程

以下图5 是MapReduce作业详细的执行流程图。

                                                图 5 MapReduce 作业执行流程图

1.在客户端(Client)编写MapReduce代码,配置作业,启动作业。

这里需要注意的一点是:一个MapReduce作业在提交到Hadoop上之后,会进入完全地自动化执行过程。在这个过程中,用户除了监控程序的执行情况和强制终止之外,不能对作业的执行过程进行任何的干预。所以在作业提交之前,用户需要将所有应该配置的参数按照自己的需求配置完毕。

2.向Jobtracker请求一个Job ID

3.复制作业的资源文件

Jobtracker将运行作业所需要的资源,包括作业JAR文件、配置文件和计算所得的输入划分等复制到作业对应的HDFS上。这些文件都存放在Jobtracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID。作业jar的副本较多默认会有10个副本(mapred.submit.replication属性控制),因此在运行作业的任务时,集群中有很多副本可供taskTracker访问;输入划分信息告诉了JobTracker应该为这个作业启动多少个map任务等信息。

4.提交作业

调用JobTracker对象的submitJob()方法来真正提交作业,告诉JobTracker作业准备执行。

5.初始化

Jobtracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度进行调度,并对其初始化。

6.获取输入划分

为了创建任务运行列表,作业调度器首先从共享文件系统中获取jobClient计算好的输入分片信息,然后为每一个分片创建map任务。创建reduce任务数量由jobConf的mapred.reduce.task属性决定,然后调度器创建相应数量的要运行的reduce任务。

7.JobTracker分配任务。

Tasktracker和JobTracker之间的通信和任务的分配都是通过心跳机制完成的。

Tasktracker运行一个简单的循环来定期发送“心跳”给Jobtracker,用来告诉Jobtracker它自己是否还存活和是否准备好运行新的任务(Map任务和Reduce任务的个数是否小于上限)。如果是,Jobtracker会为他分配一个任务,并将分配信息封装在心跳通信的返回值中返回给TaskTracker。当TaskTracker从JobTracker返回的心跳信息中获取新的任务信息时,它会将Map任务或者Reduce任务加入到对应的任务槽中。

需要注意的是:在JobTracker为Tasktracker分配Map任务时,为了减少网络带宽,会考虑将map任务数据本地化。它会根据Tasktracker的网络位置,选取一个距离此TaskTracker map任务最近的输入划分文件分配给此Tasktracker。最好的情况是,划分文件就在Tasktracker本地。

8.TaskTracker获取作业资源

TaskTracker将任务运行所必需的数据、配置信息、程序代码从HDFS复制到TaskTracker本地磁盘。

9.发布任务

10.执行

当TaskTracker获取到作业资源,tasktracker就会为任务新建一个本地工作目录,并把jar文件中的内容解压到这个文件夹下,然后tasktracker新建一个TaskRunner实例来运行该任务,TaskRunner会启动一个新的JVM来运行每个任务。

11.输出结果到HDFS。

以上就是MapReduce完成一个任务的整体过程。但其实上图中没有体现客户端是如何知道这个作业的运行进度和状态。下面我简单说明一下

由MapReduce作业分割成的每个任务中都有一组计数器,它们对任务执行过程中的进度组成事件进行计数。如果任务要报告进度,它便会设置一个标志以表明状态变化将会发送到TaskTracker上。另外一个监听线程检查到这标志后,会告知TaskTracker当前的任务状态。同时,TaskTracker在每隔5秒发送给JobTracker的心跳中封装任务状态,告知自己的任务执行状态。通过这种心跳通信机制,所有TaskTracker的统计信息都会汇总到JobTracker处。JobTracker将这些统计信息合并起来,产生一个全局作业进度统计信息,用来表明正在运行的所有作业以及其中包含任务的状态。最后,JobClient通过查看JobTracker来接收作业进度最新状态。

当JobTracker收到作业的最后一个任务已完成的通知后,便把作业状态设置为“成功”。然后在JobClient查询状态时知道任务已经完成。于是JobClient打印一条消息告知用户。最后JobTracker清空作业的工作状态,并指示TaskTracker也清空作业的工作状态(eg:删除中间输出)。

这里需要注意一点:Map任务将结果写入本地硬盘,而非HDFS。因为map任务的结果是中间结果,要给Reduce任务进行再次处理,处理完之后map任务的结果就没有价值了,通常是被删掉。HDFS上的同一份数据,通常情况下是要备份的。所以如果存入HDFS,那么就有些小题大做了。

错误处理机制

MapReduce任务执行过程中出现的故障可以分为两大类:硬件故障和任务执行失败引发的故障。

硬件故障

在Hadoop Cluster中,只有一个JobTracker,因此,JobTracker本身是存在单点故障的。如何解决JobTracker的单点问题呢?我们可以采用主备部署方式,启动JobTracker主节点的同时,启动一个或多个JobTracker备用节点。当JobTracker主节点出现问题时,通过某种选举算法,从备用的JobTracker节点中重新选出一个主节点。

机器故障除了JobTracker错误就是TaskTracker错误。TaskTracker故障相对较为常见,MapReduce通常是通过重新执行任务来解决该故障。

在Hadoop集群中,正常情况下,TaskTracker会不断的与JobTracker通过心跳机制进行通信。如果某TaskTracker出现故障或者运行缓慢,它会停止或者很少向JobTracker发送心跳。如果一个TaskTracker在一定时间内(默认是1分钟)没有与JobTracker通信,那么JobTracker会将此TaskTracker从等待任务调度的TaskTracker集合中移除。同时JobTracker会要求此TaskTracker上的任务立刻返回。如果此TaskTracker任务仍然在mapping阶段的Map任务,那么JobTracker会要求其他的TaskTracker重新执行所有原本由故障TaskTracker执行的Map任务。如果任务是在Reduce阶段的Reduce任务,那么JobTracker会要求其他TaskTracker重新执行故障TaskTracker未完成的Reduce任务。比如:一个TaskTracker已经完成被分配的三个Reduce任务中的两个,因为Reduce任务一旦完成就会将数据写到HDFS上,所以只有第三个未完成的Reduce需要重新执行。但是对于Map任务来说,即使TaskTracker完成了部分Map,Reduce仍可能无法获取此节点上所有Map的所有输出。所以无论Map任务完成与否,故障TaskTracker上的Map任务都必须重新执行。

任务失败

在实际任务中,MapReduce作业还会遇到用户代码缺陷或进程崩溃引起的任务失败等情况。用户代码缺陷会导致它在执行过程中抛出异常。此时,任务JVM进程会自动退出,并向TaskTracker父进程发送错误消息,同时错误消息也会写入log文件,最后TaskTracker将此次任务尝试标记失败。对于进程崩溃引起的任务失败,TaskTracker的监听程序会发现进程退出,此时TaskTracker也会将此次任务尝试标记为失败。对于死循环程序或执行时间太长的程序,由于TaskTracker没有接收到进度更新,它也会将此次任务尝试标记为失败,并杀死程序对应的进程。

在以上情况中,TaskTracker将任务尝试标记为失败之后会将TaskTracker自身的任务计数器减1,以便想JobTracker申请新的任务。TaskTracker也会通过心跳机制告诉JobTracker本地的一个任务尝试失败。JobTracker接到任务失败的通知后,通过重置任务状态,将其加入到调度队列来重新分配该任务执行(JobTracker会尝试避免将失败的任务再次分配给运行失败的TaskTracker)。如果此任务尝试了4次(次数可以进行设置)仍没有完成,就不会再被重试,此时整个作业也就失败了。

作业调度机制

在0.19.0版本之前,Hadoop集群上的用户作业采用先进先出(FIFO)调度算法,即按照作业提交的顺序来运行,同时每个作业在运行时都会使用整个集群,因此只有轮到自己运行才享受整个集群的服务。虽然FIFO调度器最后又支持了设置优先级别的功能,但是由于不支持优先级抢占,所以这种单用户的调度算法仍然不符合云计算中采用并行计算来提供服务的宗旨。从0.19.0版本开始,Hadoop提供了支持多用户同事服务和集群资源公平共享的调度器,即公平调度器(Fair Scheduler Guide)和容量调度器(Capacity Schedule Guide)。

    公平调度器

公平调度是为作业分配资源的方式,其目的是随着时间的推移,让提交的作业获取等量的集群共享资源,让用户公平的共享集群。具体做法是:当集群上只有一个作业在运行时,它将使用整个集群;当有其他作业提交时,系统会将TaskTracker节点空闲时间片分配给这些新的作业,并保证每个作业都得到大概等量的CPU时间。

公平调度按作业池来组织作业,它会按照提交作业的用户数目将资源公平的分到这些作业池里。默认情况下,每一个用户拥有一个独立的作业池,以使每个用户都能获得一份等同的集群资源而不会管他们提交了多少作业。在每一个资源池内,会用公平共享的方法在运行作业之间共享容量,除了提供共享方法外,公平调取器还允许为作业池设置最小的共享资源,以确保特定用户、群组或生产应用程序总能获取到足够的资源。对于设置了最小共享资源的作业来说,如果包含了作业,它至少能获取最小的共享资源。但是如果最小共享资源超过作业需要的资源时,额外的资源会在其他作业池间进行切分。

 在常规操作中,当提交一个新作业时,公平调度器会等待已运行作业中的任务完成,以释放时间片给新的作业。但公平调度器也支持作业抢占。如果新的作业在一定时间(即超时时间,可以配置)内还未获取公平的资源配置,公平调度器就会允许这个作业抢占已运行作业中的任务,以获取运行所需的资源。另外,如果作业在超时时间内获取的资源不到公平资源的一半时,也允许对任务进行抢占。而在选择时,公平调度器会在所有运行任务中选择最近运行起来的任务,这样浪费的计算相对较少。由于Hadoop作业能容忍丢失任务,抢占不会导致被抢占的作业失败,只会让被抢占作业的运行时间更长。

最后,公平调度器还可以限制每个用户和每个作业池并发运行的作业数量。这个限制可以在用户一次性提交数百个作业或当大量作业并发执行时用来确保中间数据不会塞满真个集群上的磁盘空间。超出限制的作业会被列入调度器的队列中等待,直到早期作业运行完毕。公平调度器再根据作业优先权和提交时间的排列情况从等待作业中调度即将运行的作业。

容量调度器

容量调度器以队列为单位划分资源,每个队列都有资源使用的下限和上限。每个用户也可以设定资源使用上限。一个队列的剩余资源可以共享给另一个队列,其他队列使用后还可以归还。管理员可以约束单个队列、用户或作业的资源使用。支持资源密集型作业,可以给某些作业分配多个slot(这是比较特殊的一点)。支持作业优先级,但不支持资源抢占。

这里明确一下用户、队列和作业之间的关系。Hadoop以队列为单位管理资源,每个队列分配到一定的资源,用户只能向一个或几个队列提交作业。队列管理体现为两方面:1. 用户权限管理:Hadoop用户管理模块建立在操作系统用户和用户组之间的映射之上,允许一个操作系统用户或者用户组对应一个或者多个队列。同时可以配置每个队列的管理员用户。队列信息配置在mapred-site.xml文件中,包括队列的名称,是否启用权限管理功能等信息,且不支持动态加载。队列权限选项配置在mapred-queue-acls.xml文件中,可以配置某个用户或用户组在某个队列中的某种权限。权限包括作业提交权限和作业管理权限。2. 系统资源管理:管理员可以配置每个队列和每个用户的可用资源量信息,为调度器提供调度依据。这些信息配置在调度器自己的配置文件(如Capacity- Scheduler.xml)中。

具体以上两种调度器该如何配置,这里不做细讲,在本文末尾一些参考链接中有一些相关配置的方法,有兴趣的可以去查阅。

Shuffle和排序

在MapReduce流程中,为了让Reduce可以并行处理Map结果,必须对Map的输出进行一定的排序和分割,然后再交付给对应的Reduce,而这个将Map输出进行进一步整理并交给Reduce的过程就成为了shuffle,即以下MapReduce流程图红色框中的部分。从shuffle的过程来看,它是MapReduce的核心所在,shuffle过程的性能与整个MapReduce的性能直接相关。

总体来说,shuffle过程包含在Map和Reduce两端中。在Map端的shuffle过程是对Map结果进行划分(partition)、排序(sort)和分割(spill),然后将属于同一个划分的输出合并在一起(Merge)并写在磁盘上,同时按照不同的划分将结果发送给对应的Reduce。Reduce端又会将各个Map送来的属于同一个划分的输出进行合并(Merge),然后对Merge的结果进行排序,最后交给Reduce处理。

以下链接中通过一个实例将shuffle过程解释的很详细,请大家阅读下,这里就不再对shuffle的过程做过多的解释

http://www.slideshare.net/snakebbf/hadoop-mapreduce-12716482

shuffle过程的优化

在这里简单介绍从Hadoop参数配置触发来优化shuffle过程。在一个任务中,完成单位任务使用时间最多的一般都是I/O操作。在Map端,主要就是shuffle阶段中缓冲区内容超过阀值后的写出操作。可以通过合理的设置ip.sort.*属性来减少这种情况下的写出次数,具体来说就是增加io.sort.mb的值。在Reduce端,在复制Map输出的时候直接将复制的结果放在内存中同样能够提升性能,这样可以让部分数据少做两次I/O操作(前提是留下的内存足够Reduce任务执行)。所以在Reduce函数的内存需求很小的情况下,将mapred.inmen.merge.threshold设置为0,将mapred.job.reduce.input.buffer.percent(默认是0)设置为1.0(或者更低的值)能让I/O操作更少,提升shuffle性能。

任务执行时策略

这里再介绍一些Hadoop在任务执行时用的一些策略,让大家进一步了解MapReduce任务的执行细节。

推测式执行

所谓推测式执行是指当作业的所有任务都开始运行时,JobTracker会统计所有任务的平均进度,如果某个任务所在的TaskTraker节点由于配置比较低或CPU负载过高,导致任务执行的速度比总体任务的平均速度要慢,此时JobTracker就会启动一个新的备份任务,原有任务和新任务哪个先执行完就把另外一个kill掉,这就是经常在JobTracker页面看到任务执行成功,但是总有些任务被kill掉的原因,

MapReduce将待执行作业分割成一些小任务,然后并行处理这些任务,提高作业运行的效率,使作业的整体执行时间小于顺序执行时间。但很明显,运行缓慢的任务将成为MapReduce的瓶颈。因为只要有一个运行缓慢的任务,整个作业的完成时间将被大大延长。这个时候就需要采用推测式来避免出现这种情况。

推测式执行在默认情况下是启动的。这种执行方式有一个很明显的缺陷:对于由于代码缺陷导致的任务执行速度过慢,它所启用的备份任务并不会解决问题。除此之外,因为推测式执行会启动新的任务,所以这种执行方法不可避免地会增加集群的负担。所以在利用Hadoop集群运行作业的时候可以根据具体情况选择开启和关闭该执行策略(通过设置mapred.map.tasks.speculative.execution和mapred.reduce.tasks.speculative.execution属性的值来为Map和Reduce任务开启或关闭)。

任务JVM重用

不论是Map任务还是Reduce任务,都是在TaskTracker节点上的java虚拟机(JVM)中运行。当TaskTracker被分配一个任务时,就会在本地启动一个新的JVM来运行这个任务。对于有大量零碎输入文件的Map任务而言,为每一个Map任务启动一个JVM这种做法显然还有很大的改善空间。如果在一个非常短的任务结束之后让后续的任务重用此JVM,这样就可以省下新任务启动新的JVM的时间,这就是所谓的任务JVM重用。需要注意的是,虽然一个TaskTracker上可能会有多个任务在同时运行,但这些正在执行的任务都是在相互独立的JVM上。

控制JVM重用的属性是mapred.job.reuse.jvm.num.tasks。这个属性定义了单个jvm上运行任务的最大数目,默认情况小艾是1,意味着每个JVM上运行一个任务。可以将这个属性设置为大于1的值来启动JVM重用,也可以将此属性设为-1,表明共享此JVM的任务数目不受限制。

跳过坏记录

MapReduce作业处理的数据集非常庞大,用户在基于MapReduce编写处理程序时可能并不会考虑到数据集中的每一种数据格式和字段(特别是某些坏的记录)。所以,用户代码在处理数据集中的某个特定记录时可能会崩溃。这个时候即使MapReduce有错误处理机制。但是由于存在这种代码缺陷,即使重新执行4次(默认的最大重新执行次数),这个任务仍然会失败,最终导致整个作业失败。所以针对这种由于坏数据导致任务抛出的异常,重新运行任务是无济于事的。但是,如果想要在庞大的数据集中找出这个坏记录,然后在程序中添加相应的处理代码或者直接除去这条坏记录,显然也是很困难的一件事情,况且并不能保证没有其他坏记录。所以最好的办法就是在当前代码对应的任务执行期间,遇到坏记录时就直接跳过(由于数据集巨大,忽略这种极少数的坏记录是可以接受的),然后继续执行,这是Hadoop中的忽略模式(Skipping模式)。

当忽略模式启动时,如果任务连续失败两次,它会将自己正在处理的记录告诉TaskTracker,然后TaskTracker会重新运行该任务并在运行到先前任务报告的记录时直接跳过。从忽略模式的工作方式看,忽略模式只能检测并忽略一个错误记录,因此这种机制仅适用于检测个别错误记录。如果增加任务尝试次数最大值(mapred.map.max.attemps,mapred.reduce.max.attemps),可以增加忽略模式能够检测并忽略的错误记录数目。默认情况下忽略模式是关闭的,可以使用SkipBadRecord类单独为Map和Reduce任务启动他。

任务执行环境

Map和Reduce任务在执行时,怎么知道执行环境及运行所需参数方面的信息呢?其实是这样的,JobTracker在分配任务给TaskTracker后,TaskTracker会去HDFS上将所需的资源拷贝到本地,而TaskTracker是在一个单独JVM上以子进程的形式执行 Mapper/Reducer任务(Task)的,所以启动Map或Reduce Task时,会直接从父TaskTracker处继承任务的执行环境。以下图列出了每个Task执行时使用的本地参数。

                                                                                      图 7 Task 的本地参考表

当Job启动时,TaskTracker会根据配置文件创建Job和本地缓存。TaskTracker的本地目录是${mapred.local.dir}/taskTracker/。在这个目录下有两个子目录:一个是作业的分布式缓存目录,路径是在本地目录后面加上archive/;一个是本地Job目录,路径是本地目录后面加上jobcache/$jobid,在这个目录下保存了Job执行的共享目录(各个任务可以使用这个空间作为暂存空间,用于任务之间的文件共享,此目录通过job.local.dir参数暴露给用户)、存放JAR包的目录(保存作业的JAR文件和展开的JAR文件)、一个XML文件(此XML文件是本地通用的作业配置文件)和根据任务ID分配的任务目录(每个任务都有一个这样的目录,目录中包含本地化的任务作业配置文件,存放中间结果的输出文件目录、任务当前工作目录和任务临时目录)。

关于任务的输出文件需要注意的是,应该确保同一个任务的多个实例不会尝试向同一个文件进行写操作。因为这可能会存在两个问题,第一,如果任务失败并被重试,那么会先删除第一个任务的旧文件;第二,在推测式执行的情况下同一个任务的两个实例会向同一个文件进行写操作。Hadoop通过将输出写到任务的临时文件夹来解决上面的问题。这个临时目录是{mapred.out.put.dir}/temporary/${mapred.task.id}。如果任务执行成功,目录的内容(任务输出)就会被复制到此作业的输出目录({ mapred.out.put.dir})。因此,如果一个任务失败并重试,第一个任务尝试的部分输出会被删除。同时推测式执行时的备份任务和原始任务位于不同的工作目录,它们的临时输出文件夹并不相同,只有先完成的任务才会把其工作目录中的输出内容传到输出目录中,而另外一个任务的工作目录就会被丢弃。

参考资料:Hadoop实战 第2版 陆嘉恒著

原文地址:https://www.cnblogs.com/shihuai355/p/3835104.html