DBWritable的使用

首先导入 MySQL 连接驱动 jar 包(mysql-connector-java)。

如果项目使用 Maven,也可以改为在 pom.xml 文件中追加以下依赖:

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>5.1.38</version>

</dependency>

代码:

package com.neworigin.db;

import java.io.IOException;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

public class DBJob {

public static class TableUers implements DBWritable{//表的映射

int id;

String name;

int age;

 

   public int getId() {

   return id;

}

 

public void setId(int id) {

   this.id = id;

}

 

public String getName() {

   return name;

}

 

public void setName(String name) {

   this.name = name;

}

 

public int getAge() {

   return age;

}

 

public void setAge(int age) {

   this.age = age;

}

 

   public TableUers(int id, String name, int age) {

   this.id = id;

   this.name = name;

   this.age = age;

}

 

   public TableUers() {

   }

//将表的映射对象中的属性设置进(写入数据库)

   public void write(PreparedStatement statement) throws SQLException {

      statement.setInt(1, id);

      statement.setString(2, name);

      statement.setInt(3, age);

   }

//从结果集通过与字段匹配的赋值给表的映射对象(读取数据库)

   public void readFields(ResultSet resultSet) throws SQLException {//获取表中的属性

      this.id=resultSet.getInt("id");

      this.name=resultSet.getString("name");

      this.age=resultSet.getInt("age");

     

   }

 

   @Override

   public String toString() {

      return  id + "  " + name + " " + age;

   }

  

}

public static class MyMapper extends Mapper<Object ,TableUers,TableUers,NullWritable>{

 

   protected void map(Object key, TableUers value, Mapper<Object, TableUers, TableUers, NullWritable>.Context context)

          throws IOException, InterruptedException {

      String s = value.toString();

      System.out.println(s);

     

     

      context.write(value,NullWritable.get());

     

   }

  

}

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

   Path out=new Path("file:///F:/安装/java工程/MR/data/db/out");

   Job job = Job.getInstance();

   Configuration conf = job.getConfiguration();

/* FileSystem fs =FileSystem.get(conf);

   if(fs.exists(out))

   {

      fs.delete(out);

   }*/

  

   job.setInputFormatClass(DBInputFormat.class);

   DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://127.0.0.1/mydb","root","123456");

   DBInputFormat.setInput(job, TableUers.class, "select id,name,age from user", "select count(*) from user");

  

   job.setOutputFormatClass(DBOutputFormat.class);

   DBOutputFormat.setOutput(job, "aaa", "id","name","age");

  

  

   job.setMapperClass(MyMapper.class);

   job.setMapOutputKeyClass(TableUers.class);

   job.setMapOutputValueClass(NullWritable.class);

  

  

   job.setOutputKeyClass(TableUers.class);

   job.setOutputValueClass(NullWritable.class);

   job.setNumReduceTasks(0);

  

   //FileOutputFormat.setOutputPath(job, out);

  

   job.waitForCompletion(true);

}

}

原文地址:https://www.cnblogs.com/chengdonghui/p/7825003.html