Java搭建MapReduce输出到DB具体步骤

1、构建新的作业

Configuration conf=getConf();
String driverClassName="com.mysql.cj.jdbc.Driver";
String dburl="jdbc:mysql://x:3306/mr_test";
DBConfiguration.configureDB(conf, driverClassName, dburl, "root", "root");
Job job=Job.getInstance(conf);
job.setJarByClass(MaxTempSaveDB.class);

2、设置输入输出

FileInputFormat.addInputPath(job, new Path(conf.get("inpath")));
DBOutputFormat.setOutput(job, "maxTmp", "year", "tmp");

3、设置自定义Mapper Reducer

job.setMapperClass(MaxTempSaveDBMapper.class);
job.setReducerClass(MaxTempSaveDBReducer.class);

4、设置Mapper Reducer输出类型

job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(YearTmp.class);
job.setOutputValueClass(NullWritable.class);

5、等待程序运行结束退出JVM

return job.waitForCompletion(true)?0:1;

注意事项:

1、使用DBOutputFormat时,Reducer阶段context.write只有key会被写入数据库(value会被忽略),所以需要自定义一个类作为key来承载要写入数据库的字段数据

2、作为key的自定义类需要同时实现Writable和DBWritable接口,分别重写各自的write和readFields方法(其中DBWritable的write方法负责向PreparedStatement设置各列参数)

package com.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/*统计每年的最高气温*/
/*
 * Computes the maximum temperature per year from space-separated text input
 * and writes one (year, tmp) row per year into MySQL table "maxTmp" via
 * DBOutputFormat.
 *
 * Input record format (assumed from the parsing below): field 0 = year,
 * field 3 = temperature, both integers.
 */
public class MaxTempSaveDB extends Configured implements Tool {
    /* Mapper: parses each line and emits (year, temperature). */
    public static class MaxTempSaveDBMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] info = value.toString().split(" ");
            // Robustness: a short or non-numeric record previously killed the
            // whole task with ArrayIndexOutOfBounds/NumberFormatException;
            // skip malformed lines instead (best-effort parsing).
            if (info.length < 4) {
                return;
            }
            try {
                context.write(new IntWritable(Integer.parseInt(info[0])),
                              new IntWritable(Integer.parseInt(info[3])));
            } catch (NumberFormatException ignored) {
                // Non-numeric year/temperature — drop the record.
            }
        }
    }

    /* Reducer: keeps the maximum temperature per year and emits it wrapped
     * in a YearTmp key — DBOutputFormat persists only the key, the value
     * (NullWritable) is ignored. */
    public static class MaxTempSaveDBReducer extends Reducer<IntWritable, IntWritable, YearTmp, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            // BUG FIX: the original started at 0, which reported a wrong
            // maximum for years whose temperatures are all negative.
            int max = Integer.MIN_VALUE;
            for (IntWritable v : values) {
                max = Math.max(max, v.get());
            }
            context.write(new YearTmp(key.get(), max), NullWritable.get());
        }
    }

    /*
     * Entry point. Runs the job through ToolRunner and exits the JVM with
     * the job's status code (0 = success, 1 = failure) so external
     * schedulers can detect failure — printing the code alone loses it.
     */
    public static void main(String[] args) throws Exception {
        int result = ToolRunner.run(new MaxTempSaveDB(), args);
        System.out.println(result);
        System.exit(result);
    }

    /**
     * Configures and runs the MapReduce job.
     *
     * @param strings command-line arguments (unused; the input path is read
     *                from configuration key "inpath", e.g. passed via -D)
     * @return 0 on success, 1 on failure
     * @throws Exception if job setup or execution fails
     */
    @Override
    public int run(String[] strings) throws Exception {
        // 1. Build the job; configure the DB connection BEFORE creating the
        //    Job so the settings are captured in its Configuration snapshot.
        Configuration conf = getConf();
        String driverClassName = "com.mysql.cj.jdbc.Driver";
        String dburl = "jdbc:mysql://x:3306/mr_test";
        // NOTE(review): credentials are hard-coded — move them into the
        // configuration / a credential provider before production use.
        DBConfiguration.configureDB(conf, driverClassName, dburl, "root", "root");
        Job job = Job.getInstance(conf);
        job.setJarByClass(MaxTempSaveDB.class);
        // 2. Input from HDFS text files; output to table maxTmp(year, tmp).
        //    setOutput also installs DBOutputFormat as the output format.
        FileInputFormat.addInputPath(job, new Path(conf.get("inpath")));
        DBOutputFormat.setOutput(job, "maxTmp", "year", "tmp");
        // 3. Wire up the custom mapper and reducer.
        job.setMapperClass(MaxTempSaveDBMapper.class);
        job.setReducerClass(MaxTempSaveDBReducer.class);
        // 4. Declare intermediate (map) and final (reduce) key/value types.
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(YearTmp.class);
        job.setOutputValueClass(NullWritable.class);
        // 5. Block until the job finishes; convert success into exit code.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
原文地址:https://www.cnblogs.com/qiangang/p/13697906.html