hadoop知识整理(2)之MapReduce

之前写的关于MR的文章的前半部分已丢。

所以下面重点从3个部分来谈MR:

  1)Job任务执行过程,以及主要进程-ResourceManager和NodeManager作用;

  2)shuffle过程;

  3)主要代码;

一、Job任务执行过程

  

  这里是hadoop2.0-ResourceManager的Job的执行过程:

  1)run job阶段,由提交Job客户端JVM完成,主要做job环境信息的收集,各个组件类,如Mapper、Reducer类,输出输入的K-V类型做检验是否合法,并且检验输入hdfs路径的合法性,还有输出hdfs目录是否已经存在,检测不通过,则Job停止。

  2)1阶段通过后,Job会获取一个Application对象,同时给一个应用ID,用于MapReduce的作业ID;

  3)再次检查输入输出目录的合法性,hdfs目录的合法性,计算作业的输入分片,如果分片无法计算,作业将不会提交,错误将返回给MR客户端程序,如果没有问题,将运行作业的所需资源,包括MR程序的JAR文件,配置文件以及输入分片,复制到一个以应用ID命名的hdfs目录下的共享文件系统中,JOB的jar的副本较多,所以在运行job时,集群的所有节点都可访问job的副本;

  4)MR客户端通过调用submitApplication()方法提交Job给RM;

  5)RM即资源管理器(ResourceManager)收到Job作业后,并将请求传递给YARN的调度器(scheduler),调度器会分配一个容器container,然后资源管理器在节点管理器(NodeManager)中启动application master进程;

  6)application master是一个java应用程序,主类为MRAppMaster,他将接收来自Job的进度和完成报告;

  7)application master对Job的初始化,是创建了很多薄记对象,以保持对于job进度的跟踪,然后他将从hdfs共享存储中获得由MR客户端计算的输入分片,然后对每一个split创建一个Map任务,以及确定有几个redece任务;

  8)分配资源,application master程序,会计算构成MR的job的所有任务,判断是在一个节点上进行还是多个节点进行并行计算,简单来说,通过MR的数量来将这个job定性为小任务还是超级(uber)任务;

    小job指的是,少于10个mapper且只有1个reducer,且输入大小小于一个HDFS块的job。通过设置mapreduce.job.ubertask.enable设置为true才可确保启动超级任务作业。

  如果非uber任务,application master会向资源管理器RM请求需要的所有容器资源;当然,请求中先为map任务请求,然后是reduce任务,通常,完成有5%的map任务完成之后,为reduce任务请求资源的信息才会发出; 

  reduce任务可以在集群的任何节点运行,但是map任务尽量本着本地化的策略在进行,尽量减少磁盘的IO操作,通常情况之下,每个map任务和reduce任务都会申请获得1核的cpu以及1GB的内存,参数可配。

  9)一旦RM分配了一个特定节点的容器,那么application master就与该nodeManager进行通信来启动容器;

  10)执行任务的主类为YarnChild,一个JAVA程序,在运行任务之前,首先将需要的资源本地化,从共享的hdfs中取得,包括作业的配置,jar包和其他所有缓存文件等等;

  11)执行MR任务。

  以上是整个Job的生命周期。

  

ResourceManager(RM)
RM是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。
调度器 调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU资源封装在一起,从而限定每个任务使用的资源量。

应用程序管理器(Applications Manager)负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。

ApplicationMaster(AM)
用户提交的每个应用程序均包含一个AM,主要功能包括:
与RM调度器协商以获取资源(用Container表示);
将得到的任务进一步分配给内部的任务(资源的二次分配);
与NM通信以启动/停止任务;
监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。

NodeManager(NM)
NM是每个节点上的资源和任务管理器,一方面,它会定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态;另一方面,它接收并处理来自AM的Container启动/停止等各种请求。


Container

Container是YARN中的资源抽象,它封装了某个节点上的内存、CPU资源,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。

