hadoop的自定义数据类型和与关系型数据库交互

 最近有一个需求就是在建模的时候,有少部分数据是postgres的,只能读取postgres里面的数据到hadoop里面进行建模测试,而不能导出数据到hdfs上去。

 读取postgres里面的数据有两种方法:一种是用hadoop的DBInputFormat(hadoop 2.4.1的jar里面有两个DBInputFormat,分别位于org.apache.hadoop.mapreduce.lib.db包和org.apache.hadoop.mapred.lib.db包,前者是较新的API);另一种是用postgres驱动里的CopyManager类。

 先说一说用DBInputFormat这个方法吧。

 首先在数据库里面创建一个表,插入几条数据测试用

 由于表里面的数据要用来做为map的输入Value,所以要自定义数据类型。

 在hadoop中自定义数据类型需要实现Writable接口;如果是作为Key的自定义数据类型,则要实现WritableComparable接口,并实现其中的比较方法。实现WritableComparable接口的类在比较时需要先反序列化,比较麻烦,因此可以通过继承WritableComparator类来直接比较序列化后的字节流。

 在配置DBInputFormat的输入参数时,必须提供一个实现了DBWritable接口的数据类型,所以在这里为Value自定义的数据类型要同时实现DBWritable和Writable两个接口。

package com.qldhlbs.hadoop.demo0420;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;

public class PgDbWritable implements DBWritable, Writable{

    private Integer call_type_id;
    private String call_type;
    private String remark;
    
    public PgDbWritable() {
        
    }
    
    public PgDbWritable(Integer call_type_id, String call_type, String remark){
        
        set(call_type_id, call_type, remark);
    }
    
    public void set(Integer call_type_id, String call_type, String remark) {
        
        this.call_type_id = call_type_id;
        this.call_type = call_type;
        this.remark = remark;
    }
    
  //结果集读取 @Override
public void readFields(ResultSet set) throws SQLException { this.call_type_id = set.getInt(1); this.call_type = set.getString(2); this.remark = set.getString(3); }   
  
  //设置参数 @Override
public void write(PreparedStatement ps) throws SQLException { ps.setInt(1, this.call_type_id); ps.setString(2, this.call_type); ps.setString(3, this.remark); }
  //反序列化 @Override
public void readFields(DataInput in) throws IOException { this.call_type_id = in.readInt(); this.call_type = in.readUTF(); this.remark = in.readUTF(); }
  //序列化 @Override
public void write(DataOutput out) throws IOException { out.writeInt(this.call_type_id); out.writeUTF(this.call_type); out.writeUTF(this.remark); } public Integer getCall_type_id() { return call_type_id; } public String getCall_type() { return call_type; } public String getRemark() { return remark; } @Override public String toString() { return call_type_id + " " + call_type + " " + remark; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((call_type == null) ? 0 : call_type.hashCode()); result = prime * result + ((call_type_id == null) ? 0 : call_type_id.hashCode()); result = prime * result + ((remark == null) ? 0 : remark.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; PgDbWritable other = (PgDbWritable) obj; if (call_type == null) { if (other.call_type != null) return false; } else if (!call_type.equals(other.call_type)) return false; if (call_type_id == null) { if (other.call_type_id != null) return false; } else if (!call_type_id.equals(other.call_type_id)) return false; if (remark == null) { if (other.remark != null) return false; } else if (!remark.equals(other.remark)) return false; return true; } }

首先在PgDbWritable里面维护与数据库表对应的3个字段,并覆写关键的四个方法,每个方法的作用在代码注释里有介绍;另外还重写了toString、hashCode和equals方法。

自定义数据类型后就是读取数据库的数据了。

package com.qldhlbs.hadoop.demo0420;

import java.io.IOException;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



/**
 * Demo MapReduce job: reads every row of a PostgreSQL table with {@link DBInputFormat} and writes
 * the rows unchanged to an HDFS directory.
 */
public class MapreducePackageDbApp {

    /** Emits each (key, row) pair produced by the input format unchanged. */
    static class DbReadMapper extends Mapper<LongWritable, PgDbWritable, LongWritable, PgDbWritable> {

