hadoop自带例子SecondarySort源码分析MapReduce原理

这里分析MapReduce原理并没用WordCount,目前没用过hadoop也没接触过大数据,感觉,只是感觉,在项目中,如果真的用到了MapReduce那待排序的肯定会更加实用。

先贴上源码

package examples;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * This is an example Hadoop Map/Reduce application. It reads the text input files that must contain
 * two integers per a line. The output is sorted by the first and second number and grouped on the
 * first number.
 *
 * To run: bin/hadoop jar build/hadoop-examples.jar secondarysort <i>in-dir</i> <i>out-dir</i>
 */
public class SecondarySort {

  /**
   * Define a pair of integers that are writable. They are serialized in a byte comparable format.
   */
  public static class IntPair implements WritableComparable<IntPair> {
    private int first = 0;
    private int second = 0;

    /**
     * Set the left and right values.
     */
    public void set(int left, int right) {
      first = left;
      second = right;
    }

    public int getFirst() {
      return first;
    }

    public int getSecond() {
      return second;
    }

    /**
     * Read the two integers. Encoded as: MIN_VALUE -&gt; 0, 0 -&gt; -MIN_VALUE, MAX_VALUE-&gt; -1
     */
    @Override
    public void readFields(DataInput in) throws IOException {
System.out.println("in read Fields");
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }

    @Override
    public void write(DataOutput out) throws IOException {
System.out.println("in write");
      out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }

    @Override
    public int hashCode() {
System.out.println("in hashCode");
      return first * 157 + second;
    }

    @Override
    public boolean equals(Object right) {
System.out.println("in equals");
      if (right instanceof IntPair) {
        IntPair r = (IntPair) right;
        return r.first == first && r.second == second;
      } else {
        return false;
      }
    }

    /** A Comparator that compares serialized IntPair. */
    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
System.out.println("in IntPair's Comparator's compare");
        return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static { // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

    @Override
    public int compareTo(IntPair o) {
System.out.println("in IntPair's compareTo");
      if (first != o.first) {
        return first < o.first ? -1 : 1;
      } else if (second != o.second) {
        return second < o.second ? -1 : 1;
      } else {
        return 0;
      }
    }
  }

  /**
   * Partition based on the first part of the pair.
   */
  public static class FirstPartitioner extends Partitioner<IntPair, IntWritable> {
    @Override
    public int getPartition(IntPair key, IntWritable value, int numPartitions) {
System.out.println("in FistPartitioner");
      return Math.abs(key.getFirst() * 127) % numPartitions;
    }
  }

  /**
   * Compare only the first part of the pair, so that reduce is called once for each value of the
   * first part.
   */
//  public static class FirstGroupingComparator implements RawComparator<IntPair> {
//    @Override
//    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
//System.out.println("in FirstGroupingComparator's compare 6 params");
//      return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8);
//    }
//
//    @Override
//    public int compare(IntPair o1, IntPair o2) {
//System.out.println("in FirstGroupingComparator's compare 2 params");
//      int l = o1.getFirst();
//      int r = o2.getFirst();
//      return l == r ? 0 : (l < r ? -1 : 1);
//    }
//  }

  public static class FirstGroupingComparator extends WritableComparator {
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
System.out.println("in FirstGroupingComparator's compare 6 params");
System.out.println("===========");
for (byte b : b1) {
  System.out.print(b + " - ");
}
System.out.println();
for (byte b : b2) {
  System.out.print(b + " - ");
}
System.out.println();
System.out.println(s1 + " " + l1 + " " +s2 + " " + l2);
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8);
    }