二、shuffle

  MapReduce确保每个reducer的输入都是按照Key来进行排序的。系统执行排序,且将Map输出作为输入给Reduce的过程称之为shuffer。

   1)map端在输出时,会首先输出到一个内存缓冲区,英文名字为spill,他的默认大小为100M,可以理解为在内存中一部分首尾相连的内存区域,这个内存缓冲区的阈值为80%,可通过mapreduce.task.io.spill.percent参数改变,当map的输出达到阈值时,会把溢出的旧输出内容写入磁盘,新的输出继续往缓冲区去写,至于为什么是80%,是因为内存区的IO远快于物理磁盘的IO速度,所以在达到阈值时,开始溢写,如果spill写满时,仍未写到物理磁盘上,那么map会处于wait状态;

  2)而在将map输出结果写磁盘之前,会根据最后返回给reduce的数据划分成对应的分区,且在每个分区中,后台线程会按照key进行排序,这个时候如果存在一个conbiner,那么conbiner的函数redece是在排序后进行的。运行combiner会使map的输出结果更加紧凑,因此会减少写磁盘IO的压力。

  3)这时的疑问在于,当spill内存缓冲区不足以支撑map的输出时,那么会将输出溢写到本地磁盘中,那么map的输出会有多个磁盘文件,所以,在map任务完成之前,会对他们进行合并排序。

   至于存在combine的情况时, 任务会判断溢出文件的数量,假如溢出文件的数量大于3,那么有必要再对此进行一次combine操作,这个操作的时间是,map任务的最终输出准备向磁盘上写时。所以由此判断,combine可以在map任务中多次执行,也不会影响最终的结果,至于是否再次进行combine操作,那么由map来进行判断,通过溢出文件的数量来进行判断,其主要目的时判断进行combine带来的开销是否足够抵消IO磁盘操作。

  4)在这里,map任务的输出,对这个输出文件进行压缩,然后放到磁盘会更好,这是一个典型的节省磁盘IO的有效操作。这样同样可以减少通过网络IO传输给reduce的文件大小。压缩的配置参数为:mapreduce.map.output.compress设置为true,hadoop便会启动map结果的压缩功能。

  5)至于reduce任务,前面在分析job的执行过程的时候,知道有一个参数会影响application master向RM为reduce申请资源的时间,那便是map任务完成的比率,比率默认是5%,即有5%的map任务完成时,那么reduce任务将开始进行工作。在上图中,reduce是通过fetch(抓)过来map的输出结果,其实是通过网络通信将map的输出结果复制过来。reduce任务有少量的复制线程,默认值为5个,这5个线程可以从多个执行完毕的map任务中复制过来其输出结果。而这个线程的数量,可以通过mapreduce.reduce.shuffle.parallelcopies属性。

  6)这里的问题关键点在于,reduce任务如何得知map任务已经结束,且从哪里获得其输出结果?其实还在于强大的application master,全程负责所有任务的调度工作,当map任务完成后,会通过心跳机制,告知application master,而reduce任务一旦开启,也会有一个线程,不停轮询application master的map任务完成情况,这里推测,完成的map任务的网络主机情况,输出结果的磁盘存储情况,会保存在application master的一个对象中,大概率是一个(HashMap),而当reduce取得map的输出结果之后,并不会马上删除此map的结果释放资源,他会等待application master的通知,这是在整体job完成后执行的。

  7)下一步当reduce取得map任务的输出结果只会,需要进行的就是不停的merge工作。

  如果,map的输出结果非常小,那么直接在reduce任务的jvm内存中进行合并了,但往往这种情况并不会经常发生。

  当有很多个map的输出,且输出文件都比较大,redece会将map的输出结果复制到磁盘,如果磁盘上的副本太多了,那么reduce会将这些个文件合并成更大的文件,而之前在于被压缩的map输出,都会在内存中被解压缩。

  直到将所有的map复制完毕,那么下一步会进行真正的reduce合并操作。

  reduce合并这一块很有意思,hadoop为了减少磁盘的IO,做了很多构想,很巧妙。

  首先有一个指定参数,名字为合并因子,通过:mapreduce.task.io.sort.factor属性设置,默认为10。

  这个因子决定你的map输出数量合并多少次,假如有40个map的输出结果,那么将会合并4次。

  如上图所看,一共有40个map输出,那么hadoop不会每10个文件合并一次,将合并完成的4个文件交给reduce task。

  他会第一次合并4个文件形成s1,第二次、三次、四次分别合并10个文件形成s2和s3以及s4,然后它会将s1、s2、s3、s4以及剩下未合并的6个文件直接交给reduce函数。

  因为map输出结果本身为排序状态,这样操作可以减少了6个map结果文件的多一次通过磁盘IO进行合并的操作,而hadoop这样做,也只是为了减少磁盘IO,多用内存。

  而做完这个操作之后,reduce调用reduce函数,将输出结果复用到HDFS之前配置地输出目录当中。至此,shuffle结束。

