MapReduce的类型与格式

MapReduce的类型

  • 默认的MR作业

  1. 默认的mapper是Mapper类,它将输入的键和值原封不动地写到输出中
  2. 默认的partitioner是HashPartitioner,它对每条记录的键进行哈希操作以决定该记录应该属于哪个分区(每个分区对应于一个reduce任务)
  3. 默认的reducer是Reducer类,它将所有的输入写到输出中
  4. map任务的数量等于输入文件被划分成的块数
  5. reduce任务的个数的选择: 一个经验法则是目标reducer保持在每个运行5分钟左右且产生至少一个HDFS块的输出比较合适
  6. 默认的输入格式是TexInputFormat,输出是TextOutpFormat
  • 默认的streaming作业

输入格式

     输入分片与记录 

  1. 一个输入分片就是由单个map操作来处理的数据块,并且每一个map只处理一个分片、
  2. 每个输入分片分为若干个记录,每条记录就是 一个键值对,map将一个接一个地处理记录
  3. 输入分片和记录都是逻辑概念,不一定对应着文件,也可能对应其他数据形式,如对于数据库,输入分片就是对应于一个表上的若干行,一条记录对应着其中的一行
  4. 输入分片只是指向数据的引用,不包含数据本身
    1. InputSpilt接口(Java中的实现,开发人员不必直接处理InputSplit,因为它是由InputFormat创建的),包含 
      • 以字节为单位的长度,表示分片的大小,用以排序分片,以便优先处理最大的分片,从而最小化作业运行时间
      • 一组存储位置,供MR系统使用一边将map任务尽可能放在分片数据附近
      • 该接口由InputFormat创建
    2. InputFormat 
      • 运行作业的客户端使用getSplits方法计算分片,并将结果告知application master,后者使用其存储信息来调度map任务从而在集群集群上处理这些分片数据
      • map任务将输入分片传给createRecordReader方法来获取这个分片的RecordReader(就像是记录上的迭代器),map任务用这个RecordReader来生成记录的键值对,然后再将键值对传递给map函数(参见run方法)

    

InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?

 

InputFormat其实是一个接口,包含了两个方法:    

public interface InputFormat<K, V> {
  InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
  RecordReader<K, V> getRecordReader(InputSplit split,
                                     JobConf job,
                                     Reporter reporter) throws IOException;
}
这两个方法有分别完成着以下工作:
     方法getSplits将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即128M
     方法getRecordReader将每个split解析成records, 再依次将record解析成<K,V>对
也就是说InputFormat完成以下工作:
             InputFile --> splits --> <K,V>  : map任务把输入分片传给InputFormat的createRecordReader()方法来获得这个分片的RecordReader, RecordReader就像是记录上的迭代器,map任务用一个RecordReader来生成记录的键-值对,然后再传递给map函数进行处理。 
 

系统常用的 InputFormat 又有哪些呢?

在领会自定义 InputFormat 之前,需要弄懂一下几个抽象类、接口及其之间的关系:

InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class)RecordReader(interface), LineRecordReader(class)的关系

     FileInputFormat implements InputFormat
     TextInputFormat extends FileInputFormat
     TextInputFormat.getRecordReader calls LineRecordReader
     LineRecordReader implements RecordReader
 
对于InputFormat接口,上面已经有详细的描述
再看看FileInputFormat,它实现了InputFormat接口中的getSplits方法,而将getRecordReader与isSplitable留给具体类(如TextInputFormat)实现,isSplitable方法通常不用修改,所以只需要在自定义的InputFormat中实现
getRecordReader方法即可,而该方法的核心是调用LineRecordReader(即由LineRecorderReader类来实现 "将每个split解析成records, 再依次将record解析成<K,V>对"),该方法实现了接口RecordReader
public interface RecordReader<K, V> {
  boolean next(K key, V value) throws IOException;
  K createKey();
  V createValue();
  long getPos() throws IOException;
  public void close() throws IOException;
  float getProgress() throws IOException;
}
因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,
     定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
     
    示例,数据每一行为 “物体,x坐标,y坐标,z坐标”
          ball 3.5,12.7,9.0
          car 15,23.76,42.23
          device 0.0,12.4,-67.1
    每一行将要被解析为<Text, Point3D>(Point3D是自定义的数据类型)

 

 则自定义的ObjectPositionInputFormat 类的编写如下

public class ObjectPositionInputFormat extends FileInputFormat<Text, Point3D> {
//如果是要指定的输入文件不被分片,则重写isSplitable()方法
@override
protected boolean isSplitable(JobContext context, Path file){
return false; /默认是true false表示不分片
}
@override
public RecordReader<Text, Point3D> getRecordReader (InputSplit input, JobConf job, Reporter reporter)throws IOException { reporter.setStatus(input.toString()); return new ObjPosRecordReader(job, (FileSplit)input); } }

     

原文地址:https://www.cnblogs.com/tongxupeng/p/10408878.html