    public int compare(IntPair o1, IntPair o2) {
System.out.println("in FirstGroupingComparator's compare 2 params");
      int l = o1.getFirst();
      int r = o2.getFirst();
      return l == r ? 0 : (l < r ? -1 : 1);
    }
  }
  
  /**
   * Read two integers from each line and generate a key, value pair as ((left, right), right).
   */
  public static class MapClass extends Mapper<LongWritable, Text, IntPair, IntWritable> {

    private final IntPair key = new IntPair();
    private final IntWritable value = new IntWritable();

    @Override
    public void map(LongWritable inKey, Text inValue, Context context)
        throws IOException, InterruptedException {
System.out.println("in map");
      StringTokenizer itr = new StringTokenizer(inValue.toString());
      int left = 0;
      int right = 0;
      if (itr.hasMoreTokens()) {
        left = Integer.parseInt(itr.nextToken());
        if (itr.hasMoreTokens()) {
          right = Integer.parseInt(itr.nextToken());
        }
        key.set(left, right);
        value.set(right);
        context.write(key, value);
      }
    }
  }

  /**
   * A reducer class that just emits the sum of the input values.
   */
  public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable> {
    private static final Text SEPARATOR =
        new Text("------------------------------------------------");
    private final Text first = new Text();

    @Override
    public void reduce(IntPair key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
System.out.println("in reduce");
      context.write(SEPARATOR, null);
      first.set(Integer.toString(key.getFirst()));
      for (IntWritable value : values) {
        context.write(first, value);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    SimpleDateFormat formatter = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss:SSS");
    args = new String[] {"sort", "output/" + formatter.format(new Date())};
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: secondarysort <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "secondary sort");
    job.setJarByClass(SecondarySort.class);
    job.setMapperClass(MapClass.class);
    job.setReducerClass(Reduce.class);

    // group and partition by the first int in the pair
    job.setPartitionerClass(FirstPartitioner.class);
    job.setGroupingComparatorClass(FirstGroupingComparator.class);

    // the map output is IntPair, IntWritable
    job.setMapOutputKeyClass(IntPair.class);
    job.setMapOutputValueClass(IntWritable.class);

    // the reduce output is Text, IntWritable
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(false) ? 0 : 1);
  }

}

为了看程序是怎么跑的,所以在上面加上了各种输出。

从头开始分析,单纯的从main方法开始,集群上不太好跟踪代码,所以分析本地的,原理都类似。

这里是在单机跑的,纯单机,eclipse的hadoop插件都没有,这里在main方法的参数中加上了输入、输出路径。

启动main方法后,new Configuration()先加载默认的配置文件。加载core-default.xml,core-site.xml。

new GenericOptionsParser(conf, args).getRemainingArgs(),先获取命令行上的参数,剩下的就是输入输出路径(这里是在代码里直接启动,没有命令行参数,也可以模拟方法和输入输出路径类似)。

设置job的一些配置。先根据configuration生成JobConf,加载配置。在根据JobConf获取Job实例,设置job名。

自定义的设置job属性,setJarByClass程序的入口。设置map,reduce类。
setPartitionerClass设置分区器,如果是在集群中,会有多个map同时运行,分区器用来确定map的输出值应该分配到哪个reduce。如果没设置这个,默认的分区器是HashPartitioner调用输出键的hashCode方法,然后用hashCode方法的结果对reduce的数量进行一个模数(modulo)运算,最后得到那个目标reduce。默认的分区器使用整个键。这就不适于组合键了。因为它可能把有同样自然键的组合键发送给不同的reduce。因此,就需要自定义分区器,基于自然键进行分区。我们在这里虽然设置了这个参数,但是如果数量小,并不会调用自定义的getPartition方法。
setGroupingComparatorClass。分组,当reduce阶段将在本地磁盘上的map输出的记录进行流化处理(streaming)的时候,需要要进行分组。在分组中,记录将被按一定方式排成一个有逻辑顺序的流,并被传输给reduce。在分组阶段,所有的记录已经经过了次排序。分组比较器需要将有相同fitst值的记录分在同一个组。
设置map和reduce端的输入输出key和value。
FileInputFormat的超类是InputFormat,getSplits方法将输入的数据分片,createRecordReader将分片的数据生成一个个key/value的键值对传给map处理,computeSplitSize方法就是需要多少个map处理数据。hdfs中块的大小是64M,Split大小最好也设置成64M,利于资源本地化。Split只是一个逻辑的概念,它只是记录了数据的起始位置,路径等一些其他的信息。
FileOutputFormat是设置输出的格式。

进入job.waitForCompletion(false)开始程序。false不显示最后的统计信息,可以设置成true显示信息。

