HBase Integration with MapReduce

1. Read data from the myuser table and write it into another HBase table

package com.itheima;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HBase2HBaseMain extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
        int run = ToolRunner.run(configuration, new HBase2HBaseMain(), args);
        System.exit(run);

    }

    @Override
    public int run(String[] strings) throws Exception {
        Job job= Job.getInstance(super.getConf(),"Hbase2HBase");
        job.setJarByClass(HBase2HBaseMain.class);
        // 1. set the mapper over the source table myuser
        /**
         * initTableMapperJob(TableName table, Scan scan,
         *                    Class<? extends TableMapper> mapper,
         *                    Class<?> outputKeyClass, Class<?> outputValueClass, Job job)
         */
        TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), new Scan(), HBase2HBaseMapper.class,
                Text.class, Put.class, job);
        // 2. set the reducer that writes into the target table myuser2
        /**
         * initTableReducerJob(String table, Class<? extends TableReducer> reducer, Job job)
         */
        TableMapReduceUtil.initTableReducerJob("myuser2", HBase2HBaseReducer.class, job);

        boolean b = job.waitForCompletion(true);

        return b?0:1;

    }
}
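Because the mapper only keeps cells from column family f1, the Scan handed to initTableMapperJob can be narrowed on the server side so that less data is shipped to the map tasks. A minimal sketch of the driver change (table and family names come from the code in this post; the caching value is an assumed tuning choice):

Scan scan = new Scan();
scan.addFamily(Bytes.toBytes("f1"));   // only return family f1 to the mapper
scan.setCaching(500);                  // assumed scanner caching for a full-table MR scan
scan.setCacheBlocks(false);            // avoid polluting the block cache during the scan
TableMapReduceUtil.initTableMapperJob(TableName.valueOf("myuser"), scan,
        HBase2HBaseMapper.class, Text.class, Put.class, job);
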
package com.itheima;


import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;


import java.io.IOException;
import java.util.List;

/**
 * Text: output key (the rowkey as text)
 * Put: output value (the Put to write)
 */
public class HBase2HBaseMapper extends TableMapper<Text, Put> {
    /**
     * @param key the rowkey of the current row
     * @param value the Result holding all cells of that row
     * @param context MapReduce context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // build a Put keyed by the original rowkey
        Put put = new Put(key.get());
        List<Cell> cells = value.listCells();
        for (Cell cell : cells) {
            // keep only the f1:name column of the source table myuser
            if (Bytes.toString(CellUtil.cloneFamily(cell)).equals("f1")
                    && Bytes.toString(CellUtil.cloneQualifier(cell)).equals("name")) {
                put.add(cell);
            }
        }
        // skip rows that yielded an empty Put; TableOutputFormat rejects empty mutations
        if (!put.isEmpty()) {
            context.write(new Text(Bytes.toString(key.get())), put);
        }
    }
}
package com.itheima;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Text: input key (key2) from the mapper
 * Put: input value (value2) from the mapper
 * ImmutableBytesWritable: output key (key3)
 * the output value (value3) is already fixed to Mutation by TableReducer, which extends Reducer
 */
public class HBase2HBaseReducer extends TableReducer<Text, Put, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (Put put : values) {
            context.write(new ImmutableBytesWritable(key.toString().getBytes()),put);
        }
    }
}
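The job above assumes that the source table myuser already contains data and that the target table myuser2 exists with column family f1. A minimal sketch to create the target table, using the same 1.x-style client API as the rest of this post (the class name CreateMyuser2 is only for illustration; the quorum string matches the driver above):

package com.itheima;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class CreateMyuser2 {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        try (Connection connection = ConnectionFactory.createConnection(configuration);
             Admin admin = connection.getAdmin()) {
            TableName tableName = TableName.valueOf("myuser2");
            if (!admin.tableExists(tableName)) {
                // one column family f1, matching what the mapper and reducer above write
                HTableDescriptor descriptor = new HTableDescriptor(tableName);
                descriptor.addFamily(new HColumnDescriptor("f1"));
                admin.createTable(descriptor);
            }
        }
    }
}
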

2. Read files from HDFS and write them into an HBase table
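The files under /hbase/input are assumed to be tab-separated text lines of the form rowkey, name, age; the reducer below splits each line on a tab and uses exactly those three fields. A purely hypothetical sample line: 0001\tzhangsan\t20 (with real tab characters between the fields).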

package com.itheima2;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class HDFS2HBaseMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","hadoop01:2181,hadoop02:2181,hadoop03:2181");
        Job job = Job.getInstance(configuration, "HDFS2HBase");

        // 1. read the text input from HDFS
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://hadoop01/hbase/input"));

        // 2. set the mapper and its output types
        job.setMapperClass(HDFSHbaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);


        // 3. set the reducer that writes into the target table myuser2
        TableMapReduceUtil.initTableReducerJob("myuser2", HDFSHbaseReducer.class, job);

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}
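Note that initTableReducerJob configures TableOutputFormat against the myuser2 table for this job, which is why the driver does not set an output format class or an output path itself.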
package com.itheima2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Forwards each input line unchanged; the reducer parses the line and builds the Put
public class HDFSHbaseMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}
package com.itheima2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;

import java.io.IOException;

public class HDFSHbaseReducer extends TableReducer<Text, NullWritable, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // each key is a tab-separated line: rowkey \t name \t age
        String[] split = key.toString().split("\t");
        Put put = new Put(split[0].getBytes());
        put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
        put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
        context.write(new ImmutableBytesWritable(Bytes.toBytes(split[0])),put);
    }
}

3. Load data into HBase in bulk using bulkload
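With bulkload, the MapReduce job writes HFiles (HBase's on-disk storage format) directly to HDFS instead of issuing Puts through the normal write path, and a second step then moves those files into the regions of the target table. Because the WAL and memstore are bypassed, this puts far less load on the RegionServers and is the preferred way to import large amounts of data.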

package com.itheima3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import java.io.IOException;

public class BulkLoadMain {
    public static void main(String[] args) throws Exception {
        Configuration configuration= HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum","hadoop01:2181");
        Job job = Job.getInstance(configuration, "BulkLoadMain");

        // 1. read the text input from HDFS
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("hdfs://hadoop01:8020/hbase/input"));
        // 2. set the mapper and its output types
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputValueClass(Put.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        // 3. have the job write HFiles instead of going through the normal write path
        job.setOutputFormatClass(HFileOutputFormat2.class);

        // 4. configure incremental load against the target table and its region layout
        /**
         * configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator)
         */
        Connection connection = ConnectionFactory.createConnection(configuration);
        Table myuser2 = connection.getTable(TableName.valueOf("myuser2"));
        RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf("myuser2"));
        HFileOutputFormat2.configureIncrementalLoad(job, myuser2, regionLocator);
        HFileOutputFormat2.setOutputPath(job, new Path("hdfs://hadoop01:8020/hbase/output_hfile1"));

        boolean b = job.waitForCompletion(true);
        System.exit(b?0:1);
    }
}
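HFileOutputFormat2.configureIncrementalLoad does most of the wiring here: it looks at the map output value class (Put in this case) and plugs in the matching sort reducer, sets up a TotalOrderPartitioner from the region boundaries of myuser2, and sets the number of reduce tasks to the number of regions. That is why the driver never calls job.setReducerClass explicitly.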
package com.itheima3;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BulkLoadMapper extends Mapper<LongWritable, Text,ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // each line is tab-separated: rowkey \t name \t age
        String[] split = value.toString().split("\t");
        Put put = new Put(split[0].getBytes());
        put.addColumn("f1".getBytes(), "name".getBytes(), split[1].getBytes());
        put.addColumn("f1".getBytes(), "age".getBytes(), split[2].getBytes());
        context.write(new ImmutableBytesWritable(split[0].getBytes()),put);
    }
}
package com.itheima3;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class LoadData {
    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        configuration.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");

        Connection connection =  ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();
        Table table = connection.getTable(TableName.valueOf("myuser2"));
        // move the generated HFiles into the regions of myuser2
        LoadIncrementalHFiles load = new LoadIncrementalHFiles(configuration);
        load.doBulkLoad(new Path("hdfs://hadoop01:8020/hbase/output_hfile1"), admin, table,
                connection.getRegionLocator(TableName.valueOf("myuser2")));
        connection.close();
    }
}
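A quick way to spot-check the bulkload is to scan myuser2 afterwards. A minimal sketch that could be appended to the end of LoadData's main method, reusing the configuration already created there (the f1:name column comes from the mapper above; the printed format is arbitrary, and the extra imports needed are Scan, Result, ResultScanner and Bytes):

try (Connection conn = ConnectionFactory.createConnection(configuration);
     Table myuser2 = conn.getTable(TableName.valueOf("myuser2"));
     ResultScanner scanner = myuser2.getScanner(new Scan())) {
    for (Result result : scanner) {
        // print rowkey and the f1:name value of each loaded row
        System.out.println(Bytes.toString(result.getRow()) + " -> "
                + Bytes.toString(result.getValue(Bytes.toBytes("f1"), Bytes.toBytes("name"))));
    }
}
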
Original post: https://www.cnblogs.com/qidi/p/11679007.html