剖析MapReduce 作业运行机制

包含四个独立的实体:

     ·  Client Node 客户端:编写 MapReduce代码,配置作业,提交MapReduce作业。

     ·  JobTracker :初始化作业,分配作业,与 TaskTracker通信,协调整个作业的运行。 jobtracker是一个Java 应用程序,它的主类是 JobTracker。

     ·  TaskTracker :保持与 JobTracker通信,在分配的数据片段上执行 Map或Reduce 任务。tasktracker是 Java应用程序,它的主类是TaskTracker。

     ·  分布式文件系统 (一般为HDFS) :保存作业的数据,配置信息等,保存作业结果,在其他实体间共享作业文件。
 

(一)作业的提交:

一个MapReduce 作业在提交到 Hadoop上之后,会进入完全地自动化执行过程。在这个过程中,用户除了监控程序的执行情况和强制中止作业之外,不能对作业的执行过程进行任何干扰。所以在作业提交之前,用户需要将所有应该配置的参数按照自己的意愿配置完毕。

需要配置的主要内容有:

public static void main(String[] args) throws Exception{
       JobConf conf = new JobConf(WordCount.class);
       conf.setJobName(“wordcount”);
      
       conf.setOutputKeyClass(Text.class);
       conf.setOutputValueClass(IntWritable.class);
 
       conf.setMapperClass(Map.class);
       conf.setReduceClass(Reduce.class);

       conf.setInputFormat(TextInputFormat.class);
       conf.setOutputFormat(TextOutputFormat.class);

       FileInputFormat.setInputPaths(conf,new Path(args[0]));
       FileOutputFormat.setOutputPath(conf,new Path(args[1]));

       JobClient.runJob(conf);
}

*  程序代码:这里主要是指 map函数和reduce 函数的具体代码,这是一个 MapReduce作业对应的程序必不可少的部分,并且这部分代码的逻辑正确与否与运行结果直接相关。

*  Map接口和 Reduce接口的配置:在MapReduce中, Map接口需要派生自Mapper<k1,v1,k2,v2>接口,Reduce 接口则要派生自 Reduce<k2,v2,k3,v3>。它们都对应唯一一个方法,分别是 map函数和reduce 函数,也就是在上一点中所写的代码。在调用这两个方法时需要配置它们的四个参数,分别是输入 key的数据类型、输入value的数据类型、输出 key-value对的数据类型和Reporter实例,其中输入输出的数据类型要与继承时设置的数据类型相同,还有一个要求是 Map接口的输出key-value 类型要与Reduce的输入key-value对应,因为 map输出组合value 之后,它们会成为 reduce的输入内容。

*  输入输出路径:作业提交之前还需要在主函数中配置 MapReduce作业在Hadoop 集群上的输入路径和输出路径 (必须保证输出路径不存在,如果存在程序会报错,这也是初学者经常犯的错误 )。

*  其他类型设置,比如调用 runJob方法:先要在主函数中配置如 Output的key 和value类型、作业名称、InputFormat和OutputFormat 等,最后再调用 JobClient的runJob 方法。

