Using DBOutputFormat in MapReduce

package com.mengyao.hadoop.mapreduce;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Uses DBOutputFormat to write the structured data under the HDFS directory
 * /mapreduces/dboutput into a database table. The input file looks like this
 * (the first column is the key):
 *             0    1    New Balance 999复古跑鞋    ML999GY        ML999GY        999.0
 *             1    2    New Balance 999复古跑鞋    ML999BB        ML999BB        999.0
 *             2    3    New Balance 996复古跑鞋    MRL996DG    MRL996DG    819.0
 *             3    4    New Balance 996复古跑鞋    MRL996EM    MRL996EM    819.0
 *             4    5    New Balance 996复古跑鞋    MRL996ES    MRL996ES    819.0
 * 
 * When a MapReduce job accesses a database, the JDBC driver jar must be on
 * Hadoop's classpath; otherwise the job fails with a "cannot load JDBC driver
 * class" exception. Two ways to provide it:
 *       1. Copy the jar into Hadoop's lib directory, e.g.
 *          /usr/local/installs/hadoop/share/hadoop/mapreduce/lib/mysql-connector-java-5.1.26-bin.jar,
 *          and restart the cluster so the new classpath takes effect.
 *       2. Package the JDBC driver jar inside this MapReduce job's jar.
 * If rows arrive in the database garbled, set the encoding in the JDBC
 * connection URL, e.g. jdbc:mysql://host:3306/dbName?useUnicode=true&characterEncoding=utf8
 * 
 * @author mengyao
 *
 */
public class DBOutputFormatApp extends Configured implements Tool {

    /**
     * This JavaBean must implement both Hadoop's serialization interface
     * (Writable) and the database serialization interface (DBWritable).
     * The official declaration is:
     *         public class DBOutputFormat<K extends DBWritable, V> extends OutputFormat<K,V>
     * In other words, DBOutputFormat requires its output key to be a type
     * that implements DBWritable.
     * @author mengyao
     *
     */
    static class ProductWritable implements Writable, DBWritable {
        
        private long id;       // bigint(20) NOT NULL AUTO_INCREMENT
        private String name;   // varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'product name'
        private String model;  // varchar(30) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'model'
        private String color;  // varchar(10) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'color'
        private double price;  // decimal(10,0) DEFAULT NULL COMMENT 'selling price'
        
        public void set(long id, String name, String model,
                String color, double price) {
            this.id = id;
            this.name = name;
            this.model = model;
            this.color = color;
            this.price = price;
        }

        @Override
        public void write(PreparedStatement ps) throws SQLException {
            ps.setLong(1, id);
            ps.setString(2, name);
            ps.setString(3, model);
            ps.setString(4, color);
            ps.setDouble(5, price);
        }

        @Override
        public void readFields(ResultSet rs) throws SQLException {
            this.id = rs.getLong(1);
            this.name = rs.getString(2);
            this.model = rs.getString(3);
            this.color = rs.getString(4);
            this.price = rs.getDouble(5);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(id);
            out.writeUTF(name);
            out.writeUTF(model);
            out.writeUTF(color);
            out.writeDouble(price);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.id = in.readLong();
            this.name = in.readUTF();
            this.model = in.readUTF();
            this.color = in.readUTF();
            this.price = in.readDouble();
        }
        
    }
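    
    /*
     * For reference, a MySQL table definition consistent with the column
     * comments above. This DDL is an assumption reconstructed from those
     * comments (the primary key and charset are guesses); verify it against
     * the real schema:
     *
     *   CREATE TABLE product (
     *     id    BIGINT(20) NOT NULL AUTO_INCREMENT,
     *     name  VARCHAR(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'product name',
     *     model VARCHAR(30) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'model',
     *     color VARCHAR(10) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT 'color',
     *     price DECIMAL(10,0) DEFAULT NULL COMMENT 'selling price',
     *     PRIMARY KEY (id)
     *   ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;
     */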
    
    static class DBOutputFormatMapper extends Mapper<LongWritable, Text, NullWritable, ProductWritable> {
        
        private NullWritable outputKey;
        private ProductWritable outputValue;
        
        @Override
        protected void setup(
                Mapper<LongWritable, Text, NullWritable, ProductWritable>.Context context)
                throws IOException, InterruptedException {
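            // Create the output key/value once and reuse them across map()
            // calls to avoid per-record object allocation.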
            this.outputKey = NullWritable.get();
            this.outputValue = new ProductWritable();
        }
        
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, NullWritable, ProductWritable>.Context context)
                throws IOException, InterruptedException {
            // Counter for well-formed records handed to the output; note that the
            // actual database insert happens later, in DBOutputFormat's RecordWriter
            final Counter successCounter = context.getCounter("exec", "successfully");
            // Counter for malformed records that are skipped
            final Counter failedCounter = context.getCounter("exec", "failed");
            // Split one line of the structured data stored on HDFS
            final String[] fields = value.toString().split("\t");
            // The exported data carries a long key as its first column, so skip
            // fields[0] and read the record starting at fields[1]
            if (fields.length > 5) {
                long id = Long.parseLong(fields[1]);
                String name = fields[2];
                String model = fields[3];
                String color = fields[4];
                double price = Double.parseDouble(fields[5]);
                this.outputValue.set(id, name, model, color, price);
                context.write(outputKey, outputValue);
                // Record emitted; count it as a success
                successCounter.increment(1L);
            } else {
                // Record malformed and skipped; count it as a failure
                failedCounter.increment(1L);
            }
        }
    }
    
    /**
     * DBOutputFormatReducer
     *         The input key/value types match DBOutputFormatMapper's output
     *         key/value types.
     *         The output key must be a type that implements DBWritable
     *         (DBOutputFormat requires this of its key).
     *         The output value does not matter to DBOutputFormat; NullWritable
     *         serves as a placeholder here.
     * @author mengyao
     *
     */
    static class DBOutputFormatReducer extends Reducer<NullWritable, ProductWritable, ProductWritable, NullWritable> {
        @Override
        protected void reduce(NullWritable key, Iterable<ProductWritable> value,
                Reducer<NullWritable, ProductWritable, ProductWritable, NullWritable>.Context context)
                throws IOException, InterruptedException {
            for (ProductWritable productWritable : value) {
                context.write(productWritable, key);                            
            }
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Configure the JDBC driver, URL, and credentials for this job right
        // after obtaining the Configuration; DBConfiguration stores them in the job conf
        DBConfiguration.configureDB(
                conf, 
                "com.mysql.jdbc.Driver", 
                "jdbc:mysql://192.168.1.104:3306/test?useUnicode=true&characterEncoding=utf8", 
                "root", 
                "123456");
        Job job = Job.getInstance(conf, DBOutputFormatApp.class.getSimpleName());
        job.setJarByClass(DBOutputFormatApp.class);
        
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.setInputPaths(job, args[0]);
        
        job.setMapperClass(DBOutputFormatMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(ProductWritable.class);
        
        job.setPartitionerClass(HashPartitioner.class);
        job.setNumReduceTasks(1);
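        // With a single reduce task, every record flows through one DBOutputFormat
        // RecordWriter: one JDBC connection, with the batch committed when the task closes.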
        
        job.setReducerClass(DBOutputFormatReducer.class);
        job.setOutputKeyClass(ProductWritable.class);
        job.setOutputValueClass(NullWritable.class);
        
        job.setOutputFormatClass(DBOutputFormat.class);
        // Configure the target table and column names for this job's database output
        DBOutputFormat.setOutput(job, "product", new String[]{"id", "name", "model", "color", "price"});
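        // From the call above, DBOutputFormat builds a parameterized statement,
        // approximately: INSERT INTO product (id,name,model,color,price) VALUES (?,?,?,?,?)
        // ProductWritable.write(PreparedStatement) then fills the five placeholders.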
        
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static int createJob(String[] args) {
        Configuration conf = new Configuration();
        conf.set("dfs.datanode.socket.write.timeout", "7200000");
        conf.set("mapreduce.input.fileinputformat.split.minsize", "268435456");
        conf.set("mapreduce.input.fileinputformat.split.maxsize", "536870912");
        int status = 0;
        
        try {
            status = ToolRunner.run(conf, new DBOutputFormatApp(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        return status;
    }
    
    public static void main(String[] args) {
        // Fall back to the demo input path when no argument is given; otherwise
        // the first argument is the HDFS input path. (Unconditionally overwriting
        // args here would make any usage check unreachable.)
        if (args.length == 0) {
            args = new String[]{"/mapreduces/dboutput"};
        }
        System.exit(createJob(args));
    }

}
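
How to run (a sketch: the jar name below is illustrative, and the second variant assumes the driver jar sits in the submitting directory):

    # JDBC driver already copied into Hadoop's mapreduce lib directory
    # (cluster restarted), as described in the class Javadoc
    hadoop jar dboutputformat-app.jar com.mengyao.hadoop.mapreduce.DBOutputFormatApp /mapreduces/dboutput

    # Alternative not mentioned above: ship the driver at submit time via the
    # generic -libjars option, which ToolRunner parses before run() is called
    hadoop jar dboutputformat-app.jar com.mengyao.hadoop.mapreduce.DBOutputFormatApp \
        -libjars mysql-connector-java-5.1.26-bin.jar /mapreduces/dboutput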
Original post: https://www.cnblogs.com/mengyao/p/4865595.html