MapReduce的NLineInputFormat使用

  1 package com.mengyao.hadoop.mapreduce;
  2 
  3 import java.io.IOException;
  4 
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.LongWritable;
  9 import org.apache.hadoop.io.NullWritable;
 10 import org.apache.hadoop.io.Text;
 11 import org.apache.hadoop.mapreduce.Job;
 12 import org.apache.hadoop.mapreduce.Mapper;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
 16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 17 import org.apache.hadoop.util.Tool;
 18 import org.apache.hadoop.util.ToolRunner;
 19 
 20 /**
 21  * NLineInputFormat前面的N表示每个Mapper收到输入的行数,N的默认输入行数是1。mapreduce.input.lineinputformat.linespermap属性实现N值的设定。如果希望使Mapper收到固定行数的输入可以使用该类实现。
 22  * 与TextInputFormat相同,key是文件中行的字节偏移量,value是行本身。
 23  * 通常情况下,对少量的输入行执行map任务是比较低效的(任务初始化的额外开销导致)。
 24  *
 25  * 使用NLineInputFormat设置Mapper任务每次输入处理4行,此处应用场景为获取图书大纲,找出所有的一级索引,读取输入HDFS目录下的文件/mapreduces/bookOutline.txt,内容如下:
 26  *     第1章    PostgresQL服务器简介
 27  *         1.1    为什么在服务器中进行程序设计
 28  *         1.2    关于本书的代码示例
 29  *         1.3    超越简单函数
 30  *         1.4    使用触发器管理相关数据
 31  *         1.5    审核更改
 32  *         1.6    数据清洗
 33  *         1.7    定制排序方法
 34  *         1.8    程序设计最佳实践
 35  *         1.8.1    KISS——尽量简单(keep it simple stupid)
 36  *         1.8.2    DRY——不要写重复的代码(don't repeat yourself)
 37  *         1.8.3    YAGNI——你并不需要它(you ain'tgonnaneedit)
 38  *         1.8.4    SOA——服务导向架构(service-oriented architecture)
 39  *         1.8.5    类型的扩展
 40  *         1.9    关于缓存
 41  *         1.10    总结——为什么在服务器中进行程序设计
 42  *         1.10.1    性能
 43  *         1.10.2    易于维护
 44  *         1.10.3    保证安全的简单方法
 45  *         1.11    小结
 46  *     第2章    服务器程序设计环境
 47  *         2.1    购置成本
 48  *         2.2    开发者的可用性
 49  *         2.3    许可证书
 50  *         2.4    可预测性
 51  *         2.5    社区
 52  *         2.6    过程化语言
 53  *         2.6.1    平台兼容性
 54  *         2.6.2    应用程序设计
 55  *         2.6.3    更多基础
 56  *         2.7    小结
 57  *     第3章    第一个PL/pgsQL函数
 58  *         3.1    为什么是PL/pgSQL
 59  *         3.2    PL/pgSQL函数的结构
 60  *         ...
 61  *
 62  * 输出到HDFS目录下的文件/mapreduces/nlineinputformat/part-r-00000,内容如下:
 63  *        第1章    PostgresQL服务器简介
 64  *        第2章    服务器程序设计环境
 65  *        第3章    第一个PL/pgsQL函数
 66  *
 67  * @author mengyao
 68  *
 69  */
 70 public class NLineInputFormatApp extends Configured implements Tool {
 71 
 72     static class NLineInputFormatMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
 73         
 74         private Text outputValue;
 75         
 76         @Override
 77         protected void setup(Context context)
 78                 throws IOException, InterruptedException {
 79             this.outputValue = new Text();
 80         }
 81         
 82         @Override
 83         protected void map(LongWritable key, Text value, Context context)
 84                 throws IOException, InterruptedException {
 85             final String line = value.toString();
 86             //如果行第一个字是“第”则认为是一级索引
 87             if (line.startsWith("第")) {
 88                 outputValue.set(line);
 89                 context.write(key, this.outputValue);
 90             }
 91         }
 92     }
 93     
 94     static class NLineInputFormatReducer extends Reducer<LongWritable, Text, Text, NullWritable> {
 95         
 96         private Text outputKey;
 97         private NullWritable outputValue;
 98         
 99         @Override
100         protected void setup(Context context)
101                 throws IOException, InterruptedException {
102             this.outputKey = new Text();
103             this.outputValue = NullWritable.get();
104         }
105         
106         @Override
107         protected void reduce(LongWritable key, Iterable<Text> value, Context context)
108                 throws IOException, InterruptedException {
109             outputKey.set(value.iterator().next());
110             context.write(this.outputKey, outputValue);
111         }
112     }
113     
114     @Override
115     public int run(String[] args) throws Exception {
116         Job job = Job.getInstance(getConf(), NLineInputFormatApp.class.getSimpleName());
117         job.setJarByClass(NLineInputFormatApp.class);
118         
119         NLineInputFormat.setNumLinesPerSplit(job, Integer.parseInt(args[0]));
120         job.setInputFormatClass(NLineInputFormat.class);
121         FileInputFormat.addInputPath(job, new Path(args[1]));
122         FileOutputFormat.setOutputPath(job, new Path(args[2]));
123         
124         job.setMapperClass(NLineInputFormatMapper.class);
125         job.setMapOutputKeyClass(LongWritable.class);
126         job.setMapOutputValueClass(Text.class);
127         
128         job.setReducerClass(NLineInputFormatReducer.class);
129         job.setOutputKeyClass(Text.class);
130         job.setOutputValueClass(NullWritable.class);
131         
132         return job.waitForCompletion(true)?0:1;
133     }
134 
135     public static int createJob(String[] args) {
136         Configuration conf = new Configuration();
137         conf.set("dfs.datanode.socket.write.timeout", "7200000");
138         conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
139         conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
140         int status = 0;
141         
142         try {
143             status = ToolRunner.run(conf, new NLineInputFormatApp(), args);
144         } catch (Exception e) {
145             e.printStackTrace();
146         }
147         
148         return status;
149     }
150     
151     public static void main(String[] args) {
152         args = new String[]{"4", "/mapreduces/bookOutline.txt", "/mapreduces/nlineinputformat"};
153         if (args.length!=3) {
154             System.out.println("Usage: "+NLineInputFormatApp.class.getName()+" Input paramters <LINE_NUMBER> <INPUT_PATH> <OUTPUT_PATH>");
155         } else {
156             int status = createJob(args);
157             System.exit(status);
158         }
159     }
160 
161 }
原文地址:https://www.cnblogs.com/mengyao/p/4865579.html