使用MapReduce对HBase中表数据进行分析并存入MySQL中

项目源码:https://github.com/cw1322311203/hbasedemo/tree/master/hbase-mr-mysql

目标:对HBase中的student表数据的value值进行wordcount,并写入MySQL

前置准备:

  1. 在maven中加入MySQL jdbc驱动包

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.25</version>
    </dependency>
    
  2. 运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

  1. 每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。
  > CDH版本的放到/opt/cloudera/parcels/CDH-5.16.2-1.cdh5.16.2.p0.8/lib/hadoop/lib中
  2. 首先把包传到集群上:

    $ hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/
    

    在mr程序提交job前,添加语句:

    DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
    
  3. MySQL中的数据库和表要事先创建好

  4. 为了方便 MapReduce 直接访问关系型数据库(MySQL,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,通过DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

具体代码:

主类

package com.cw.bigdata.mr3;

import com.cw.bigdata.mr3.tool.HBase2MysqlTool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Command-line entry point that launches the HBase-to-MySQL job.
 */
public class HBase2MysqlApplication {
    /**
     * Hands a new {@link HBase2MysqlTool} and the raw command-line arguments to
     * {@link ToolRunner}.
     *
     * @param args command-line arguments, forwarded unchanged
     * @throws Exception anything thrown while running the tool
     */
    public static void main(String[] args) throws Exception {
        HBase2MysqlTool tool = new HBase2MysqlTool();
        // The status returned by the run is not inspected here.
        ToolRunner.run(tool, args);
    }
}

Tool类

package com.cw.bigdata.mr3.tool;

import com.cw.bigdata.mr3.bean.CacheData;
import com.cw.bigdata.mr3.format.MysqlOutputFormat;
import com.cw.bigdata.mr3.mapper.ScanHbaseMapper;
import com.cw.bigdata.mr3.reducer.Hbase2MysqlReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;

/**
 * Configures and submits the MapReduce job that scans the HBase table {@code student},
 * counts occurrences of each cell value and writes the totals through
 * {@link MysqlOutputFormat}.
 */
public class HBase2MysqlTool implements Tool {

    /** Configuration handed over through {@link #setConf(Configuration)}; null until set. */
    private Configuration conf;

    /**
     * Builds the job, submits it and blocks until it finishes.
     *
     * @param args command-line arguments (not used by this job)
     * @return {@code JobStatus.State.SUCCEEDED.getValue()} when the job succeeds,
     *         {@code JobStatus.State.FAILED.getValue()} otherwise
     * @throws Exception if the job cannot be configured or submitted
     */
    public int run(String[] args) throws Exception {

        // Build the job from the injected configuration so that options supplied by the
        // launcher are honoured; fall back to a fresh one when run() is called directly.
        Configuration jobConf = (conf != null) ? conf : new Configuration();
        Job job = Job.getInstance(jobConf);
        job.setJarByClass(HBase2MysqlTool.class);

        // mapper: full scan of "student", emitting (Text, CacheData) pairs
        TableMapReduceUtil.initTableMapperJob(
                "student",
                new Scan(),
                ScanHbaseMapper.class,
                Text.class,
                CacheData.class,
                job
        );

        // reducer: declare the final output types (the map output types are passed to
        // initTableMapperJob above)
        job.setReducerClass(Hbase2MysqlReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(CacheData.class);

        job.setOutputFormatClass(MysqlOutputFormat.class);

        // NOTE(review): the returned numbers are JobStatus state values, not the usual
        // 0 / non-zero exit-code convention; map them before passing to System.exit.
        return job.waitForCompletion(true) ? JobStatus.State.SUCCEEDED.getValue() : JobStatus.State.FAILED.getValue();
    }

    /**
     * Stores the configuration to be used when the job is created.
     *
     * @param configuration configuration to use; may be null
     */
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    /**
     * @return the configuration previously stored by {@link #setConf(Configuration)},
     *         or null if none was set
     */
    public Configuration getConf() {
        return conf;
    }
}

Mapper类

package com.cw.bigdata.mr3.mapper;

import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Mapper over HBase rows that emits one record per cell, keyed by the cell value.
 */
public class ScanHbaseMapper extends TableMapper<Text, CacheData> {
    /**
     * For every cell of the row, decodes the cell value to a string and writes
     * {@code (value, CacheData{name=value, count=1})} to the context.
     *
     * @param key     row key of the scanned row (not used)
     * @param result  the scanned row
     * @param context output collector
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result result, Context context) throws IOException, InterruptedException {
        Cell[] cells = result.rawCells();
        for (Cell current : cells) {
            byte[] rawValue = CellUtil.cloneValue(current);
            String word = Bytes.toString(rawValue);

            // Trace every emitted value on the task's stdout.
            System.out.println(word);

            CacheData single = new CacheData();
            single.setName(word);
            single.setCount(1);

            Text outKey = new Text(word);
            context.write(outKey, single);
        }
    }
}

Reducer类

package com.cw.bigdata.mr3.reducer;

import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that adds up the counts of all records sharing a key and emits one total per key.
 */
public class Hbase2MysqlReducer extends Reducer<Text, CacheData, Text, CacheData> {
    /**
     * Sums {@code getCount()} over all values of the key, prints {@code name:count} to
     * stderr and writes {@code (key, CacheData{name=key, count=sum})}.
     *
     * @param key     the grouped key
     * @param datas   all values emitted for the key
     * @param context output collector
     */
    @Override
    protected void reduce(Text key, Iterable<CacheData> datas, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (CacheData partial : datas) {
            total = total + partial.getCount();
        }

        String word = key.toString();

        CacheData aggregate = new CacheData();
        aggregate.setCount(total);
        aggregate.setName(word);

        // Trace the aggregated result on the task's stderr.
        System.err.println(word + ":" + total);

        context.write(key, aggregate);
    }
}

bean对象

package com.cw.bigdata.mr3.bean;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Writable value object carrying a name and an integer count.
 *
 * <p>Serialized form: the name via {@code writeUTF}, followed by the count via
 * {@code writeInt}. Ordering is by name only.
 */
public class CacheData implements WritableComparable<CacheData> {

    /** The name this record belongs to. */
    private String name;
    /** Number of occurrences carried by this record. */
    private int count;

    /** @return the name */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the count */
    public int getCount() {
        return this.count;
    }

    /** @param count the count to store */
    public void setCount(int count) {
        this.count = count;
    }

    /**
     * Compares two records by their names; the count does not take part in the ordering.
     *
     * @param other the record to compare against
     * @return the result of comparing this name with the other record's name
     */
    public int compareTo(CacheData other) {
        String otherName = other.name;
        return this.name.compareTo(otherName);
    }

    /**
     * Serializes the name followed by the count.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        out.writeUTF(this.name);
        out.writeInt(this.count);
    }

    /**
     * Restores the name and the count, in the order they are written by
     * {@link #write(DataOutput)}.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        this.name = in.readUTF();
        this.count = in.readInt();
    }
}

format类

package com.cw.bigdata.mr3.format;

import com.cw.bigdata.mr3.bean.CacheData;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * OutputFormat that stores every output record as one row of the MySQL table
 * {@code statresult} (columns {@code name}, {@code sumcnt}) over JDBC.
 */
public class MysqlOutputFormat extends OutputFormat<Text, CacheData> {

    /**
     * RecordWriter that keeps one JDBC connection and one prepared INSERT statement open
     * for its whole lifetime.
     *
     * <p>Declared static because it needs nothing from the enclosing instance.
     */
    static class MysqlRecordWriter extends RecordWriter<Text, CacheData> {

        private static final String MYSQL_DRIVER_CLASS = "com.mysql.jdbc.Driver";
        // NOTE(review): endpoint and credentials are hard-coded in source; consider moving
        // them to the job configuration.
        private static final String MYSQL_URL = "jdbc:mysql://192.168.139.101:3306/company?useUnicode=true&characterEncoding=UTF-8";
        private static final String MYSQL_USERNAME = "root";
        private static final String MYSQL_PASSWORD = "123456";

        private static final String INSERT_SQL = "insert into statresult (name,sumcnt) values(?,?)";

        private Connection connection;
        private PreparedStatement insertStatement;

        /**
         * Loads the JDBC driver, opens the connection and prepares the INSERT statement.
         *
         * @throws IOException if the driver class is missing, the connection cannot be
         *                     opened or the statement cannot be prepared
         */
        public MysqlRecordWriter() throws IOException {
            try {
                Class.forName(MYSQL_DRIVER_CLASS);
                connection = DriverManager.getConnection(MYSQL_URL, MYSQL_USERNAME, MYSQL_PASSWORD);
                // Prepared once here instead of once per record.
                insertStatement = connection.prepareStatement(INSERT_SQL);
            } catch (ClassNotFoundException e) {
                throw new IOException("JDBC driver not found on the classpath: " + MYSQL_DRIVER_CLASS, e);
            } catch (SQLException e) {
                // Do not leak a half-initialised connection.
                releaseResources();
                throw new IOException("Unable to prepare MySQL output on " + MYSQL_URL, e);
            }
        }

        /**
         * Inserts one row: the key as {@code name}, the value's count as {@code sumcnt}.
         *
         * @param key  record key, stored in column {@code name}
         * @param data record value; only its count is stored
         * @throws IOException if the insert fails, so that the failure is reported to the
         *                     caller instead of silently dropping the row
         */
        public void write(Text key, CacheData data) throws IOException, InterruptedException {
            try {
                insertStatement.setString(1, key.toString());
                insertStatement.setInt(2, data.getCount());
                insertStatement.executeUpdate();
            } catch (SQLException e) {
                throw new IOException("Failed to insert row for key '" + key + "' into statresult", e);
            }
        }

        /**
         * Closes the statement and the connection.
         *
         * @param taskAttemptContext task context (not used)
         */
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            releaseResources();
        }

        /**
         * Closes statement and connection if they were opened. Close failures are only
         * printed: they happen after the inserts have already been executed.
         */
        private void releaseResources() {
            if (insertStatement != null) {
                try {
                    insertStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * Creates a writer with its own MySQL connection.
     *
     * @param taskAttemptContext task context (not used)
     * @return a new {@link MysqlRecordWriter}
     * @throws IOException if the writer cannot connect to MySQL
     */
    public RecordWriter<Text, CacheData> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new MysqlRecordWriter();
    }

    /**
     * No output specification is validated for this format.
     *
     * @param jobContext job context (not used)
     */
    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {

    }


    /** Lazily created committer, cached per OutputFormat instance. */
    private FileOutputCommitter committer = null;

    /**
     * Returns a {@link FileOutputCommitter} built from the configured file output
     * directory, creating it on first use.
     *
     * @param context task context supplying the configuration
     * @return the cached committer
     */
    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException {
        if (committer == null) {
            // The path is null when no file output directory is configured.
            // NOTE(review): presumably FileOutputCommitter accepts a null path — confirm.
            Path output = getOutputPath(context);
            committer = new FileOutputCommitter(output, context);
        }
        return committer;
    }

    /**
     * Reads the file output directory from the job configuration.
     *
     * @param job job context supplying the configuration
     * @return the configured output path, or null when {@code FileOutputFormat.OUTDIR}
     *         is not set
     */
    public static Path getOutputPath(JobContext job) {
        String name = job.getConfiguration().get(FileOutputFormat.OUTDIR);
        return name == null ? null : new Path(name);
    }

}

原文地址:https://www.cnblogs.com/chenxiaoge/p/13335435.html