Simple MapReduce Development

First, the Maven dependency configuration:

    <properties>
        <hadoop.version>2.6.0</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.7</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.6</version>
        </dependency>
    </dependencies>

For packaging, refer to the build configuration from the earlier post, which bundles all the dependency jars into a single jar.

Run the job with:

    hadoop jar dst.jar cc.stdpain.XXX $INPUT_PATH/ $OUTPUT_PATH/

A MapReduce job consists of two phases: Map and Reduce.
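To make the two phases concrete, here is a minimal in-memory sketch in plain Java (no Hadoop classes) of the data flow the framework runs: each input line is mapped to key/value pairs, the pairs are grouped and sorted by key (the shuffle), and each key's group is reduced. Word counting and the class name LocalMapReduce are illustrative choices, not part of the original job.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// In-memory simulation of Map -> shuffle/sort -> Reduce; no Hadoop classes involved.
public class LocalMapReduce {

    // Map phase: one input line -> zero or more (word, 1) pairs
    static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new AbstractMap.SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    // Reduce phase: receives one key together with all of its values
    static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    static Map<String, Integer> run(List<String> lines) {
        // Shuffle/sort: group map output by key; TreeMap keeps keys in sorted order
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> kv : map(line)) {
                groups.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
            }
        }
        // Reduce each group independently
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> group : groups.entrySet()) {
            result.put(group.getKey(), reduce(group.getKey(), group.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a b a", "b c"))); // {a=2, b=2, c=1}
    }
}
```

In a real job the grouping step is done by the framework across machines; the Mapper and Reducer classes below only supply the two functions.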

Both phases are developed by subclassing Hadoop's Mapper and Reducer base classes. Map development:

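    public static class StdMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // With the default TextInputFormat, key is the byte offset of the line
            // and value is the content of the line
            String line = value.toString();
            // Put your own parsing logic here, e.g. derive the output key from the line
            context.write(new Text(key.toString()), new Text(line));
        }
    }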
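The parsing step in the Mapper is left open. As one possibility, assuming tab-separated input whose first column should become the output key, a hypothetical helper (LineParser.parse is not part of Hadoop or of the original code) could look like this:

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical parser: assumes tab-separated lines whose first column is the key.
public class LineParser {

    // Returns (key, whole line) for a well-formed line, or empty if the line should be skipped
    static Optional<Map.Entry<String, String>> parse(String line) {
        if (line == null || line.trim().isEmpty()) {
            return Optional.empty();            // blank line: emit nothing
        }
        int tab = line.indexOf('\t');
        if (tab <= 0) {
            return Optional.empty();            // no key column: treat as malformed
        }
        String key = line.substring(0, tab);
        Map.Entry<String, String> entry = new AbstractMap.SimpleEntry<>(key, line);
        return Optional.of(entry);
    }

    public static void main(String[] args) {
        parse("user42\tclick").ifPresent(kv -> System.out.println("key=" + kv.getKey())); // key=user42
        System.out.println(parse("no key column").isPresent());                          // false
    }
}
```

Inside map() one might call parse(value.toString()) and, when a result is present, write new Text(entry.getKey()) and new Text(entry.getValue()); malformed lines are then simply dropped instead of failing the task.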

Reduce development:

    public static class StdReducer extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // All values that share the same key arrive together; apply the Reduce logic here
            for (Text text : values) {
                // If no aggregation is needed and the key should not appear in the output,
                // emit NullWritable as the key
                context.write(NullWritable.get(), text);
            }
        }
    }
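Using NullWritable as the output key changes the text files the job writes. The sketch below models, in plain Java, how Hadoop's default TextOutputFormat lays out one record: key and value separated by a tab, with a NullWritable key or value (modelled here as null) left out. It is a simplified illustration of that behavior, not Hadoop code; the class name OutputLine is hypothetical.

```java
// Sketch of how TextOutputFormat turns one (key, value) pair into a line;
// null plays the role of NullWritable.
public class OutputLine {

    static String format(String key, String value) {
        if (key == null && value == null) {
            return null;                 // both absent: no line is written
        }
        StringBuilder sb = new StringBuilder();
        if (key != null) {
            sb.append(key);
        }
        if (key != null && value != null) {
            sb.append('\t');             // default key/value separator is a tab
        }
        if (value != null) {
            sb.append(value);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(format("k", "v"));  // k<TAB>v
        System.out.println(format(null, "v")); // v
    }
}
```

So with StdReducer each output line is exactly the value the Mapper emitted, with no leading key column.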

The main method:

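    // Inside main(String[] args) throws Exception
    Configuration conf = new Configuration();
    // GenericOptionsParser consumes Hadoop's generic options (e.g. -D key=value)
    // and returns the remaining arguments passed to main
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
        System.err.println("Usage: hadoop jar dst.jar cc.stdpain.XXX <input path> <output path>");
        System.exit(-1);
    }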
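The check above relies on GenericOptionsParser removing Hadoop's generic options (such as -D property=value, -conf, -files, -libjars), which go before the application's own arguments, and returning what is left. A simplified plain-Java sketch, assuming only leading -D options need handling (the real parser supports more), shows why the last two remaining arguments can be treated as the input and output paths. RemainingArgsSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of generic-option stripping; NOT Hadoop's GenericOptionsParser.
public class RemainingArgsSketch {

    // Consumes leading "-D key=value" pairs into conf and returns the remaining args
    static List<String> remainingArgs(String[] args, Map<String, String> conf) {
        int i = 0;
        while (i + 1 < args.length && args[i].equals("-D")) {
            String[] kv = args[i + 1].split("=", 2);
            conf.put(kv[0], kv.length > 1 ? kv[1] : "");
            i += 2;
        }
        // Everything from the first non-option argument onward is returned unchanged
        return new ArrayList<>(Arrays.asList(args).subList(i, args.length));
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        List<String> rest = remainingArgs(
                new String[] {"-D", "mapreduce.job.reduces=4", "/in", "/out"}, conf);
        // Same convention as the job: the last two remaining args are input and output
        String input = rest.get(rest.size() - 2);
        String output = rest.get(rest.size() - 1);
        System.out.println(conf + " " + input + " " + output);
    }
}
```

With this convention a command such as hadoop jar dst.jar cc.stdpain.XXX -D mapreduce.job.reduces=4 $INPUT_PATH/ $OUTPUT_PATH/ still leaves the two paths at the end of otherArgs.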

Configure the job:

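    // Job.getInstance replaces the deprecated new Job(conf, name) constructor
    Job job = Job.getInstance(conf, "XXXJOB");
    job.setJarByClass(XXXMain.class);              // class containing main()
    job.setMapperClass(XXXMain.StdMapper.class);   // Mapper class
    job.setReducerClass(XXXMain.StdReducer.class); // Reducer class

    // Map output types: must match Mapper<Object, Text, Text, Text>
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    // Final output types: must match Reducer<Text, Text, NullWritable, Text>
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);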

Set the input and output paths:

    String inputPath = otherArgs[otherArgs.length - 2];
    FileSystem fs = FileSystem.get(conf);
    Path path = new Path(inputPath);
    FileStatus[] fileStatuses = fs.listStatus(path);
    for (FileStatus fileStatus : fileStatuses) {
        String name = fileStatus.getPath().getName();
        // Skip marker and hidden files such as _SUCCESS (names starting with "_" or ".")
        if (name.startsWith("_") || name.startsWith(".")) {
            continue;
        }
        FileInputFormat.addInputPath(job, fileStatus.getPath());
    }
    // The output directory must not already exist, otherwise the job fails at submission
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
    // Output compression: change to true to compress the output files
    FileOutputFormat.setCompressOutput(job, false);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
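The loop over the input directory exists to keep marker files out of the job input. Here is a plain-Java sketch of that filtering step over a list of file names, generalized (an assumption beyond the original single-name check) to skip any name starting with "_" or ".", which also covers the _SUCCESS marker Hadoop writes into a successfully completed output directory. InputFilter is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Name-based input filtering: skip entries whose names start with "_" or ".".
public class InputFilter {

    static boolean isHidden(String name) {
        return name.startsWith("_") || name.startsWith(".");
    }

    static List<String> selectInputs(List<String> names) {
        List<String> selected = new ArrayList<>();
        for (String name : names) {
            if (!isHidden(name)) {
                selected.add(name);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> listing = Arrays.asList("part-r-00000", "part-r-00001", "_SUCCESS");
        System.out.println(selectInputs(listing)); // [part-r-00000, part-r-00001]
    }
}
```

A prefix rule is more robust than comparing against one exact file name, since it keeps working if other marker files appear in the directory.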
Original post: https://www.cnblogs.com/stdpain/p/11722184.html