而MR任务的魔幻点就在于shuffle过程,他神奇地将乱序地文件,通过一系列map和reduce操作,通过强大地设计application master地控制中心,完美的完成了整理数据工作。

所以,知晓了MR任务地执行过程和shuffle内容,那么MR任务地优化点也来了,不论是通过参数还是在编码中刻意进行改变,都会很好地优化MR。

   推测执行:application master会跟踪每个task的执行情况,当某个task执行过慢时,会创建出这个task的副本,从而进一步判定task是否存在执行失误的情况,假如副本task先行执行完成,那么会废掉原task。

      从中体现application master强大的task线程调度能力。而这个参数的配置方法为:mapreduce.map.speculative/reduce.speculative->true。

    这个推测执行功能有点过于屌了,但并不推荐使用,因为它是以整个集群的资源为代价的,应该根据具体情况开启此功能。

参数优化点

  

调优的总体纲领为:

  1)减少数据传输--》增加conbine操作和map输出压缩操作;

  2)尽量使用内存-》增加spill内存缓冲区的大小-增加map和reduce的jvm内存参数->mapred.child.java.opts,这个是task任务执行时的jvm内存大小;

  3)减少磁盘IO-》压缩map输出,减少reduce合并次数,即增大合并因子参数;

  4)增大任务并行数-》增加reduce的fetch数量,尽量更改此参数数量与map数量一致,达到并行抽取;

  5)剩下就是推测执行了,根据集群网络情况和机器性能进行调优操作。

 jvm调优,jvm重用机制:

  1)默认不允许JVM重用;

  2)一旦开启JVM的重用,所有的task都将在一个jvm中执行,简单表达,即是,所有的包括application master、map task、reduce task都会在一个jvm-container中执行;

  3)此种情况适用于小的MR任务,默认为10个及其以下的map任务,1个的reduce任务,且reduce输入大小为小于一个hdfs文件块的任务。

  4)此种情况依旧适用于海量小文件的情况,减少jvm的频繁启停;

对于海量的小文件,应该将多个小文件处理成为一个文件,以减少map的任务数量。

三、部分源码,主要代码

  MapTask部分,即map小任务部分:

  启动map任务,调用的是其中的run方法

@Override
  public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
    throws IOException, ClassNotFoundException, InterruptedException {
    this.umbilical = umbilical;

    if (isMapTask()) {
      // If there are no reducers then there won't be any sort. Hence the map 
      // phase will govern the entire attempt's progress.
      if (conf.getNumReduceTasks() == 0) {
        mapPhase = getProgress().addPhase("map", 1.0f);
      } else {
        // If there are reducers then the entire attempt's progress will be 
        // split between the map phase (67%) and the sort phase (33%).
        mapPhase = getProgress().addPhase("map", 0.667f);
        sortPhase  = getProgress().addPhase("sort", 0.333f);
      }
    }
    TaskReporter reporter = startReporter(umbilical);
 
    boolean useNewApi = job.getUseNewMapper();
    initialize(job, getJobID(), reporter, useNewApi);

    // check if it is a cleanupJobTask
    if (jobCleanup) {
      runJobCleanupTask(umbilical, reporter);
      return;
    }
    if (jobSetup) {
      runJobSetupTask(umbilical, reporter);
      return;
    }
    if (taskCleanup) {
      runTaskCleanupTask(umbilical, reporter);
      return;
    }

    if (useNewApi) {
      runNewMapper(job, splitMetaInfo, umbilical, reporter);
    } else {
      runOldMapper(job, splitMetaInfo, umbilical, reporter);
    }
    done(umbilical, reporter);
  }

  这里启动了taskReporter,向application master报告执行情况,并且初始化了整个Job任务,且在2.0中,调用了runNewMapper方法;

  

  @SuppressWarnings("unchecked")
  private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  void runNewMapper(final JobConf job,
                    final TaskSplitIndex splitIndex,
                    final TaskUmbilicalProtocol umbilical,
                    TaskReporter reporter
                    ) throws IOException, ClassNotFoundException,
                             InterruptedException {
    // make a task context so we can get the classes
    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
      new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job, 
                                                                  getTaskID(),
                                                                  reporter);
    // make a mapper
    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
    // make the input format
    org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
      (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
        ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
    // rebuild the input split
    org.apache.hadoop.mapreduce.InputSplit split = null;
    split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
        splitIndex.getStartOffset());
    LOG.info("Processing split: " + split);

    org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
      new NewTrackingRecordReader<INKEY,INVALUE>
        (split, inputFormat, reporter, taskContext);
    
    job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
    org.apache.hadoop.mapreduce.RecordWriter output = null;
    
    // get an output object
    if (job.getNumReduceTasks() == 0) {
      output = 
        new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
    } else {
      output = new NewOutputCollector(taskContext, job, umbilical, reporter);
    }

    org.apache.hadoop.mapreduce.MapContext<INKEY, INVALUE, OUTKEY, OUTVALUE> 
    mapContext = 
      new MapContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(job, getTaskID(), 
          input, output, 
          committer, 
          reporter, split);

    org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
        mapperContext = 
          new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext(
              mapContext);

    try {
      input.initialize(split, mapperContext);
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
  }