配置完作业的所有内容并确认无误之后就可以运行作业了。( 步骤1run job

用户程序调用JobClient 的runJob方法,在提交 JobConf对象之后,runJob 方法会先调用 JobSubmissionProtocol接口所定义的submitJob 方法,并将作业提交给 JobTracker。紧接着,runJob 不断循环,并在循环中调用JobSubmissionProtocol的getTaskCompletionEvents 方法,获取 TaskCompletionEvent类的对象实例,了解作业的实时执行情况。如果发现作业运行状态有更新,就将状态报告给 JobTracker。作业完成后,如果成功就显示作业计数器,否则,将导致作业失败的错误记录到控制台。

上面介绍了作业提交的过程,可以看出,最关键的是 JobClient对象中submitJob() 方法的调用执行,那么 submitJob()方法具体是怎么做的呢?下面从 submitJob()方法的代码出发介绍作业提交的详细过程。

public RunningJob submitJob (JobConf job) throws FileNotFoundException, InvalidJobConfException, IOException{
  
        // 从JobTracker 得到当前任务的 ID
        JobID jobId  =  jobSubmitClient.getNewJobId();

        // 获取HDFS 路径:
        Path submitJobDir  =  new Path (getSystemDir(),jobId.toString());

        // 获取提交作业的 JAR文件目录
        Path submitJarFile  =  new Path( submitJobDir , “job.jar”);

        // 获取提交输入文件分割信息的路径
        Path submitSplitFile  =  new Path (submitJobDir , “job.split”);

        // 此处将-libjars 命令行指定的 jar 上传至HDFS
        configureCommandLineOprions (job , submitJobDir , submitJarFile);

        // 获取作业配置文档的路径
        Path submitJobFile  =  new Path(submitJobDir , “job.xml”);
        …….
        // 通过input format 的格式获得相应的 input split ,默认类型为
        // FileSplit InputSplit[] splits  =  job.getInputFormat().getSplits(job, job.getNumMapTasks());
        // 从这里开始对文件进行分割             
        // 生成一个写入流,将 input split 的信息写入 job.split文件
        FSDataOutputStream out  =  FileSystem.create( fs , submitSplitFile , new FsPermission(JOB_FILE_PERMISSION));
        try{
             writeSplitsFile(splits,out);
}finally{
             out.close();
       }
        job.set(“mapred.job.split.file”,submitSplitFile.toString());
       
        // 根据split 的个数设定 map task 的个数
        job.setNumMapTask(splits.length);

        // 将job 的配置信息写入 job.xml 文件
        out  =  FileSystem.create(fs , submitJobFile , new FsPermission(JOB_FILE_PERMISSION));
        try{
             job.writeXml(out);
        }finally{
             out.close();
        }

        // 真正地调用JobTracker 来提交任务
        JobStatus status  =  jobSubmitClient.submitJob(jobId);
        ……….
}

从上面的代码可以看出,整个提交过程包含以下步骤:

1)  通过调用 JobTracker对象的getNewJobId() 方法从JobTracker处获取当前作业的 ID号( 步骤2: get new job ID) 。

2)  检查作业相关路径。在代码中获取各个路径信息的时候会对作业的对应路径进行检查。比如,如果没有指定输出目录或它已经存在,作业就不会被提交,并且会给 MapReduce程序返回错误信息,再比如输入目录不存在也会返回错误等。

3)  计算作业的输入划分,并将划分信息写入 job.split文件,如果写入失败就会返回错误。 split文件的信息主要包括:split文件头、 split文件版本号、split 的个数。这些信息中每一条都会包括以下内容: split类型名( 默认FileSplit)、 split的大小、split 的内容(对于 FileSplit来说是写入的文件名,此 split在文件中的起始位置上)、 split的location 信息(即在哪个 DataNode上) 。

4)  将运行作业所需要的资源——包括作业 JAR文件、配置文件和计算所得的输入划分等——复制到一个以作业 ID命名的目录的HDFS 上。作业 JAR的副本较多( 由mapred.submit.replication属性控制,默认值为10),因此在运行作业的任务时,集群中有很多个副本可供 tasktracker访问。(步骤3:copy job resources)。(这里需要注意的是要处理的数据文件是事先先上传到HDFS 处,这里的copy job resources只是获取已经上传的数据的路径和配置文件的路径,还有分割文件的信息,在将分割的信息写入到配置信息中)。

5)  调用 JobTracker对象的submitJob() 方法来真正提交作业,告诉 JobTracker作业准备执行(步骤4 submit job)。

初始化作业:

在客户端用户作业调用JobTracker对象的 submitJob()方法后,JobTracker 会把此调用放入一个内部队列中,交由作业调度器(TaskScheduler)变量进行调度,默认的调度方法是 JobQueneTaskScheduler,也就是FIFO 调度方法。当客户作业被调度执行时,JobTracker会创建一个代表这个作业的 JobInProgress对象,并将任务和记录信息封装到这个对象中,以便跟踪任务的状态和进程。接下来 JobInProgress对象的initTasks 函数会对任务进行初始化操作( 步骤5:initialize job)。

下面仍然从initTasks 函数的代码出发详细讲解初始化过程。

