HBase-MR

一、需求1:对一张表的rowkey进行计数

官方HBase-Mapreduce
需求1:对一张表的rowkey进行计数
    1)导入环境变量
    export HBASE_HOME=/root/hd/hbase-1.3.0
    export HADOOP_HOME=/root/hd/hadoop-2.8.4
    export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
    可以添加到:hbase-env.sh
    
    2)启动HBase-mr任务
    cd /root/hd/hbase-1.3.0
    /root/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.0.jar rowcounter emp

二、需求2:本地数据导入到HBase中

需求2:本地数据导入到HBase中
    思路:HBase底层存储是hdfs,先把数据导入到hdfs
    HBase对应创建一张表
    利用mr导入数据到表中
    
    1)在hdfs中创建文件夹 导入本地数据    
    hdfs dfs -mkdir /lovein
    hdfs dfs -put /root/love.tsv /lovein
    
    2)创建表
    create 'love','info'
    
    3)导入操作
    cd /root/hd/hbase-1.3.0
    /root/hd/hadoop-2.8.4/bin/yarn jar lib/hbase-server-1.3.0.jar importtsv \
    -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:description love hdfs://hd09-1:9000/lovein/

附:love.tsv

001    zhangsan    henshuai
002    Dilireba    beautiful
003    Yangmi    good
004    isme    perfect

三、需求3:将HBase中love表进行指定列的筛选然后导入到lovemr表

自定义HBase-mr    
需求3:将HBase中love表进行指定列的筛选然后导入到lovemr表
    1)构建Mapper类,读取love表中数据
    2)构建Reducer类,将love表中数据写入到lovemr表中
    3)构建driver驱动类
    4) 打包 放入集群中运行这个任务

    5)创建表
    create 'lovemr','info'
    
    6)导入操作    
    进入到HbaseTest-1.0-SNAPSHOT.jar包所在目录
    /root/hd/hadoop-2.8.4/bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.hbase.mr.LoveDriver

1、ReadLoveMapper类

package com.hbase.mr;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Mapper that reads rows from the source HBase table and keeps only the
 * {@code info:name} cell of each row.
 *
 * <p>For every input row a {@link Put} keyed by the same rowkey is built from
 * the matching cells and emitted with the rowkey as the map output key. Rows
 * that contain no {@code info:name} cell are skipped, because the HBase client
 * rejects a {@code Put} that carries no columns.
 */
public class ReadLoveMapper extends TableMapper<ImmutableBytesWritable, Put> {
    /** Column family whose cells are copied. */
    private static final String FAMILY = "info";
    /** Column qualifier whose cells are copied. */
    private static final String QUALIFIER = "name";

    /**
     * Filters one row down to its {@code info:name} cells.
     *
     * @param key     rowkey of the current row
     * @param value   all cells returned by the scan for this row
     * @param context MapReduce context used to emit the (rowkey, Put) pair
     * @throws IOException          if adding a cell to the Put or writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        // One Put per input row, addressed by the same rowkey.
        Put put = new Put(key.get());

        for (Cell c : value.rawCells()) {
            // Keep the cell only when both family and qualifier match.
            boolean familyMatches = FAMILY.equals(Bytes.toString(CellUtil.cloneFamily(c)));
            if (familyMatches && QUALIFIER.equals(Bytes.toString(CellUtil.cloneQualifier(c)))) {
                put.add(c);
            }
        }

        // A row without info:name would yield an empty Put; writing it makes the
        // table writer fail, so such rows are dropped here instead.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}

2、WriteLoveReducer类

package com.hbase.mr;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Pass-through reducer: forwards every {@link Put} received for a rowkey to
 * the output table configured by the driver, using a {@link NullWritable} key.
 */
public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    /**
     * Writes each incoming Put unchanged.
     *
     * @param key     rowkey shared by all Puts in {@code values}
     * @param values  Puts emitted by the mapper for this rowkey
     * @param context MapReduce context used to write to the output table
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        java.util.Iterator<Put> pending = values.iterator();
        while (pending.hasNext()) {
            Put mutation = pending.next();
            context.write(NullWritable.get(), mutation);
        }
    }
}

3、LoveDriver类

package com.hbase.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoveDriver implements Tool {

    private Configuration conf;

    //业务逻辑
    public int run(String[] strings) throws Exception {
        //1.创建任务
        Job job = Job.getInstance(conf);
        //2.指定运行的主类
        job.setJarByClass(LoveDriver.class);
        //3.配置job 采用scan方式扫描表
        Scan scan = new Scan();

        //4.设置mapper类
        TableMapReduceUtil.initTableMapperJob("love",
                scan,
                ReadLoveMapper.class,
                ImmutableBytesWritable.class,
                Put.class,
                job);

        //5.设置reducer类
        TableMapReduceUtil.initTableReducerJob("lovemr",
                WriteLoveReducer.class,
                job);

        //设置reducerTask个数
        job.setNumReduceTasks(1);

        boolean rs = job.waitForCompletion(true);
        return rs ? 0 : 1;
    }

    //设置配置
    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create(configuration);
    }

    //拿到配置
    public Configuration getConf() {
        return this.conf;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new LoveDriver(), args);
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

四、需求4:HDFS中的数据写入到HBase中

需求4:HDFS中的数据写入到HBase中
    思路:
    1)构建Mapper 来读取hdfs中的数据
    2)构建Reducer
    3)驱动类
    4)打包运行
    5)测试    
    
    6)在hdfs中创建文件夹 导入本地数据    
    hdfs dfs -mkdir /lovehbase
    hdfs dfs -put /root/love.tsv /lovehbase
    
    7)创建表
    create 'lovehdfs','info'

    8)写入操作    
    进入到HbaseTest-1.0-SNAPSHOT.jar包所在目录
    /root/hd/hadoop-2.8.4/bin/yarn jar HbaseTest-1.0-SNAPSHOT.jar com.hbase.mr2.LoveDriver

1、ReadLoveFromHDFSMapper类

package com.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper that turns tab-separated text lines into HBase {@link Put}s.
 *
 * <p>Each line is expected to hold at least three tab-separated fields:
 * rowkey, name and description. The name is stored in {@code info:name} and
 * the description in {@code info:desc}. Lines with fewer fields or an empty
 * rowkey are skipped and counted instead of failing the task.
 */
public class ReadLoveFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    /** Minimum number of fields a line must contain. */
    private static final int EXPECTED_FIELDS = 3;

