HBase协处理器实战

一、协处理器介绍

定义:HBase可以让用户的部分逻辑在数据存放端及hbase服务端进行计算的机制(框架)。协处理器允许用户在hbase服务端上运行自己的代码。

分类:系统协处理器、表协处理器 

observer,相当于关系型数据库里面的触发器 

endpoint,类似于存储过程 

二、OBserver

观察者的设计意图是允许用户通过插入代码来重载协处理器框架的upcall方法,而具体的事件触发的callback方法由HBase的核心代码来执行。可以分为RegionObserver、RegionServerObserver、MasterObserver、WalObserver 

RegionObserver              提供客户端的数据操纵事件钩子: Get、 Put、 Delete、Scan等

RegionServerObserver     专门处理RegionServer上的一些事件

MasterObserver              提供DDL-类型的操作钩子。如创建、删除、修改数据表等

WalObserver                  提供WAL相关操作

 这些接口可以同时使用在同一个地方,按照不同优先级顺序执行.用户可以任意基于协处理器实现复杂的HBase功能层。HBase有很多种事件可以触发观察者方法,这些事件与方法从HBase0.92版本起,都会集成在HBase API中。不过这些API可能会由于各种原因有所

改动,不同版本的接口改动比较大。RegionObserver工作原理如下图所示。

三、Endpoint

终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使

用这些强大的插件接口,为HBase添加全新的特性。

endpoint服务端编写

1.创建endpoint.proto文件,生成java文件

  1. option java_package = "edu.endpoint";  
  2. option java_outer_classname = "Sum";  
  3. option java_generic_services = true;  
  4. option java_generate_equals_and_hash = true;  
  5. option optimize_for = SPEED;  
  6. message SumRequest {  
  7.     required string family = 1;  
  8.     required string column = 2;  
  9. }  
  10. message SumResponse {  
  11.     required int64 sum = 1 [default = 0];  
  12. }  
  13.   
  14. service SumService {  
  15.     rpc getSum(SumRequest)  
  16.         returns (SumResponse);  
  17. }  

2.将.proto文件生成Java代码。 

protoc endpoint.proto –java_out=./ 

将生成的.java文件拷贝到eclipse对应的java项目源代码包文件下。添加hbase lib下所有依赖库文件。 

3.编写服务端SumEndpoint.java类

  1. package edu.endpoint;  
  2.   
  3. import java.io.IOException;  
  4. import java.util.ArrayList;  
  5. import java.util.List;  
  6.   
  7. import org.apache.hadoop.hbase.Coprocessor;  
  8. import org.apache.hadoop.hbase.CoprocessorEnvironment;  
  9. import org.apache.hadoop.hbase.client.Scan;  
  10. import org.apache.hadoop.hbase.coprocessor.CoprocessorException;  
  11. import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;  
  12. import org.apache.hadoop.hbase.regionserver.InternalScanner;  
  13. import org.apache.hadoop.hbase.util.Bytes;  
  14.   
  15. import com.google.protobuf.RpcCallback;  
  16. import com.google.protobuf.RpcController;  
  17. import com.google.protobuf.Service;  
  18.   
  19. import edu.endpoint.Sum.SumRequest;  
  20. import edu.endpoint.Sum.SumResponse;  
  21. import edu.endpoint.Sum.SumService;  
  22.   
  23. public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService{  
  24.     private RegionCoprocessorEnvironment env;  
  25.   
  26.     public void getSum(RpcController controller,SumRequest request,RpcCallback<SumResponse> done) throws IOException{  
  27.         Scan scan = new Scan();  
  28.         scan.addFamily(Bytes.toBytes(request.getFamily()));  
  29.         scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));  
  30.         SumResponse response = null;  
  31.         InternalScanner scanner = null;  
  32.         try{  
  33.             scanner = env.getRegion().getScanner(scan);  
  34.             List<Cell> results = new ArrayList<Cell>();  
  35.             boolean hasMore = false;  
  36.             Long sum = 0;  
  37.             do {  
  38.                 hasMore = scanner.next(results);  
  39.                 for(Cell cell:results){  
  40.                     sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));  
  41.                 }  
  42.                 results.clear();  
  43.             } while (hasMore);  
  44.             response = SumResponse.newBuilder().setSum(sum).build();  
  45.         }catch (IOException e) {  
  46.             ResponseConverter.setControllerException(controller,e);  
  47.         }finally {  
  48.                 if (scanner!=null) {  
  49.                     try {  
  50.                         scanner.close();  
  51.                     } catch (IOException e) {  
  52.                         // TODO Auto-generated catch block  
  53.                         e.printStackTrace();  
  54.                     }     
  55.                 }  
  56.                   
  57.         }  
  58.         done.run(response);  
  59.     }  
  60.     public Service getService(){  
  61.         return this;  
  62.     }  
  63.     @Override  
  64.     public void start(CoprocessorEnvironment env) throws IOException {  
  65.         if (env instanceof RegionCoprocessorEnvironment) {  
  66.             this.env =(RegionCoprocessorEnvironment) env;  
  67.         }else {  
  68.             throw new CoprocessorException("no load region");  
  69.         }  
  70.     }  
  71.     @Override  
  72.     public void stop(CoprocessorEnvironment arg0) throws IOException {  
  73.     }  
  74. }  


原文地址:https://www.cnblogs.com/fangdai/p/6060569.html