public synchronized void initTasks() throws IOException{
        ………
        // 从HDFS 中作业对应的路径读取 job.split 文件,生成 input
        //splits 为下面 map的划分做好准备
        String jobFile = profile.getJobFile();
        Path sysDir = new Path(this.jobtracker.getSystemDir());
        FileSystem fs = sysDir.getFileSystem(conf);
        DataInputStream splitFile =
        fs.open(new Path(conf.get(“mapred.job.split.file”)))
        JobClient.RawSplit [] splits;
        try{
             splits = JobClient.readSplitFile(splitFile);
        }finally{
             splitFile.close();
        }
      
        // 根据input split 设置 map task个数
        numMapTasks = splits.length;
       
         //为每个 map tasks 生成一个 TaskInProgress来处理一个 input split
        maps = new TaskInProgress[numMapTasks];
        for( int i = 0 ; I < numMapTasks ; ++i  ){
               inputLength += splits[i] .getDataLength();
               maps[i] = new TaskInProgress(jobId , jobFile , splits[i] , jobtracker , conf , this , i);
        }
        if (numMapTasks > 0){
               //map task 放入 nonRunningMapCache,其将在 JobTracker 向
               //TaskTracker 分配 map task 的时候使用
               nonRunningMapCache = createCache(splits,maxLevel);
         }
        // 创建reduce task
        this.reduces = new TaskInProgress[numReduceTasks];
        for( int i = 0 ; i < numReduceTasks ; i++ ){
               reduces[i] = new TaskInProgress( jobId , jobFile , numMapTasks , i , jobtracker , conf , this );
        //reduce task 放入nonRunningReduces ,其将在 JobTracker 向
        //TaskTracker 分配reduce task 的时候使用
        nonRunningReduces.add(reduces[i]);
        }
  
        // 清理map 和 reduce
         cleanup = new TaskInProgress[2];
         cleanup[0] = new TaskInProgress(jobId , jobFile , splits[0] , jobtracker , conf , this , numMapTasks);
         cleanup[0].setJobCleanupTask();
         cleanup[1] = new TaskInProgress(jobId , jobFile ,numMapTasks , numReduceTasks , jobtracker , conf , this);
        cleanup[1].setJobCleanupTask();

        // 创建两个初始化 task,一个初始化 map ,一个初始化 reduce
        setup = new TaskInProgress[2];
        setup[0] = new TaskInProgress(jobId , jobFile , splits[0] , jobtracker , conf , this , numMapTasks + 1  );
        setup[0].setJobSetupTask();
        setup[1] = new TaskInProgress(jobId , jobFile , numMapTasks , numReduceTasks + 1  , jobtracker , conf , this  );
        setup[1].setJobSetupTask();
        taskInited.set(true);   // 初始化完毕
}

从上面的代码可以看出在初始化过程中主要有以下步骤:

1)从 HDFS中读取作业对应的job.split( 步骤6:retrieve input splits)。JobTracker 从HDFS中作业对应的路径获取JobClient在步骤3 中写入的 job.split文件,得到输入数据的划分信息。为后面初始化过程中 map任务的分配做好准备。

2)创建并初始化 map任务和reduce 任务。initTasks先根据输入数据划分信息中的个数设定 map task的个数,然后为每个map tasks 生成一个 TaskInProgress来处理input split,并将 map task放入nonRunningMapCache 中,以便在 JobTasker向TaskTracker 分配map task的时候使用。接下来根据 JobConf中的mapred.reduce.tasks 属性利用 setNumReduceTasks()方法来设置reduce task 的个数,然后采用类似 map task的方式将reduce task 放入nonRunningReduces中,以便在向 TaskTracker分配reduce task 的时候使用。

3)最后就是创建两个初始化 task,根据个数和输入划分已经配置的信息,并分别初始化 map和reduce 。

分配任务:

在前面的介绍中已经知道, TaskTracker和JobTracker 之间的通信与任务的分配是通过心跳机制完成的。 TaskTracker作为一个单独的JVM执行一个简单的循环,主要实现每隔一段时间向 JobTracker发送心跳(heartbeat) :告诉JobTracker,此TaskTracker是否存活,是否准备执行新的任务。在 JobTracker接收到心跳信息后,如果有待分配的任务,它就会为 TaskTracker分配一个任务,并将分配信息封装在心跳通信的返回值中返回给 TaskTracker,TaskTracker 从心跳方法的 Response中得知此TaskTracker 需要做的事情,如果是一个新的 task则将task 加入本机的任务队列中( 步骤7:heartbeat returns task)。

下面从TaskTracker 中的transmitHeartBeat()方法和 JobTracker中的heartbeat() 方法的主要代码出发,介绍人任务分配的详细过程,以及在此过程中 TaskTracker和JobTracker 的通信。

TaskTracker中 transmitHeartBeat()方法的主要代码如下所示:

