【HBase】协处理器是什么?又能干什么?怎么用?


简单了解

官方帮助文档

http://hbase.apache.org/book.html#cp

协处理器出现的原因

HBase作为列族数据库经常被人诟病的就是无法轻易建立“二级索引”难执行求和、计数、排序等操作。在0.92版本以前的HBase虽然在数据存储层集成了MapReduce,能够有效用于数据表的分布式计算,然而在很多情况下,做一些简单的相加或者聚合计算的时候,如果直接将计算过程放置在server端,能够减少通讯开销,从而获得很好的性能提升。所以HBase在0.92之后引入了协处理器(coprocessors),添加了一些新的特性:能够轻易建立二次索引、复杂过了长期以及访问控制等

协处理器的分类

Observer

对数据进行前置或者后置的拦截操作

Endpoint

主要实现数据的一些统计功能,例如 COUNT,SUM,GROUP BY 等等

Phoenix

其实就是使用了大量的协处理器来实现的


协处理器的使用

加载方式

协处理器的加载方式有两种,静态加载方式(Static Load)动态加载方式(Dynamic Load)。 静态加载的协处理器称之为 System Coprocessor,动态加载的协处理器称之为 Table Coprocessor

静态加载

通过修改hbase-site.xml,然后重启hbase实现,这是对所有表都有效

cd /export/servers/hbase-1.2.0-cdh5.14.0/conf
vim hbase-site.xml
<property>
	<name>hbase.coprocessor.user.region.classes</name>
	<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
动态加载

动态加载可以对指定表生效
首先在禁用指定表

disable 'mytable'

然后添加aggregation

alter 'mytable', METHOD => 'table_att','coprocessor'=>
'|org.apache.Hadoop.hbase.coprocessor.AggregateImplementation||'

重启指定表

enable 'mytable'
协处理器的卸载

禁用表

disable 'test'

卸载协处理器

alter 'test',METHOD => 'table_att_unset',NAME => 'coprocessor$1'

启用表

enable 'test'

协处理器Observer应用实战

需求

通过协处理器Observer实现hbase当中一张表插入数据,然后通过协处理器,将数据复制一份保存到另外一张表当中去,但是只取当第一张表当中的部分列数据保存到第二张表当中去

步骤
一、HBase当中创建第一张表proc1和第二张表proc2
create 'proc1','info'
create 'proc2','info'
二、开发HBase的协处理器
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

import java.io.IOException;

public class ObserverProcessor  extends BaseRegionObserver {

    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        //连接到HBase
        Configuration configuration = HBaseConfiguration.create();
        //设置连接配置
        configuration.set("hbase.zookeeper.quorum", "node01,node02,node03");
        //
        Connection connection = ConnectionFactory.createConnection(configuration);
        Cell nameCell = put.get("info".getBytes(), "name".getBytes()).get(0);
        Put put1 = new Put(put.getRow());
        put1.add(nameCell);
        Table reverseuser = connection.getTable(TableName.valueOf("proc2"));
        reverseuser.put(put1);
        reverseuser.close();
    }
}
三、将java打成Jar包,上传到HDFS

先将jar包上传到linux的/export/servers路径下,然后执行以下命令

mv original-day12_HBaseANDMapReduce-1.0-SNAPSHOT.jar  processor.jar
hdfs dfs -mkdir -p /processor
hdfs dfs -put processor.jar /processor
四、将jar包挂载到proc1表

hbase shell执行以下命令

describe 'proc1'

alter 'proc1',METHOD => 'table_att','Coprocessor'=>'hdfs://node01:8020/processor/processor.jar|cn.itcast.mr.demo7.ObserverProcessor|1001|'

describe 'proc1'

在这里插入图片描述

五、用JavaAPI想proc1表中添加数据
/**
 *
 */
@Test
public void testPut() throws Exception{
    //获取连接
    Configuration configuration = HBaseConfiguration.create();
    configuration.set("hbase.zookeeper.quorum", "node01,node02");
    Connection connection = ConnectionFactory.createConnection(configuration);
    Table user5 = connection.getTable(TableName.valueOf("proc1"));
    Put put1 = new Put(Bytes.toBytes("hello_world"));
    put1.addColumn(Bytes.toBytes("info"),"name".getBytes(),"helloworld".getBytes());
    put1.addColumn(Bytes.toBytes("info"),"gender".getBytes(),"abc".getBytes());
    put1.addColumn(Bytes.toBytes("info"),"nationality".getBytes(),"test".getBytes());
    user5.put(put1);
    byte[] row = put1.getRow();
    System.out.println(Bytes.toString(row));
    user5.close();
}
六、查看proc1和proc2表的数据

在这里插入图片描述

七、如果要卸载协处理器
disable 'proc1'
alter 'proc1',METHOD=>'table_att_unset',NAME=>'coprocessor$1'
enable 'proc1'
原文地址:https://www.cnblogs.com/zzzsw0412/p/12772422.html