hbase 聚合操作

hbase本身提供了 聚合方法可以服务端聚合操作

hbase中的CoprocessorProtocol机制. 

CoprocessorProtocol的原理比较简单,近似于一个mapreduce框架。由client将scan分解为面向多个region的请求,并行发送请求到多个region,然后client做一个reduce的操作,得到最后的结果。 


先看一个例子,使用hbase的AggregationClient可以做到简单的面向单个column的统计。 

Java代码  收藏代码
  1. @Test  
  2. public void testAggregationClient() throws Throwable {  
  3.   
  4.     LongColumnInterpreter columnInterpreter = new LongColumnInterpreter();  
  5.   
  6.     AggregationClient aggregationClient = new AggregationClient(  
  7.             CommonConfig.getConfiguration());  
  8.     Scan scan = new Scan();  
  9.   
  10.     scan.addColumn(ColumnFamilyName, QName1);  
  11.   
  12.     Long max = aggregationClient.max(TableNameBytes, columnInterpreter,  
  13.             scan);  
  14.     Assert.assertTrue(max.longValue() == 100);  
  15.   
  16.     Long min = aggregationClient.min(TableNameBytes, columnInterpreter,  
  17.             scan);  
  18.     Assert.assertTrue(min.longValue() == 20);  
  19.   
  20.     Long sum = aggregationClient.sum(TableNameBytes, columnInterpreter,  
  21.             scan);  
  22.     Assert.assertTrue(sum.longValue() == 120);  
  23.   
  24.     Long count = aggregationClient.rowCount(TableNameBytes,  
  25.             columnInterpreter, scan);  
  26.     Assert.assertTrue(count.longValue() == 4);  
  27.   
  28. }  



看下hbase的源码。AggregateImplementation 

Java代码  收藏代码
  1. @Override  
  2.   public <T, S> T getMax(ColumnInterpreter<T, S> ci, Scan scan)  
  3.       throws IOException {  
  4.     T temp;  
  5.     T max = null;  
  6.     InternalScanner scanner = ((RegionCoprocessorEnvironment) getEnvironment())  
  7.         .getRegion().getScanner(scan);  
  8.     List<KeyValue> results = new ArrayList<KeyValue>();  
  9.     byte[] colFamily = scan.getFamilies()[0];  
  10.     byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();  
  11.     // qualifier can be null.  
  12.     try {  
  13.       boolean hasMoreRows = false;  
  14.       do {  
  15.         hasMoreRows = scanner.next(results);  
  16.         for (KeyValue kv : results) {  
  17.           temp = ci.getValue(colFamily, qualifier, kv);  
  18.           max = (max == null || (temp != null && ci.compare(temp, max) > 0)) ? temp : max;  
  19.         }  
  20.         results.clear();  
  21.       } while (hasMoreRows);  
  22.     } finally {  
  23.       scanner.close();  
  24.     }  
  25.     log.info("Maximum from this region is "  
  26.         + ((RegionCoprocessorEnvironment) getEnvironment()).getRegion()  
  27.             .getRegionNameAsString() + ": " + max);  
  28.     return max;  
  29.   }  


这里由于 

Java代码  收藏代码
  1. byte[] colFamily = scan.getFamilies()[0];  
  2. byte[] qualifier = scan.getFamilyMap().get(colFamily).pollFirst();  


所以,hbase自带的Aggregate函数,只能面向单列进行统计。 

当我们想对多列进行Aggregate,并同时进行countRow时,有以下选择。 
1 scan出所有的row,程序自己进行Aggregate和count。 
2 使用AggregationClient,调用多次,得到所有的结果。由于多次调用,有一致性问题。 
3 自己扩展CoprocessorProtocol。 

这个是github的hbase集成插件

这个功能集成到simplehbase里面了。
https://github.com/zhang-xzhi/simplehbase

原文地址:https://www.cnblogs.com/yaohaitao/p/6789113.html