View Code

在代码中可见, 在这个方法中解析了job中的计算出的,InputSplit信息,这里面封装了所有的map文件的切片信息,而InputSplit对象的初始化,由 private <T> T getSplitDetails(Path file, long offset)方法获得。

这个方法里去获取 T split = deserializer.deserialize(null);切片信息,而切片信息,又通过AvroSerialization获得,代码如下,现在就可以串起来,Job客户端从RM获取了一个输入流,而这个输入流中存储了map所需输入文件的切片信息,类似上文讲的,从hdfs文件系统中下载文件的过程之一,先从NN节点获取文件的切片信息:

@Override
    public T deserialize(T t) throws IOException {
      return reader.read(t, decoder);
    }
View Code

 InputSplit中包含了切片信息,拿到本map任务需要的切片后,通过RecordReader,获取文件内容,然后反射调用程序员缩写的Mapper类。此项代码在runNewMapper方法的第722行:

org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
      (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
        ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
View Code

然后在最后有如下代码:

try {
      input.initialize(split, mapperContext);
      mapper.run(mapperContext);
      mapPhase.complete();
      setPhase(TaskStatus.Phase.SORT);
      statusUpdate(umbilical);
      input.close();
      input = null;
      output.close(mapperContext);
      output = null;
    } finally {
      closeQuietly(input);
      closeQuietly(output, mapperContext);
    }
View Code

这里,初始化输入文件切片,然后run程序员写的mapper再之后输出结果,关闭资源。

需要注意的是,NewOutputCollector这个在上文代码中的作用:

他会每次收集调用map新的kv对,然后将他们spill到内存或者文件中,还可以做进一步的partition和sort和combine操作,当存在reduce的时候,此类代码如下:

  private class NewOutputCollector<K,V>
    extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
    private final MapOutputCollector<K,V> collector;
    private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
    private final int partitions;

    @SuppressWarnings("unchecked")
    NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                       JobConf job,
                       TaskUmbilicalProtocol umbilical,
                       TaskReporter reporter
                       ) throws IOException, ClassNotFoundException {
      collector = createSortingCollector(job, reporter);
      partitions = jobContext.getNumReduceTasks();
      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
      collector.collect(key, value,
                        partitioner.getPartition(key, value, partitions));
    }

    @Override
    public void close(TaskAttemptContext context
                      ) throws IOException,InterruptedException {
      try {
        collector.flush();
      } catch (ClassNotFoundException cnf) {
        throw new IOException("can't find class ", cnf);
      }
      collector.close();
    }
  }
View Code

还有一个MapOutputBuffer需要注意,他是在实例化NewOutputCollector时被创建的:

构造方法:

 private final MapOutputCollector<K,V> collector;

collector = createSortingCollector(job, reporter);

然后在

 

创建了这个buffer对象,在这个对象中

 

ReduceTask部分源码:

  

原文地址:https://www.cnblogs.com/qfxydtk/p/11167437.html