waitForCompletion方法中,开始进入submit()提交作业。这里hadoop的版本是3.0.0-alpha,会有设置使用新的api。进入connect方法,这里的方法主要的作用是在Job类中实例化一个cluster(return new Cluster(getConfiguration()))建立连接,进入initialize方法初始化,根据jobTrackAddr为空生成ClientProtocol clientProtocol,最终生成的是LocalJobRunner(在集群中,可以配置mapreduce使用yarn,这时这里生成的就是YARNRunner,YARNRunner的作用管理Job和Task)。这里有MapTaskRunnable和ReduceTaskRunnable,这两个里面有run方法,分别是map和reduce启动时调用的方法。上面ClientProtocol,还有其他的protocol是RPC通讯时的协议。将信息封装进Cluster cluster中,根据cluster得到JobSubmitter submitter,去提交作业submitter.submitJobInternal。
checkSpecs()检查输出路径是否存在,如果存在就报错。
JobSubmissionFiles.getStagingDir()初始化用于存放job相关资源的存放路径,会创建目录file:/tmp/hadoop/mapred/staging/{电脑登陆的用户名}{随机数}/.staging
获取并设置hostname和ip
ClientProtocol类型的submitClient(这里是LocalJobRunner)通过RPC申请一个JobID,如果集群用了Yarn,就是向ResourceManager申请JobID。设置在job上。
通过上面生成的job相关资源的存放目录和JobID一起生成一个新的提交作业的目录。
从HDFS的NameNode获取验证用的Token,并将其放入缓存。
copyAndConfigureFiles()。上传资源到上面生成的提交作业的目录。设置工作目录。
生成一个提交作业的xml配置文件(file:/tmp/hadoop/mapred/staging/fzk1167408138/.staging/job_local1167408138_0001/job.xml)就是上面提交作业的目录下的job.xml文件。
向上面生成的job.xml文件写入数据。进入到writeNewSplits()方法,方法中获取InputFormat(这里是TextInputFormat)的实例input,调用input.getSplits()获取List<InputSplit> splits。分片的大小由如下几个参数决定:mapreduce.input.fileinputformat.split.maxsize、mapreduce.input.fileinputformat.split.minsize、文件的块大小。具体计算方式为:Math.max(minSize, Math.min(maxSize, blockSize))分片的大小有可能比默认块大小64M要大,当然也有可能小于它,默认情况下分片大小为当前HDFS的块大小,64M。调用SplitComparator的compare把splits排序,SplitComparator是JobSubmitter的内部类目的就是排序。排序之后大的在前边。创建file:/tmp/hadoop/mapred/staging/ito1621858443/.staging/job_local1621858443_0001/job.split。设置split数据备份数(mapreduce.client.submit.file.replication),默认是10。splits的个数就是map任务的个数。
将配置文件( core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, yarn-default.xml, yarn-site.xml)写入到submitJobFile(file:/tmp/hadoop/mapred/staging/fzk1621858443/.staging/job_local1621858443_0001/job.xml)文件中。
submitClient.submitJob()。提交任务。集群中就是提交到Yarn中。方法中,创建localJobFile(file:/tmp/hadoop-fzk/mapred/local/localRunner/ito/job_local1621858443_0001/job_local1621858443_0001.xml),将配置文件写入。开始map和reduce线程(this.start)。在LocalJobRunner中的run方法中调用runtasks()方法,map任务结束后执行reduce任务。先执行MapTaskRunnable的run方法。所有map任务结束后执行ReduceTaskRunnable的run方法。上面一系列的配置文件的复制就是在为map和reduce准备。

-->MapTaskRunnable.run()
获取MapTask map,进行设置。
-->map.run(MapTask.run())
-->initialize(job, getJobID(), reporter, useNewApi)  这里会初始化临时工作目录和输出目录
--> runNewMapper()  获取Mapper,InputFormat,Split,RecordReader<INKEY,INVALUE> input,RecordWriter output等
-->mapper.run(mapperContext)    setup()为空方法。
-->map()方法,用户自定义的map()方法中
-->context.write(key, value)  -->WrappedMapper.Context.write() -->TaskInputOutputContextImpl.write() --> MapTask.NewOutputCollector.write() 方法中只有collector.collect(key, value,partitioner.getPartition(key, value, partitions)); 一行代码partitioner.getPartition(key, value, partitions)中的partitioner是MapTask.NewOutputCollector.(org.apache.hadoop.mapreduce.Partitioner<K,V>),还记得一开始设置的setPartitionerClass()。这里的代码

      if (partitions > 1) {
        partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
          ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
      } else {
        partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
          @Override
          public int getPartition(K key, V value, int numPartitions) {
            return partitions - 1;
          }
        };
      }