        @Override
        protected void map(LongWritable key, PgDbWritable value,
                Mapper<LongWritable, PgDbWritable, LongWritable, PgDbWritable>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    /** Writes every row grouped under a key back out with that key. */
    static class DbReadReduce extends Reducer<LongWritable, PgDbWritable, LongWritable, PgDbWritable> {

        @Override
        protected void reduce(LongWritable key, Iterable<PgDbWritable> values,
                Reducer<LongWritable, PgDbWritable, LongWritable, PgDbWritable>.Context context)
                throws IOException, InterruptedException {
            for (PgDbWritable value : values) {
                context.write(key, value);
            }
        }
    }

    /** Configures and runs the job; exits with status 0 on success and 1 on failure. */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException, SQLException {
        Configuration conf = new Configuration();

        // JDBC driver class, URL, user, password. Must be put into conf BEFORE
        // Job.getInstance(conf), because the Job works on its own copy of the configuration.
        DBConfiguration.configureDB(conf, "org.postgresql.Driver",
                "jdbc:postgresql://192.168.0.203/test", "hb", "xxx");
        Job job = Job.getInstance(conf);

        job.setJarByClass(MapreducePackageDbApp.class);
        job.setJobName(MapreducePackageDbApp.class.getSimpleName());

        // Put the PostgreSQL JDBC driver (uploaded to HDFS beforehand) on the task classpath.
        // Registered on the Job, not on conf: conf was copied by Job.getInstance above, so adding
        // the entry to conf at this point would never reach the submitted job.
        job.addFileToClassPath(new Path(
                "hdfs://192.168.0.201:49000/user/qldhlbs/lib/postgresql-9.3-1101.jdbc3.jar"));

        String[] fields = {"call_type_id", "call_type", "remark"};

        // Driver-side only: setConf() opens a JDBC connection with the settings above, so bad
        // connection settings fail fast here. This instance is never handed to the job; only the
        // class is, via setInputFormatClass below.
        DBInputFormat<PgDbWritable> in = new DBInputFormat<PgDbWritable>();
        in.setConf(conf);

        // Input: value class, table name, WHERE conditions (none), ORDER BY (none), column names.
        DBInputFormat.setInput(job, PgDbWritable.class, "dim_160_168_call_type", null, null, fields);

        job.setMapperClass(DbReadMapper.class);
        // Optional: without it Hadoop's default Reducer also passes the map output through.
        job.setReducerClass(DbReadReduce.class);

        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(PgDbWritable.class);

        job.setInputFormatClass(DBInputFormat.class);

        FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.0.201:49000/user/qldhlbs/db5"));

        boolean isSuccess = job.waitForCompletion(true);
        System.exit(isSuccess ? 0 : 1);
    }

}

这里只是一个demo,所以map函数直接输出读取到的内容就行了。即使不写reduce函数,hadoop默认的reducer也会直接输出map函数的结果,所以这里的reduce函数也可以不写。

在这里有几点需要注意:第一,这里导入的都是mapreduce包下的类,而不是mapred包下的,两者混用会报错;第二,要在hadoop的hdfs上上传一份postgres的驱动包。

先在hdfs上创建一个目录:hadoop fs -mkdir /user/qldhlbs/lib,然后把文件上传上去:hadoop fs -copyFromLocal postgresql-9.3-1101.jdbc3.jar /user/qldhlbs/lib。

在代码里面用DistributedCache.addFileToClassPath(new Path("hdfs://192.168.0.201:49000/user/qldhlbs/lib/postgresql-9.3-1101.jdbc3.jar"), conf)这个方法把jar加载到类路径上去。需要注意的是,Job.getInstance(conf)会复制一份conf,在它之后再修改conf不会影响到job,所以这一步要放在Job.getInstance(conf)之前,或者直接调用job.addFileToClassPath(path)。第三点是配置DBConfiguration信息,参数依次是Configuration、数据库驱动、数据库url、用户名、密码。在配置完DBConfiguration信息后,

// The same two lines as in main(): a DBInputFormat instance created in the driver.
DBInputFormat<PgDbWritable> in = new DBInputFormat<PgDbWritable>();

// setConf() wraps conf in a DBConfiguration and immediately opens a JDBC connection with it.
in.setConf(conf);

setConf()这个方法不能忘记,一开始就是没调用这个方法把conf给DBInputFormat,一直报空指针异常,后来经过调试查看得知是connection没得到,但是DBConfiguration得到了connection。再进一步调试是DBInputFormat没得到DBConfiguration对象,所以根本就获取不到connection。查看hadoop-mapreduce-client-core-2.4.1源码才解决问题。

// Decompiled excerpt of DBInputFormat.setConf (hadoop-mapreduce-client-core 2.4.1).
// Wraps conf in a DBConfiguration, opens the JDBC connection eagerly, records the upper-cased
// database product name, and caches the input table/field/condition settings read from conf.
public void setConf(Configuration conf)
  {
    this.dbConf = new DBConfiguration(conf);
    try
    {
      // Opens this.connection from dbConf on first use (see getConnection()).
      getConnection();

      DatabaseMetaData dbMeta = this.connection.getMetaData();
      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();
    }
    catch (Exception ex) {
      // Any connection or metadata failure is rethrown unchecked.
      throw new RuntimeException(ex);
    }

    // Input settings stored in conf (presumably by DBInputFormat.setInput -- confirm).
    this.tableName = this.dbConf.getInputTableName();
    this.fieldNames = this.dbConf.getInputFieldNames();
    this.conditions = this.dbConf.getInputConditions();
  }


// Decompiled excerpt of DBInputFormat.getConnection: opens the JDBC connection from dbConf on the
// first call and returns the same connection afterwards.
// If dbConf is still null (setConf not called yet), this.dbConf.getConnection() throws a
// NullPointerException, which the catch below rethrows wrapped in a RuntimeException.
public Connection getConnection() {
    try {
      if (null == this.connection)
      {
        this.connection = this.dbConf.getConnection();
        this.connection.setAutoCommit(false);
        // 8 == java.sql.Connection.TRANSACTION_SERIALIZABLE.
        this.connection.setTransactionIsolation(8);
      }
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
    return this.connection;
  }

这是反编译的部分源码,可以看到connection是可以从DBConfiguration对象拿的;第四点就是配置DBInputFormat的信息,参数是job, 输入DBWritable, 表名, 查询条件, order by条件, 表的字段字符串数组。

所有的做完了接下来就可以跑hadoop了。

这是在hdfs里面生成的文件,可以看到数据读取到hdfs上了。

如果不用mapreduce包,用mapred包也是可以的,代码差不多,这里就不贴了,只是不需要调用setConf()方法来绑定conf。

 

这是第一种方法,第二种方法就是直接用org.postgresql.copy.CopyManager这个类

/**
 * Runs {@code COPY <tableOrQuery> TO STDOUT} on the connection returned by getConnection() and
 * buffers the whole result in memory.
 *
 * @param tableOrQuery table name, or a query wrapped in parentheses as PostgreSQL's COPY syntax
 *                     requires; it is inserted into the statement verbatim, so it must not come
 *                     from untrusted input
 * @param delimiter    column delimiter, or {@code null} to use the server's default; single
 *                     quotes in it are escaped before it is embedded in the statement
 * @return the COPY output, or {@code null} if anything failed (the stack trace is printed)
 */
public ByteArrayOutputStream copyToStream(String tableOrQuery, String delimiter) {
    try {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        CopyManager copyManager = new CopyManager((BaseConnection) getConnection());
        copyManager.copyOut(buildCopyOutSql(tableOrQuery, delimiter), out);
        return out;
    } catch (Exception e) {
        // Best-effort contract kept for existing callers: report the failure and return null.
        e.printStackTrace();
        return null;
    }
}

/** Builds the {@code COPY ... TO STDOUT} statement, quoting the optional delimiter safely. */
private static String buildCopyOutSql(String tableOrQuery, String delimiter) {
    String copySql = "COPY " + tableOrQuery + " TO STDOUT";
    if (delimiter != null) {
        // Double any single quote so the delimiter stays one well-formed SQL string literal.
        copySql = copySql + " WITH DELIMITER AS '" + delimiter.replace("'", "''") + "'";
    }
    return copySql;
}


ByteArrayOutputStream out = copyToStream(sql.toString(), ",");
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());

/**
 * Copies everything from {@code in} into a newly created HDFS file at {@code hdfsPath}.
 *
 * <p>{@code in} is read to the end but not closed; closing it stays the caller's job. The
 * FileSystem comes from {@code FileSystem.get(conf)}, which hands out a shared cached instance,
 * so it is deliberately not closed here.
 *
 * @throws RuntimeException wrapping the underlying {@link java.io.IOException} if creating,
 *                          writing or closing the HDFS file fails
 */
public void uploadFile(String hdfsPath, InputStream in) {
    FSDataOutputStream out = null;
    try {
        FileSystem hdfs = FileSystem.get(conf);
        out = hdfs.create(new Path(hdfsPath));
        org.apache.hadoop.io.IOUtils.copyBytes(in, out, 4096, false);
        // Close inside the try so a failure to finish the file is reported rather than lost;
        // close() also flushes, which makes the former sync() call redundant.
        out.close();
        out = null;
    } catch (java.io.IOException e) {
        throw new RuntimeException("Failed to upload stream to HDFS path " + hdfsPath, e);
    } finally {
        // Non-null only when something failed before the close above; release it quietly.
        org.apache.hadoop.io.IOUtils.closeStream(out);
    }
}

把流读取出来,用hadoop自带的IOUtils.copyBytes()方法写到hdfs上就可以了。

 

原文地址:https://www.cnblogs.com/qldhlbs/p/5417199.html