使用MapReduce处理Hbase数据

  今天终于把MR处理HBase数据的程序搞定了,自己走了好多的弯路,程序写完之后,在本机的伪分布式的hadoop上跑是没问题的,可是把程序上传到集群上就出错了,最后发现是zookeeper没配对,在编译的时候没有把conf添加到CLASSPATH,这才导致出错的。

  下面是MR测试的程序:

 1 import java.io.IOException;
 2 
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.hbase.HBaseConfiguration;
 5 import org.apache.hadoop.hbase.HColumnDescriptor;
 6 import org.apache.hadoop.hbase.HTableDescriptor;
 7 import org.apache.hadoop.hbase.client.HBaseAdmin;
 8 import org.apache.hadoop.hbase.client.Put;
 9 import org.apache.hadoop.hbase.client.Result;
10 import org.apache.hadoop.hbase.client.Scan;
11 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
13 import org.apache.hadoop.hbase.mapreduce.TableMapper;
14 import org.apache.hadoop.hbase.mapreduce.TableReducer;
15 import org.apache.hadoop.hbase.util.Bytes;
16 import org.apache.hadoop.io.IntWritable;
17 import org.apache.hadoop.io.Text;
18 import org.apache.hadoop.mapreduce.Job;
19 
20 public class Test {
21     private static final String sourceTable = "sourceTable";
22     private static final String targetTable = "targetTable";
23     static Configuration config = HBaseConfiguration.create();
24     
25     public static void createTable(String tablename, String[] cfs) throws IOException {
26         HBaseAdmin admin = new HBaseAdmin(config);
27         if (admin.tableExists(tablename)) {
28             System.out.println("table already exists");
29         }
30         else {
31             HTableDescriptor tableDesc = new HTableDescriptor(tablename);
32             for (int i = 0; i < cfs.length; i++) {
33                 tableDesc.addFamily(new HColumnDescriptor(cfs[i]));
34             }
35             admin.createTable(tableDesc);
36             System.out.println("create table successly");
37         }
38     }
39     /**
40      * @param args
41      * @throws IOException
42      * @throws ClassNotFoundException 
43      * @throws InterruptedException 
44      */
45     public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
46         // TODO Auto-generated method stub
47         String[] cfs = {"a"};
48         createTable(targetTable, cfs);
49         Job job = new Job(config, "test");
50         job.setJarByClass(Test.class);
51         Scan scan = new Scan();
52         scan.setCaching(1024);
53         scan.setCacheBlocks(false);
54         TableMapReduceUtil.initTableMapperJob(
55                 sourceTable,        
56                 scan,               
57                 Mapper1.class,    
58                 Text.class,         
59                 IntWritable.class,  
60                 job);    
61         TableMapReduceUtil.initTableReducerJob(
62                 targetTable,        
63                 Reducer1.class,    
64                 job);
65         boolean b = job.waitForCompletion(true);
66         if(!b){
67             throw new IOException("error");
68         }
69     }
70 
71     public static class Mapper1 extends
72             TableMapper<Text, IntWritable> {
73             private final IntWritable ONE = new IntWritable(1);
74             private Text text = new Text();
75             public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException{
76                 String id = new String(value.getValue(Bytes.toBytes("cf"), Bytes.toBytes("a")));
77                 text.set(id);
78                     context.write(id, ONE);
79             }
80     }
81     public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable>{
82         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException{
83             int i = 0;
84             for (IntWritable val : values){
85                 i += val.get();
86             }
87             Put put = new Put(Bytes.toBytes(key.toString()));
88             put.add(Bytes.toBytes("a"), Bytes.toBytes("c"), Bytes.toBytes(i));
89             context.write(null, put);
90         }
91     }
92 }

编写完成后需要打包,打包可以在本地打,也可以在服务器上打包,一定要设置CLASSPATH

export CLASSPATH=/data/hadoop/hadoop-1.0.4/hadoop-core-1.0.4.jar:/data/hadoop/hbase-0.94.2/hbase-0.94.2.jar:/data/hadoop/hbase-0.94.2/conf/

在终端运行这个命令或者直接将此命令写在家目录下的.bashrc中也可以,

然后创建  test_classes文件夹,

运行命令:

javac -d test_classes/ Test.java

运行完成后会在test_classes文件夹下生成3个.class文件

然后运行

jar -cvf test.jar -C test_classes .

即可生成test.jar 文件

最后运行:

bin/hadoop jar test.jar Test

运行MR程序即可

原文地址:https://www.cnblogs.com/hitandrew/p/2855631.html