partitions也是MapTask.NewOutputCollector中的成员,分离的数量也就是reduceTask的数量,这里的数据量比较少只有一个,所以走else。在else中Partitioner中实现了compare方法,所以走的是这个compare方法。如果这里的数据量大的花,有多余一个reduceTask的话就会走自己的FirstPartitioner中的getPartition方法。可以试试。
collector.collect-->MapTask.MapOutputBuffer.collect()  中间有行keySerializer.serialize(key)方法,进入到自己写的IntPair的write()方法。
反复执行map(),将所有数据解析完成后执行cleanup()方法,默认空方法,同setup()一样,程序可重写。

mapper.run()执行完成后接着向下执行,进入output.close(mapperContext);
-->MapTask.close()
-->collector.flush()
-->MapTask.sortAndSpill()  创建临时文件/tmp/hadoop-fzk/mapred/local/localRunner/fzk/jobcache/job_local1621858443_0001/attempt_local1621858443_0001_m_000000_0/output/spill0.out
进入sorter.sort(MapOutputBuffer.this, mstart, mend, reporter) --> Quick.sort() --> Quick.sortInternal()
其中不断调用s.compare()方法,这个s从最外层传的是MapOutputBuffer.this。s.compare() --> Task.MapOutputBuffer.compare() 进入了我们自定义的Comparator的compare()方法。  -->WritebleComparator.compareBytes() -->FastByteComparisons.compareTo() -->LexicographicalComparerHolder.BEST_COMPARER.compareTo()最终的这个compareTo方法是二进制的比较方法,这种比较方法不需要进行序列化,效率更高。这里为什么会进入我们的自定义的Comparator的compare方法?MapReduce程序中,想使用自定义的类最为Key那么就必须继承Writable和Comparable接口,继承Writable是因为文件会进行序列化和反序列化,也正因为这个我们也需要提供一个无参构造器。继承Comparable是因为在进行作业时,项目会多次进行排序。还需要重写hashCode方法和equals方法。由于需要二次排序所以还需要一个比较器(这里是自定义的Comparator,IntPair的内部类),一般这个类写成Key的内部类,并提供一个无参构造器,将要比较的类(IntPari)传入。并重写compare方法。然后注册。

    public static class Comparator extends WritableComparator {
      public Comparator() {
        super(IntPair.class);
      }

      public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return compareBytes(b1, s1, l1, b2, s2, l2);
      }
    }

    static { // register this comparator
      WritableComparator.define(IntPair.class, new Comparator());
    }

为什么需要重写compare方法。从super(IntPair.class)深入看看父类中的代码

  private final Class<? extends WritableComparable> keyClass;
  private final WritableComparable key1;
  private final WritableComparable key2;
  private final DataInputBuffer buffer;

  protected WritableComparator() {
    this(null);
  }

  /** Construct for a {@link WritableComparable} implementation. */
  protected WritableComparator(Class<? extends WritableComparable> keyClass) {
    this(keyClass, null, false);
  }

  protected WritableComparator(Class<? extends WritableComparable> keyClass,
      boolean createInstances) {
    this(keyClass, null, createInstances);
  }

  protected WritableComparator(Class<? extends WritableComparable> keyClass,
                               Configuration conf,
                               boolean createInstances) {
    this.keyClass = keyClass;
    this.conf = (conf != null) ? conf : new Configuration();
    if (createInstances) {
      key1 = newKey();
      key2 = newKey();
      buffer = new DataInputBuffer();
    } else {
      key1 = key2 = null;
      buffer = null;
    }
  }

super(IntPair.class)实例化的WritableComparator中的key1,key2,buffer均为空。如果调用默认的compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2):

  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    try {
      buffer.reset(b1, s1, l1);                   // parse key1
      key1.readFields(buffer);
      
      buffer.reset(b2, s2, l2);                   // parse key2
      key2.readFields(buffer);
      
      buffer.reset(null, 0, 0);                   // clean up reference
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    
    return compare(key1, key2);                   // compare them
  }

