074 hbase与mapreduce集成

一:运行给定的案例

1.获取jar包里的方法

  

2.运行hbase自带的mapreduce程序

  lib/hbase-server-0.98.6-hadoop2.jar 

 

3.具体运行

  注意命令:hbase mapredcp。该命令会输出运行HBase的MapReduce作业所需依赖jar包的classpath。

  HADOOP_CLASSPATH是Hadoop运行作业时额外加载的classpath,这里需要把HBase的依赖加入其中,例如:export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`

  

4.运行一个小方法

  $HADOOP_HOME/bin/yarn jar lib/hbase-server-0.98.6-hadoop2.jar rowcounter nstest1:tb1

  

 二:自定义hbase的数据拷贝

1.需求

  将nstest1:tb1表中info:name列的数据拷贝到nstest1:tb2表中。

2.新建tb2表

  

3.书写mapreduce程序

  输入:rowkey,result。

 1 package com.beifeng.bigdat;
 2 
 3 import java.io.IOException;
 4 
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.hbase.Cell;
 8 import org.apache.hadoop.hbase.CellUtil;
 9 import org.apache.hadoop.hbase.HBaseConfiguration;
10 import org.apache.hadoop.hbase.client.Put;
11 import org.apache.hadoop.hbase.client.Result;
12 import org.apache.hadoop.hbase.client.Scan;
13 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
15 import org.apache.hadoop.hbase.mapreduce.TableMapper;
16 import org.apache.hadoop.hbase.mapreduce.TableReducer;
17 import org.apache.hadoop.hbase.util.Bytes;
18 import org.apache.hadoop.mapreduce.Job;
19 import org.apache.hadoop.mapreduce.Mapper.Context;
20 import org.apache.hadoop.util.Tool;
21 import org.apache.hadoop.util.ToolRunner;
22 
23 public class HBaseMRTest extends Configured implements Tool{
24     /**
25      * map
26      * @author27      *
28      */
29     public static class tbMap extends TableMapper<ImmutableBytesWritable, Put>{
30 
31         @Override
32         protected void map(ImmutableBytesWritable key, Result value,Context context) throws IOException, InterruptedException {
33             Put put=new Put(key.get());
34             for(Cell cell:value.rawCells()){
35                 if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
36                     if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
37                         put.add(cell);
38                         context.write(key, put);
39                     }
40                 }
41             }
42         }
43         
44     }
45     /**
46      * reduce
47      * @author48      *
49      */
50     public static class tbReduce extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable>{
51 
52         @Override
53         protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,Context context)throws IOException, InterruptedException {
54             for(Put put:values){
55                 context.write(key, put);
56             }
57         }
58         
59     }
60     
61     public int run(String[] args) throws Exception {
62         Configuration conf=super.getConf();
63         Job job =Job.getInstance(conf, "hbasemr");
64         job.setJarByClass(HBaseMRTest.class);
65         Scan scan=new Scan();
66         TableMapReduceUtil.initTableMapperJob(
67                 "nstest1:tb1",
68                 scan,
69                 tbMap.class,
70                 ImmutableBytesWritable.class,
71                 Put.class,
72                 job);
73         TableMapReduceUtil.initTableReducerJob(
74                 "nstest1:tb2",
75                 tbReduce.class,
76                 job);
77         boolean issucess=job.waitForCompletion(true);
78         return issucess?0:1;
79     }
80     public static void main(String[] args) throws Exception{
81         Configuration conf=HBaseConfiguration.create();
82         int status=ToolRunner.run(conf, new HBaseMRTest(), args);
83         System.exit(status);
84     }
85     
86 }

4.打成jar包

5.运行语句

  运行前需要先用export设置好所需的环境变量(HBASE_HOME、HADOOP_HOME以及HADOOP_CLASSPATH)。

   $HADOOP_HOME/bin/yarn jar /etc/opt/datas/HBaseMR.jar com.beifeng.bigdat.HBaseMRTest

6.效果

  

  

  

原文地址:https://www.cnblogs.com/juncaoit/p/6149790.html