hbase使用MapReduce操作3(实现将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中)

Runner类

实现将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。

package com.yjsj.hbase_mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

class Fruit2FruitMRRunner extends Configured implements Tool {
    //组装 Job
    public int run(String[] args) throws Exception {
        //得到 Configuration

        Configuration conf = this.getConf();

        //创建 Job 任务
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Fruit2FruitMRRunner.class);

        //配置 Job

        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);
        //设置 Mapper,注意导入的是 mapreduce 包下的,不是 mapred 包下的,后者是老版本
        TableMapReduceUtil.initTableMapperJob(
                "fruit", //数据源的表名
                scan, //scan 扫描控制器
                ReadFruitMapper.class,//设置 Mapper 类
                ImmutableBytesWritable.class,//设置 Mapper 输出 key 类型
                Put.class,//设置 Mapper 输出 value 值类型
                job);//设置给哪个 JOB //设置 Reducer

        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
        //设置 Reduce 数量,最少 1 个

        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf ;
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "master,node1,node2");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.master", "master:60000");
        int status = ToolRunner.run(conf, (Tool) new Fruit2FruitMRRunner(), args);
        System.exit(status);
    }
}

Mapper类

package com.yjsj.hbase_mr;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

12 public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
13     @Override
14     protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
15         //将 fruit 的 name 和 color 提取出来,相当于将每一行数据读取出来放入到 Put 对象中。
16         Put put = new Put(key.get());
17         //遍历添加 column 行
18         for (Cell cell:value.rawCells()) {
19         //添加/克隆列族:info
20             if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
21                 //添加/克隆列:name
22                 if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
23                     //将该列 cell 加入到 put 对象中
24                     put.add(cell);
25                     //添加/克隆列:color
26                 }else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
27                     //向该列 cell 加入到 put 对象中
28                     put.add(cell);
29                 }
30             }
31         }
32         //将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出
33         context.write(key,put);
34     }
35 }

Reducer类

package com.yjsj.hbase_mr;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

/**
 * Reducer that forwards every {@link Put} it receives to the job output,
 * using {@link NullWritable} as the output key.
 */
public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    /**
     * Writes each incoming {@link Put} for the given row key to the context.
     *
     * @param key     row key shared by the grouped values
     * @param values  puts emitted by the mapper for this key
     * @param context reduce context receiving the output
     * @throws IOException          if a write fails
     * @throws InterruptedException if a write is interrupted
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // The output key carries no information; only the Put is written.
        final NullWritable noKey = NullWritable.get();
        for (Put rowPut : values) {
            context.write(noKey, rowPut);
        }
    }
}
原文地址:https://www.cnblogs.com/pursue339/p/10658117.html