buffer为空,这里会空指针。所以重写compare方法,让其直接调用compareBytes方法。而其中不在涉及到序列化后的IntPair,compare(Object a, Object b)将不会调用,我们也不需要重写。

  public static int compareBytes(byte[] b1, int s1, int l1,
                                 byte[] b2, int s2, int l2) {
    return FastByteComparisons.compareTo(b1, s1, l1, b2, s2, l2);
  }

排序之后开始记录数据,如果totalIndexCacheMemory >= indexCacheMemoryLimit,就会向文件中溢写。
sortAndSpill()执行完成后mergeParts()合并。这又是一个大过程。在想想这里。最终的结果是将相同的key合并成一组,并复制文件到目标区。
collector.flush()执行完成进入collector.close()方法,这是一个空方法。
之后是一系列的关闭资源,关闭通讯线程等一些操作,map端执行完成。

上面就是map端的执行过程,在重新看一下,首先根据InputFormat将文件分块,在将文件转成key/value的RecordReader传给map处理,经过map处理过转成key/values的键值对,然后会通过分区器(Partitioner)确定应该输出在哪个reduce,然后进行文件的sort,通过IntPair.Comparator.compare()方法进行排序。这里起名为secondarysort,其实还是只进行了依次排序,只不过这依次排序会根据fisrt和second进行排序(类似于我们在java中实现了Comparator接口的类),这个compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法是进行一个字节一个字节的比较,知道找到不一样的字节,在判断哪个字节大。在map端已经是拍好顺序的文件了。

reduce端。

ReduceTask.run()  初始化,获取各种需要的变量后shuffle。这里属于shuffle的后半段,前半段是在map端,就是在sortAndSpill()方法中,不断的进行merge,reduce端不断的获取数据。在reduce端也是一样的道理,先不断的copy map的输出文件,启动的Fetcher线程。不断的Merge,数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。merge有三种形式:1)内存到内存  2)内存到磁盘  3)磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也会启用,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的那个文件。最终的文件可能在磁盘,可能在内存中。shuffle结束后才向下进行。
进入runNewReducer()
--> reducer.run() 在调用context.nextKey()时 调用的是Reducer.Context.next() --> ReduceContextImpl.nextKey --> ReduceContextImpl.nextKeyValue() 中的key = keyDeserializer.deserialize(key);就是调用自己写的IntPait的readFields方法。继续向下在方法最后执行了comparator.compare()方法就是自定义的FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法。comparator是ReduceContextImpl的成员变量,赋值的时候是在开始设置变量的时候。这里原理和上面map端分析的过程一样,也解释了为什么compare(IntPair o1, IntPair o2)不执行的原因。
setGroupingComparatorClass()这个部分设置的分组类的作用并没有做排序功能,reduce端的作用就是获取map端的输出文件,一条一条读取。设置这个的作用是在reduce读取一条记录后,还会判断下面的一条数据和本条数据是不是在一个组里,而在不在一个组里调用的方法就是FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)方法。迭代器的next方法--> ReduceContextImpl.next() 这个方法里会执行一下nextKeyValue()方法,在nextKeyValue()方法中如果hasMore重新计算nextKeyIsSame。

    //ReduceContextImpl.next() --> ReduceContextImpl.nextKeyValue() 中代码片段
hasMore = input.next(); if (hasMore) { nextKey = input.getKey(); nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, currentRawKey.getLength(), nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition() ) == 0; } else { nextKeyIsSame = false; }

 comparator.compare会调用自定义的FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) --> WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8),这里并没有将长度l1和l2传进去,传的是Integer.SIZE / 8,Integer.SIZE是32。l1 l2代表的是length。就只取前四个。
看一下怎么实现的只读取first值进行比较的。上面代码中currentRawKey和nextKey的赋值方式。

currentRawKey.set(nextKey.getData(), nextKey.getPosition(), nextKey.getLength() - nextKey.getPosition());

nextKey = input.getKey();

自定义读取二进制和转换二进制的方式:

public void readFields(DataInput in) throws IOException {
      first = in.readInt() + Integer.MIN_VALUE;
      second = in.readInt() + Integer.MIN_VALUE;
    }
    public void write(DataOutput out) throws IOException {out.writeInt(first - Integer.MIN_VALUE);
      out.writeInt(second - Integer.MIN_VALUE);
    }