// 向 JobTracker报告 TaskTracker 的当前状态
if( status == null ){
        synchronized( this ){
             status = new TaskTrackerStatus( taskTrackerName , localHostname , httpPort , cloneAndResetRunningTaskStatuses(sendCounters) , failures , maxCurrentMapTasks , maxCurrentReduceTasks );
        }
}
……..
//根据条件是否满足来确定此 TaskTracker 是否请求JobTracker
//为其分配新的 Task
Boolean askForNewTask ;
long localMinSpaceStart ;
synchronized ( this ) {
        askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || status.countReduceTasks() < maxCurrentReduceTasks)  &&  acceptNewTasks;
        localMinSpaceStart = minSpaceStart;
}
……..
//向 JobTracker 发送heartbeat
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,justStarted,askForNewTask,heartbeatResponseId);
 ……….

JobTracker中 heartbeat()方法的主要代码如下所示:

……..
String trackerName = status.getTrackerName();
……..
//如果 TaskTracker 向JobTracker 请求一个 task 运行
if (acceptNewTasks){
        TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
         if (taskTrackerStatus == null ){
                  LOG.warn(“Unknow task tracker polling; ignoring : ” + trackerName );
         } else {
                  List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
                  if (tasks == null ){
                         // 任务调度器分配任务
                         tasks = taskScheduler.assignTasks(taskTrackerStatus);       
                  }
                  if (task != null ){
                       for (Task task : tasks ){
                             // 将任务返回给 TaskTracker
                             expireLaunchingTasks.addNewTask(task.getTaskID());
                             actions.add(new LaunchTaskAction(task));
}} }}………

上面两段代码展示了TaskTracker和 JobTracker之间通过心跳通信汇报状态与分配任务的详细过程。 TaskTracker首先发送自己的状态(主要是 map任务和reduce 任务的个数是否小于上限 ),并根据自身条件选择是否向 JobTracker请求新的Task ,最后发送心跳。 JobTracker接收到TaskTracker 的心跳之后首先分析心跳信息,如果发现 TaskTracker在请求一个task ,那么任务调度器就会将任务和任务信息封装起来返回给 TaskTracker。

在JobTracker 为TaskTracker选择任务之前, JobTracker必须先选定任务所在的作业,默认的调度算法是简单维护一个作业优先级列表。一旦选择好作业, JobTracker就可以为该作业选定一个作业。

针对map 任务和reduce任务, TaskTracker从JobTracker 有固定数量的任务槽 (map任务和reduce 任务的个数都有上限 )。当TaskTracker 从JobTracker返回的心跳信息中获取新的任务信息时,它会将 map任务或者reduce 任务加入到对应的任务槽中。需要注意的是,在 JobTracker为TaskTracker 分配map任务的时候,为了减小网络带宽会考虑将 map任务数据本地化。它会根据TaskTracker的网络位置,选取一个距离此 TaskTracker map任务最近的输入划分文件分配给此 TaskTracker。最好的情况是,划分文件就在 TaskTracker本地(TaskTracker 往往是HDFS的 DataNode中,所以这种情况是存在的 )。(体现移动计算比移动数据经济的好处)。

执行任务:

在TaskTracker 申请到新的任务之后,就要在本地运行任务了。运行任务的第一步是将任务本地化 (将任务运行所必需的数据、配置信息、程序代码从 HDFS复制到TaskTracker 本地(步骤8:retrieve job resources)。这主要是通过调用 localizeJob()方法来完成的。这个方法主要通过下面几个步骤来完成任务的本地化:

1)将 job.split拷贝到本地;

2)将 job.jar拷贝到本地;

3)将 job的配置信息写入job.xml;

4)创建本地任务目录,解压 job.jar;

5)调用 launchTaskForJob()方法发布任务(步骤9:launch)。

任务本地化之后,就可通过调用 launchTaskForJob()真正启动起来。接下来launchTaskForJob()又会调用 launchTask()方法启动任务。launchTask()方法的主要代码如下:

……..
//创建 task 本地运行目录
localizeTask(task);
if ( this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ){
         this.taskStatus.setRunState( TaskStatus.State.RUNNING );
}
//创建并启动 TaskRunner
this.runner = task.createRunner(TaskTracker.this , this);
this.runner.start()
this.taskStatus.setStartTime(System.currentTimeMillis());
………