    // Column coordinates are constant, so encode them once rather than per record.
    private static final byte[] FAMILY = Bytes.toBytes("info");
    private static final byte[] NAME = Bytes.toBytes("name");
    private static final byte[] DESC = Bytes.toBytes("desc");

    /**
     * Parses one input line and emits a (rowkey, Put) pair for it.
     *
     * @param key     offset key supplied by the input format (unused)
     * @param value   one line of the input file
     * @param context MapReduce context used to emit output and update counters
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Read the line and split it on tabs (explicit escape instead of an
        //    invisible literal tab character).
        String[] fields = value.toString().split("\t");

        // 2. Skip blank or malformed lines; indexing them would throw, and a
        //    zero-length rowkey is not a valid row for a Put.
        if (fields.length < EXPECTED_FIELDS || fields[0].isEmpty()) {
            context.getCounter("ReadLoveFromHDFSMapper", "MALFORMED_LINES").increment(1);
            return;
        }

        // 3. Build the Put for this row.
        byte[] rowkey = Bytes.toBytes(fields[0]);
        Put put = new Put(rowkey);
        put.addColumn(FAMILY, NAME, Bytes.toBytes(fields[1]));
        put.addColumn(FAMILY, DESC, Bytes.toBytes(fields[2]));

        // 4. Hand the Put to the reducer, keyed by rowkey.
        context.write(new ImmutableBytesWritable(rowkey), put);
    }
}

2、WriteLoveReducer类

package com.hbase.mr2;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

import java.io.IOException;

/**
 * Pass-through reducer: every {@link Put} produced by the mapper is written
 * as-is to the output table configured by the driver.
 */
public class WriteLoveReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    /**
     * Forwards all Puts of one rowkey to the output.
     *
     * @param key     rowkey the Puts were grouped under
     * @param values  Puts to write
     * @param context MapReduce context used to write to the output table
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for (java.util.Iterator<Put> it = values.iterator(); it.hasNext(); ) {
            context.write(NullWritable.get(), it.next());
        }
    }
}

3、LoveDriver类

package com.hbase.mr2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class LoveDriver implements Tool {
    private Configuration conf = null;

    public void setConf(Configuration configuration) {
        this.conf = HBaseConfiguration.create();
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] strings) throws Exception {
        //1.创建job
        Job job = Job.getInstance(conf);
        job.setJarByClass(LoveDriver.class);

        //2.配置mapper
        job.setMapperClass(ReadLoveFromHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //3.配置reducer
        TableMapReduceUtil.initTableReducerJob("lovehdfs",WriteLoveReducer.class,job);

        //4.配置输入inputformat
        FileInputFormat.addInputPath(job,new Path("/lovehbase/"));

        //5.输出
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) {
        try {
            int status = ToolRunner.run(new LoveDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
原文地址:https://www.cnblogs.com/areyouready/p/10093144.html