MapReduce自定义InputFormat,RecordReader

MapReduce默认的InputFormat是TextInputFormat,其key是每行的字节偏移量,value是该行文本。自定义InputFormat通常继承FileInputFormat,并重写createRecordReader方法;如有需要,还可以重写isSplitable()来控制是否对文件切片。重写了createRecordReader之后还需要自定义RecordReader:InputFormat规定了key、value的类型,而RecordReader负责具体的读取逻辑。下面的例子用于合并小文件,最终输出的k是文件路径,v是文件的二进制内容。

1.InputFormat

 1 /**
 2  * 自定义InputFormat规定读取文件的k,v
 3  * @author tele
 4  *
 5  */
 6 public class MyInputFormat extends FileInputFormat<NullWritable,BytesWritable>{
 7     /**
 8      * 设置不切片,把小文件作为一个整体
 9      */
10     @Override
11     protected boolean isSplitable(JobContext context, Path filename) {
12         return false;
13     }
14     
15     @Override
16     public RecordReader<NullWritable,BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
17             throws IOException, InterruptedException {
18         MyRecordReader recordReader = new MyRecordReader();
19         recordReader.initialize(split, context);
20         return recordReader;
21     }
22 }

2.RecordReader

 1 /**
 2  * recordreader用于读取文件内容,输出文件内容即可,文件路径信息保存在split中
 3  * @author tele
 4  *
 5  */
 6 public class MyRecordReader extends RecordReader<NullWritable,BytesWritable> {
 7     FileSplit split;
 8     BytesWritable value = new BytesWritable();
 9     boolean flag = false;
10     Configuration conf;
11     int count = 0;
12     
13     /**
14      * 初始化
15      */
16     @Override
17     public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
18         this.split = (FileSplit) split;
19         conf = context.getConfiguration();    conf = context.getConfiguration();
20     }
21 
22     /**
23      * 业务逻辑处理,这个方法用来判断是否还有文件内容需要读取,会进入两次,第一次读取内容存入value中,返回true,第二次调用返回false
24      * 只要返回true,就会调用getCurrentKey().getCurrentValue()把内容返回给map
25      * 
26      */
27     @Override
28     public boolean nextKeyValue() throws IOException, InterruptedException {
29         count++;
30         if(!flag) {
31             //获取fs
32             FileSystem fs = FileSystem.get(conf);
33             //开启流
34             Path path = this.split.getPath();
35             FSDataInputStream fsDataInputStream = fs.open(path);
36             long length = this.split.getLength();
37             byte[] buf = new byte[(int) length];
38             
39             //读取
40             IOUtils.readFully(fsDataInputStream, buf, 0,buf.length);
41             value.set(buf, 0, buf.length);
42             
43             //关闭流
44             IOUtils.closeStream(fsDataInputStream);
45             flag = true;
46         }else {
47             flag = false;
48         }
49         return flag;
50     }
51 
52     @Override
53     public NullWritable getCurrentKey() throws IOException, InterruptedException {
54         return NullWritable.get();
55     }
56 
57     @Override
58     public BytesWritable getCurrentValue() throws IOException, InterruptedException {
59         return value;
60     }
61 
62     @Override
63     public float getProgress() throws IOException, InterruptedException {
64         return flag?1:0;
65     }
66 
67     @Override
68     public void close() throws IOException {
69         
70     }
71 }

3.Mapper

 1 /**
 2  * 把结果输出到SequenceFileOutPutFormat中,输出的key是文件路径,value为文件内容
 3  * @author tele
 4  *
 5  */
 6 public class InputformatMapper extends Mapper<NullWritable, BytesWritable, Text,BytesWritable/*Text*/> {
 7     Text k = new Text();      
 8 
 9     @Override
10     protected void map(NullWritable key, BytesWritable value,
11             Mapper<NullWritable, BytesWritable, Text, BytesWritable/*Text*/>.Context context)
12             throws IOException, InterruptedException {
13         FileSplit split = (FileSplit) context.getInputSplit();
14         Path path = split.getPath();
15         
16         k.set(path.toString());
17         
18     /*    String result = new String(value.getBytes(),0,value.getLength());
19         context.write(k,new Text(result));*/
20         
21         context.write(k, value);
22     }
23 }

4.Driver(由于输出的是字节,需要指定OutputFormat为SequenceFileOutputFormat)

 1 /**
 2  * 驱动
 3  * @author tele
 4  *
 5  */
 6 public class InputformatDriver {
 7     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
 8         //1.获得job实例
 9         Configuration conf = new Configuration();
10         Job job = Job.getInstance(conf);
11         
12         //2.关联class
13         job.setJarByClass(InputformatDriver.class);
14         job.setMapperClass(InputformatMapper.class);
15         
16         
17         //4.设置format
18         job.setInputFormatClass(MyInputFormat.class);
19         //使用SequenceFileOutputFormat作为输出格式
20         job.setOutputFormatClass(SequenceFileOutputFormat.class);
21         
22         //5.数据类型
23         job.setOutputKeyClass(Text.class);
24         job.setOutputValueClass(BytesWritable.class);
25         
26     //    job.setOutputValueClass(Text.class);
27 
28         //6.设置输入与输出路径
29         FileInputFormat.setInputPaths(job,new Path(args[0]));
30         FileOutputFormat.setOutputPath(job,new Path(args[1]));
31         
32         //7.提交
33         boolean result = job.waitForCompletion(true);
34         System.exit(result?0:1);
35     }
36 }

 

原文地址:https://www.cnblogs.com/tele-share/p/9688174.html