Hbase访问方式之Mapreduce

Hbase对Mapreduce API进行了扩展,方便Mapreduce任务读写HTable数据。

一个简单示例:

说明:从日志表中,统计每个IP访问网站目录的总数

[java] view plaincopy在CODE上查看代码片派生到我的代码片
 
  1. package man.ludq.hbase;  
  2.   
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.conf.Configuration;  
  6. import org.apache.hadoop.hbase.HBaseConfiguration;  
  7. import org.apache.hadoop.hbase.client.Put;  
  8. import org.apache.hadoop.hbase.client.Result;  
  9. import org.apache.hadoop.hbase.client.Scan;  
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable;  
  11. import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;  
  12. import org.apache.hadoop.hbase.mapreduce.TableMapper;  
  13. import org.apache.hadoop.hbase.mapreduce.TableReducer;  
  14. import org.apache.hadoop.hbase.util.Bytes;  
  15. import org.apache.hadoop.io.IntWritable;  
  16. import org.apache.hadoop.io.Text;  
  17. import org.apache.hadoop.mapreduce.Job;  
  18.   
  19. public class ExampleTotalMapReduce{  
  20.     public static void main(String[] args) {  
  21.         try{  
  22.             Configuration config = HBaseConfiguration.create();  
  23.             Job job = new Job(config,"ExampleSummary");  
  24.             job.setJarByClass(ExampleTotalMapReduce.class);     // class that contains mapper and reducer  
  25.   
  26.             Scan scan = new Scan();  
  27.             scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs  
  28.             scan.setCacheBlocks(false);  // don't set to true for MR jobs  
  29.             // set other scan attrs  
  30.             //scan.addColumn(family, qualifier);  
  31.             TableMapReduceUtil.initTableMapperJob(  
  32.                     "access-log",        // input table  
  33.                     scan,               // Scan instance to control CF and attribute selection  
  34.                     MyMapper.class,     // mapper class  
  35.                     Text.class,         // mapper output key  
  36.                     IntWritable.class,  // mapper output value  
  37.                     job);  
  38.             TableMapReduceUtil.initTableReducerJob(  
  39.                     "total-access",        // output table  
  40.                     MyTableReducer.class,    // reducer class  
  41.                     job);  
  42.             job.setNumReduceTasks(1);   // at least one, adjust as required  
  43.   
  44.             boolean b = job.waitForCompletion(true);  
  45.             if (!b) {  
  46.                 throw new IOException("error with job!");  
  47.             }   
  48.         } catch(Exception e){  
  49.             e.printStackTrace();  
  50.         }  
  51.     }  
  52.   
  53.     public static class MyMapper extends TableMapper<Text, IntWritable>  {  
  54.   
  55.         private final IntWritable ONE = new IntWritable(1);  
  56.         private Text text = new Text();  
  57.   
  58.         public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {  
  59.             String ip = Bytes.toString(row.get()).split("-")[0];  
  60.             String url = new String(value.getValue(Bytes.toBytes("info"), Bytes.toBytes("url")));  
  61.             text.set(ip+"&"+url);  
  62.             context.write(text, ONE);  
  63.         }  
  64.     }  
  65.   
  66.     public static class MyTableReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable>  {  
  67.         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  
  68.             int sum = 0;  
  69.             for (IntWritable val : values) {  
  70.                 sum += val.get();  
  71.             }  
  72.   
  73.             Put put = new Put(key.getBytes());  
  74.             put.add(Bytes.toBytes("info"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));  
  75.   
  76.             context.write(null, put);  
  77.         }  
  78.     }  
  79. }  

参考文档:

1、Mapreduce读取和写入Hbase(从A表读取数据,统计结果放入B表,非常详细,附有代码说明以及流程)
 
2、Mapreduce操作Hbase(官方文档,包括 读/读写/多表输出/输出到文件/输出到RDBMS/Job中访问其他的HBase Tables)
原文地址:https://www.cnblogs.com/bluecoder/p/3824265.html