HBase与MapReduce集成

感觉效率不是很高,是否能用sqoop来解决HBase与其他文件系统的数据导入导出。

通过HBase的相关JavaApi,我们可以实现伴随HBase操作的MapReduce过程,比如使用MapReduce将数据从本地文件导入HBase的表中,或我们从HBase的表中读取一些原始数据用于MapReduce做数据分析。


案例

将fruit文件的一部分数据,通过MapReduce导入fruit_mr表中

//fruit.tsv文件
1001 Apple Red
1002 Pear  Yellow
1003 Pineapple Yellow
构建ReadFruitMapper类,用于读取fruit表中的数据
package com.atguigu;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
public class ReadFruitMapper extends
TableMapper<ImmutableBytesWritable, Put> {
@Override
    protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
        //将 fruit 的 name 和 color 提取出来,相当于将每一行数据读取出来放入到 Put对象中。
        Put put = new Put(key.get());
        //遍历添加 column 行
        for(Cell cell: value.rawCells()){
        //添加/克隆列族:info
        if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
        //添加/克隆列: name
            if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                //将该列 cell 加入到 put 对象中
                put.add(cell);
                //添加/克隆列:color
            }else if("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                //向该列 cell 加入到 put 对象中
                put.add(cell);
            }
        }
    }
    //将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出
    context.write(key, put);
    }
}
构建WriteFruitMRReducer类,用于将读取到的数据写入fruit_mr表中
package com.atguigu.Hbase_mr;
import java.io.IOException;
import org.apache.hadoop.Hbase.client.Put;
import org.apache.hadoop.Hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.Hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
public class WriteFruitMRReducer extends
TableReducer<ImmutableBytesWritable, Put, NullWritable> {
@Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put>
    values, Context context)
    throws IOException, InterruptedException {

        //读出来的每一行数据写入到 fruit_mr 表中
        for(Put put: values){
        context.write(NullWritable.get(),     put);
        }
    }
}
构建Fruit2FruitMRRunner extends Congfigured implements Tool用于组装运行job
//组装 Job
public int run(String[] args) throws Exception {
//得到 Configuration
Configuration conf = this.getConf();
//创建 Job 任务
Job job = Job.getInstance(conf,
this.getClass().getSimpleName());
job.setJarByClass(Fruit2FruitMRRunner.class);
//配置 Job
Scan scan = new Scan();
scan.setCacheBlocks(false);
scan.setCaching(500);
//设置 Mapper,注意导入的是 mapreduce 包下的,不是 mapred 包下的,后者
是老版本
TableMapReduceUtil.initTableMapperJob(
"fruit", //数据源的表名
scan, //scan 扫描控制器
ReadFruitMapper.class,//设置 Mapper 类
ImmutableBytesWritable.class,//设置 Mapper 输出 key 类型
Put.class,//设置 Mapper 输出 value 值类型
job//设置给哪个 JOB
);
//设置 Reducer
TableMapReduceUtil.initTableReducerJob("fruit_mr",
WriteFruitMRReducer.class, job);
//设置 Reduce 数量,最少 1 个
job.setNumReduceTasks(1);
boolean isSuccess = job.waitForCompletion(true);
if(!isSuccess){
throw new IOException("Job running with error");
}
return isSuccess ? 0 : 1;
}
主函数中调用运行该job任务
public static void main( String[] args ) throws Exception{
Configuration conf = HbaseConfiguration.create();
int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
System.exit(status);
}
打包运行任务
$ /opt/module/hadoop-2.7.2/bin/yarn jar ~/softwares/jars/Hbase-
0.0.1-SNAPSHOT.jar
com.z.Hbase.mr1.Fruit2FruitMRRunner
原文地址:https://www.cnblogs.com/chenshaowei/p/12491817.html