Using KeyValueTextInputFormat in MapReduce

package com.mengyao.hadoop.mapreduce;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * KeyValueTextInputFormat splits each line at its first tab character: everything before
 * the tab becomes the key and everything after it becomes the value, so the Mapper's
 * input types (k1, v1) must be Text, Text. The separator can be changed through the
 * mapreduce.input.keyvaluelinerecordreader.key.value.separator property; it defaults to "\t".
 * The official API description reads:
 *         An {@link InputFormat} for plain text files. Files are broken into lines.
 *         Either line feed or carriage-return are used to signal end of line.
 *         Each line is divided into key and value parts by a separator byte. If no
 *         such a byte exists, the key will be the entire line and value will be empty.
 *         The separator byte can be specified in config file under the attribute name
 *         mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
 *         is the tab character ('\t').
 *         public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>
 *
 * The use case here is to extract all first-level entries from a book outline. Only the
 * chapter lines have text in front of the first tab, so they are the only records with a
 * non-empty key. The job reads the input file /mapreduces/bookOutline.txt from HDFS,
 * with content such as:
 *     Chapter 1    Introduction to the PostgreSQL server
 *         1.1    Why program in the server
 *         1.2    About this book's code examples
 *         1.3    Moving beyond simple functions
 *         1.4    Managing related data with triggers
 *         1.5    Auditing changes
 *         1.6    Data cleaning
 *         1.7    Custom sort orders
 *         1.8    Programming best practices
 *         1.8.1    KISS (keep it simple, stupid)
 *         1.8.2    DRY (don't repeat yourself)
 *         1.8.3    YAGNI (you ain't gonna need it)
 *         1.8.4    SOA (service-oriented architecture)
 *         1.8.5    Type extensibility
 *         1.9    On caching
 *         1.10    Wrap-up: why program in the server
 *         1.10.1    Performance
 *         1.10.2    Ease of maintenance
 *         1.10.3    Simple ways to ensure security
 *         1.11    Summary
 *     Chapter 2    The server programming environment
 *         2.1    Cost of acquisition
 *         2.2    Availability of developers
 *         2.3    Licensing
 *         2.4    Predictability
 *         2.5    Community
 *         2.6    Procedural languages
 *         2.6.1    Platform compatibility
 *         2.6.2    Application design
 *         2.6.3    More basics
 *         2.7    Summary
 *     Chapter 3    Your first PL/pgSQL function
 *         3.1    Why PL/pgSQL?
 *         3.2    The structure of a PL/pgSQL function
 *         ...
 *
 * The output is written to /mapreduces/keyvaluetextinputformat/part-r-00000 on HDFS:
 *        Chapter 1
 *        Chapter 2
 *        Chapter 3
 *
 * @author mengyao
 *
 */
public class KeyValueTextInputFormatApp extends Configured implements Tool {

    static class KeyValueTextInputFormatMapper extends Mapper<Text, Text, Text, NullWritable> {

        private NullWritable outputValue;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            this.outputValue = NullWritable.get();
        }

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // A non-empty key means there was text before the first tab,
            // which marks the line as a first-level outline entry.
            if (!key.toString().isEmpty()) {
                context.write(key, this.outputValue);
            }
        }
    }

    static class KeyValueTextInputFormatReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

        private NullWritable outputValue;

        @Override
        protected void setup(Context context)
                throws IOException, InterruptedException {
            this.outputValue = NullWritable.get();
        }

        @Override
        protected void reduce(Text key, Iterable<NullWritable> value, Context context)
                throws IOException, InterruptedException {
            // The shuffle has already grouped identical keys; emit each one once.
            context.write(key, outputValue);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), KeyValueTextInputFormatApp.class.getSimpleName());
        job.setJarByClass(KeyValueTextInputFormatApp.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
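        // A different key/value separator could be configured here if the outline
        // used something other than a tab; a sketch using the property quoted in
        // the Javadoc above (the ":" separator is just an example):
        // job.getConfiguration().set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ":");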
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(KeyValueTextInputFormatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setReducerClass(KeyValueTextInputFormatReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        // HDFS datanode socket write timeout in milliseconds (2 hours)
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        // Bound the input split size between 256 MB and 512 MB
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        // -1: reuse each task JVM for an unlimited number of tasks
        conf.set("mapreduce.job.jvm.numtasks", "-1");
        // Disable speculative execution for both map and reduce tasks
        conf.set("mapreduce.map.speculative", "false");
        conf.set("mapreduce.reduce.speculative", "false");
        // Allow up to 4 attempts per failed task
        conf.set("mapreduce.map.maxattempts", "4");
        conf.set("mapreduce.reduce.maxattempts", "4");
        // Do not skip any records around bad records
        conf.set("mapreduce.map.skip.maxrecords", "0");
        int status = 0;

        try {
            // ToolRunner also parses Hadoop's generic options, so -D key=value
            // arguments on the command line can override the settings above.
            status = ToolRunner.run(conf, new KeyValueTextInputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }

        return status;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: " + KeyValueTextInputFormatApp.class.getName() + " <INPUT_PATH> <OUTPUT_PATH>");
            // Fall back to the example paths from the Javadoc above
            args = new String[]{"/mapreduces/bookOutline.txt", "/mapreduces/keyvaluetextinputformat"};
        }
        int status = createJob(args);
        System.exit(status);
    }

}
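
The mapper's filter works only because of how KeyValueTextInputFormat cuts each line. The small standalone class below (our own illustration, not part of Hadoop) mimics the rule quoted from the API docs: split at the first separator byte, and if no separator exists, the whole line becomes the key with an empty value.

import java.util.Arrays;

/** Illustrative only: mimics KeyValueTextInputFormat's split-at-first-separator rule. */
public class SeparatorSplitDemo {

    /** Splits a line at the first occurrence of sep, as the quoted API docs describe. */
    static String[] split(String line, char sep) {
        int pos = line.indexOf(sep);
        if (pos < 0) {
            // No separator byte: the entire line is the key and the value is empty.
            return new String[] {line, ""};
        }
        return new String[] {line.substring(0, pos), line.substring(pos + 1)};
    }

    public static void main(String[] args) {
        // A chapter line: the text before the first tab becomes a non-empty key.
        System.out.println(Arrays.toString(split("Chapter 1\tIntroduction to the PostgreSQL server", '\t')));
        // A subsection line starting with a tab: the key is empty, so the mapper drops it.
        System.out.println(Arrays.toString(split("\t1.1\tWhy program in the server", '\t')));
    }
}

On these two sample lines, the first call yields the key "Chapter 1" while the second yields an empty key, which is exactly the condition the mapper uses to keep only first-level entries.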
Original post: https://www.cnblogs.com/mengyao/p/4865571.html