hadoop之MapReduce输入(split)输出

hadoop之MapReduce输入(split)输出

Split分割
在执行mapreduce之前,原始数据被分割成若干split,每个split作为一个map任务的输入,
在map执行过程中split会被分解成一个个记录(key-value对),map会依次处理每一个记录。
(Key:偏移量,不是行数)
FileInputFormat:
        FileInputFormat是所有以文件作为数据源的InputFormat实现的基类,FileInputFormat
        保存作为job输入的所有文件,并实现了对输入文件计算splits的方法。至于获得记录的
        方法是有不同的子类进行实现的;
        1) FileInputFormat只划分比HDFS block大的文件,所以FileInputFormat划分的结果是
           这个文件或者是这个文件中的一部分.                
        2) 如果一个文件的大小比block小,将不会被划分,这也是Hadoop处理大文件的效率要比
           处理很多小文件的效率高的原因。
        3) 当Hadoop处理很多小文件(文件大小小于hdfs block大小)的时候,由于FileInputFormat
          不会对小文件进行划分,所以每一个小文件都会被当做一个split并分配一个map任务,导致
          效率底下。
        例如:
             一个1G的文件,会被划分成16个64MB的split,并分配16个map任务处理,而10000个
             100kb的文件会被10000个map任务处理。
 
获得了输入文件后,FileInputFormat是怎样将他们划分成splits的呢?
input file -->split -->map task
计算SplitSize的函数很简单:
       splitSize = max(minsize,min(maxSize,blockSize)) = 64M;
       maxSize = mapred.max.split.size 默认最大值整数值
       minSize = mapred.min.split.size 默认0
 
那么我们如何处理同一行垮Split问题呢?
首先map任务getSplit读入一个split-->recordReader一行一行读取数据,如果有一行数据在
两个split中,map读入第一个split后,会去读取留在另一个split中的半行;然而另一个map
读入第二个split时,会自动跳过第一个换行符;
 
  //此方法每次只读取一行数据,key为偏移量,value为本行数据
public void map(Object key, Text value, Context context)
   throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
   }
}            
分割成split(不支持的除外)比如:1G的文件切割成64M,放到一个map里面,如果不支持直接把1G
放到map里面
解释map方法中的(key偏移量-value对):
abcdefghigklmnopqrstuvwxyz        key = 0     value=abcdefghigklmnopqrstuvwxyz  
abcdefghigklmnopqrstuvwxyz        key = 26   value=abcdefghigklmnopqrstuvwxyz  
abcdefghigklmnopqrstuvwxyz        key = 52   value=abcdefghigklmnopqrstuvwxyz    
 
 
 
 
 
 
 
分类: Hadoop
原文地址:https://www.cnblogs.com/Leo_wl/p/3042650.html