HBase协处理器(1)

HBase协处理器的简单使用

一、hbas协处理器介绍


hbase在0.92版本之前是没有协处理器的,之所以引入协处理器是为了能够让用户能够可以扩展服务端的类库,并直接在服务端完成特定任务二不需要跟客户端之间有IO操作.

    hbase可以实现的功能

  • 代码可以运行在每个表服务器的每个表上
  • 提供高层调用接口给客户端使用
  • 提供一个非常灵活的模型来构建分布式服务
  • 为每一个应用提供自动化的扩展,负载均衡,请求路由的功能

    hbase协处理器的分类

  • EndPoint:

    用来实现类似关系型数据库中的存储过程的功能

  • Observers:

    用来实现类似关系型数据库中的触发器的功能

HBase提供了一些基本的抽象类来简单的实现接口中的方法,便于用户编写协处理器的时候不需要手动实现每一个方法

观察者(Observer)

  • RegionObserver:

    针对Region的观察者,可以监听关于Region的操作

  • RegionServerObserver:

    针对RegionServer的观察者,可以监听整个RegionServer的操作

  • MasterObserver:

    针对Master观察者,可以监听Master进行的DDL操作

  • WALObserver:

    针对WAL的观察者,可以监听WAL的所有读写操作

  • BulkLoadObserver:

    BulkLoad是采用MapReduce将大量数据快读导入HBase的一种方式。BulkLoadObserver可以监听BulkLoad行为

  • EndPointObserver:

    可以监听EndPoint的执行过程

常用接口:

  • BaseRegionObserver:

    实现了RegionObserver接口的所有需要实现的方法,并给出了最简单的实现

  • BaseMasterObserver:

    实现了MasterObserver接口的所有需要实现的方法,并给出了最简单的实现

终端程序(EndPoint)

  只实现了一个接口CoprocessorService,并且没有提供最基本的实现类。该接口只有一个方法需要实现:getServer,该方法主要返回ProtocolBuffers的实例,所以实现EndPoint之前,需要了解一下Protocol Buffers的相关知识作为基础

二、Observer协处理器的实现


上面讲了一堆概念,下面我们实现一个写出里器并加载到集群中进行测试一下

需求1:

  有一张表winshop_attention,现在需要实现在每向里面put一条数据的时候,就会更新modifyTime为当前最新时间。之前的做法是,通过两条put语句来实现,现在通过写出里器交给协处理器来进行修改时间的更新:

STEP 1

  新建一个maven工程,打包方式选择jar,然后添加项目的依赖。因为协处理器相关的接口都在hbase-server包里面,所有,我们只需要添加hbase-server的依赖即可,注意:最好不好同时添加hbase-server和hbase-client的依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.1.2</version>
</dependency>

STEP 2

  新建 AttentionObserver,直接继承 BaseRegionObserver类,这样相对于实现Coprocessor接口来说,可以避免实现很多不必要的方法

       

STEP 3

  在该类中我实现我们的逻辑,需要重写BaseRegionObserver当中的方法,方法很多,足以满足我们的业务逻辑需求,在再次我实现:数据put之后进行更新modifyTime

package com.winner.count;

import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.time.LocalDateTime;

/**
 * @AUTHOR Guozy
 * @DATE 2020/9/14-0:18
 **/
public class AttentionObserver extends BaseRegionObserver {

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        String currentTime = String.valueOf(LocalDateTime.now()).replace("T", " ");
        put.addColumn(Bytes.toBytes("i"), Bytes.toBytes("mt"), Bytes.toBytes(currentTime));
    }
}

  当然,这里面可以编写我们的业务逻辑,且 ObserverContext<RegionCoprocessorEnvironment> e 更是包含了更多的有用的信息。

STEP 4

  打包该项目为jar包,并将jar包上传至hdfs目录下,具体打包方式,可以直接使用maven自带的package方式:

  

    上传jar包到hdfs目录,我这里指定目录为:/winner/hadoop/hbase_config 下

STEP 5

  在HBase中针对winshop_attention这张表启用这个观察者。登录hbase shell依次执行以下命令:

hbase(main):215:0> disable 'winshop_attention'
hbase(main):216:0> alter 'winshop_attention',METHOD=>'table_att','coprocessor'=>'/winner/hadoop/hbase_config/hbaseComprocess.jar|com.winner.count.AttentionObserver||'
hbase(main):217:0> enable 'winshop_attention'

  协处理器加载格式命令行说明:

  alter 'winshop_attention',METHOD=>'table_att','coprocessor'=>'①|②|③|④'
  • table_att:固定词组,意思是调用setValue()方法给表设置属性
  • coprocessor: 代表协处理器的意思
  • ①:协处理器的jar包路径,要保证所有regionServer都可以读取到,可以在本地(每台机器的都要有),不过建议在hdfs上
  • ②:协处理器的完成类名
  • ③:协处理器的优先级,用整数表示,整数越小,优先级越高。可以为空
  • ④:协处理器运行需要的参数,可以为空,多个参数用逗号分隔,例如:arg1=1,arg2=2

注:①|②|③|④ 之间不要有空格

STEP 6

  验证协处理器是否加载成功:

  1.  将HBase表进行上线
  2.  执行put命令,查看modifytime是否会更新为最新时间

   可以看到,我们的协处理器是生效了。

 这里有一个坑:对于同一张表来说,如果想要卸载原有的协处理器,并加载新的协处理器的话,就必须更改jar名称,否则会包如下错误:

三、EndPoint协处理器的实现


四、总结


原文地址:https://www.cnblogs.com/Gxiaobai/p/13690707.html