从代码中可以看出launchTask()方法先会为任务创建本地目录,然后启动 TaskRunner。在启动TaskRunner 后,对于 map任务,会启动MapTaskRunner ;对于reduce任务则启动 ReduceTaskRunner。

这之后,TaskRunner 又会启动新的 Java虚拟机来运行每个任务( 步骤10:run)。以 map任务为例,任务执行的简单流程是:

1)配置任务的执行参数 (获取Java 程序的执行环境和配置参数等 );

2)在 Child临时文件表中添加map任务信息(运行 map和reduce 任务的主进程是 Child类);

3)配置 log文件夹,然后配置map任务的通信和输出参数;

4)读取 input split,生成RecordReader 读取数据;

5)为 map任务生成MapRunnable ,依次从 RecordReader中接收数据,并调用Mapper的 map函数进行处理;

6)最后将 map函数的输出调用collect收集到 MapOutputBuffer中。

更新任务执行进度和状态:

一个MapReduce 作业在提交到 Hadoop上之后,会进入完全自动化执行过程,用户只能监控程序的执行状态和强制中止作业。但是 MapReduce作业是一个长时间运行的批量作业,有时候可能需要运行数小时。所以对于用户而言,能够得知作业的运行状态是非常重要的。在 Linux终端运行MapReduce 作业时,可以看到在作业执行过程中有一些简单的作业执行状态报告,这能让用户大致了解作业的运行情况,并通过与预期运行情况进行对比来确定作业是否按照预定方式运行。

在MapReduce 作业中,作业的进度主要由一些可衡量可计数的小操作组成。比如在 map任务中,其任务进度就是已处理输入的百分比,比如完成 100条记录中的50 条,那么 map任务的进度就是50%(这里只针对一个 map任务举例,并不是指在Linux终端中执行 MapReduce任务时出现的map50% ,在终端中出现的 50% 是总体map 任务的进度,这是将所有 map任务的进度组合起来的结果)。总体来讲, MapReduce作业的进度由下面几项组成: mapper(或reducer) 读入或写出一条记录,在报告中设置状态描述,增加计数器,调用 Reporter对象的progress() 方法。

由MapReduce 作业分割的每个任务中都有一组计数器,它们对任务执行过程中的进度组成事件进行计数。如果任务要报告进度,它便会设置一个标志以表明状态变化将会发送到 TaskTracker上。另一个监听线程检查到这标志后,会告知 TaskTracker当前的任务状态。具体代码如下 (这是MapTask 中run函数的部分代码 ):

// 同 TaskTracker通信,汇报任务执行进度
final Reporter reporter = getReporter(umbilical);
startCommunicationThread(umbilical);
initialize(job , reporter);

同时,TaskTracker 每隔5秒在发送给 JobTracker的心跳中封装任务状态,报告自己任务执行状态。具体代码如下 (这是TaskTracker 中transmitHeartBeat()方法的部分代码 ):

// 每隔一段时间,向 JobTracker 返回一些统计信息
Boolean sendCounters;
if(now > (previousUpdate + COUNTER_UPDATE_INTERVAL )){
         sendCounters = true;
         previousUpdate = now;
}
else{
         sendCounters = false;
}

  通过心跳通信机制,所有 TaskTracker的统计信息都会汇总到JobTracker处。 JobTracker将这些统计信息合并起来,产生一个全局作业进度统计信息,用来表现正在运行的所有作业,以及其中所含任务的状态。最后, JobClient通过每秒查看JobTracker 来接收作业进度的最新状态。具体代码如下(这是 JobClient中用来提交作业的runJob()方法的部分代码):

// 首先生成一个 JobClient 对象
JobClient jc = new JobClient(job) ;
……..
//调用 submitJob 来提交一个任务
running = jc.submitJob(job);
JobID jobId = running.getID();
……..
//while循环中不断查看 JobTracker 来获取作业进度
while(true){
   ……..
}

完成作业:

  所有TaskTracker 任务的执行进度信息都会汇总到 JobTracker处,当JobTracker 接收到最后一个任务的已完成通知后,便把作业的状态设置为“成功”。然后, JobClient也将及时得知任务已成功完成,它便会显示一条信息告知用户作业已完成,最后从runJob()方法处返回( 在返回后 JobTracker会清空作业的工作状态,并指示 TaskTracker也清空作业的工作状态,比如删除中间输出等 )。

原文地址:https://www.cnblogs.com/yangyquin/p/5021223.html