自定义的FirstGroupingComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2)的参数全部输出出来(只看一组)

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {for (byte b : b1) {
  System.out.print(b + " - ");
}
System.out.println();
for (byte b : b2) {
  System.out.print(b + " - ");
}
System.out.println();
System.out.println(s1 + " " + l1 + " " +s2 + " " + l2);
      return WritableComparator.compareBytes(b1, s1, Integer.SIZE / 8, b2, s2, Integer.SIZE / 8);
    }
输出:

-128 - 0 - 0 - 123 - -128 - 0 - 0 - 23 - 0 - 0 - 0 - 0 -
-128 - 0 - 0 - 123 - -128 - 0 - 0 - 120 - 0 - 0 - 0 - 0 - 0 - 0 - 0 - 0 -
0 8 0 8

以下的观点只是我的推测,这个地方没弄太明白:
Integer.size / 8 就是Integer所占的字节数 4。在输出中,第一个数是-128 注意是一个 “-”号,而第四个字符就是我们sort文件中的左边的值123,在向后四个是右边的数字23。下边一样,但是后边又多出了四个字符。多的原因应该是currentRawKey.getBytes()和nextKey.getData()的数据结构不一样。但是前面有效占位都是一样的,是通过map端调用write()方法写进的。Integer占四个字节,头四个字节组成了一个Integer的值,为什么是负数,out.writeInt(first - Integer.MIN_VALUE)是这么写进的。紧接着又写入右边的值out.writeInt(second - Integer.MIN_VALUE);为什么又多出来四个0,不知道和OutPutStraming.writeInt()方法有没有关系。总之,前面已经读取到想要的数据了。参数中的s1、s2是offset,l1、l2是length向对比的长度。b1、b2就是想要比较的数据。取前四个也就是取第一个读进out中的数据也就是IntPair中的first。
在判断是不是同一组的时候,Reduce端读取的是IntPair类型的Key,读取的结果是 {(123 23) 23} {(123 128) 128}这种数据,小括号表示IntPair类型,大括号表示一组数据,自行忽略。读过来后,把Key进行比较,上面说的compare方法,在compare方法中比较(123 23)和(123 128)中的第一个提取出来比较,最后发现一样,比较结果返回0。那么这两个数据就在一个reduce中处理。不断重复。

迭代器的hasNext()的结果根据nextKeyIsSame获取

   //ReduceContextImpl.class 
public boolean hasNext() { try { if (inReset && backupStore.hasNext()) { return true; } } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("hasNext failed", e); } return firstValue || nextKeyIsSame; }

在外层调用reduce()方法的循环中

while (context.nextKey()) {
        reduce(context.getCurrentKey(), context.getValues(), context);
        // If a back up store is used, reset it
        Iterator<VALUEIN> iter = context.getValues().iterator();
        if(iter instanceof ReduceContext.ValueIterator) {
          ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
        }
      }

上面加黑部分应该能理解,nextKey() --> WrapperReduce.Context.nextKey() --> ReduceContextImpl.nextKey()

  //ReduceContextImpl.class
  public boolean nextKey() throws IOException,InterruptedException {
    while (hasMore && nextKeyIsSame) {
      nextKeyValue();
    }
    if (hasMore) {
      if (inputKeyCounter != null) {
        inputKeyCounter.increment(1);
      }
      return nextKeyValue();
    } else {
      return false;
    }
  }

又是根据hasMore和nextKeyIsSame判断,又会走进nextKeyValue()方法中。
最终的意思就是如果下一个的first的值相同,那么就是同一组,就会在同一个reduce里处理。这里并没有排序,只是顺序读取数据和判断一下。

reduce结束,关闭该关闭的资源、线程,删除临时文件。最终程序结束。

刚接触hadoop不久,好多地方还不明白,如果有错误的地方希望指出,这只是作为自己理解MapReduce的鉴证。
参考:http://www.cnblogs.com/datacloud/p/3584640.html
http://www.aboutyun.com/forum.php?mod=viewthread&tid=9366&highlight=hadoop2%CC%E1%BD%BB%B5%BDYarn

原文地址:https://www.cnblogs.com/badboyf/p/6274685.html