Hbase速览

一、概述

理解为hadoop中的key-value存储,数据按列存储,基于HDFS和Zookeeper

1.应用

2.场景

适用场景:

  • 存储格式:半结构化数据,结构化数据存储,Key-Value存储
  • 数据版本:固定集合(多版本),定时删除(TTL)
  • 更新:列族结构经常调整
  • 写Pattern:高并发写入

不适用场景

  • 事务
  • 复杂查询Operator:Join,Union,Group By
  • 索引支持:不按照rowkey查询数据
  • 读Pattern:高并发随机读

NoSQL线性扩展的功能,数据多了之后,可以不断增加机器

3.基本操作

{NAME=>'cf1'} 列簇

row1 行号

cf1:col1 列簇下的列名

 

 表结构

二、原理、架构与基本组件

HBase是主从结构,主是HMaster,从是RegionServer

HMaster统揽全局

当Client有读写请求的时候,HMaster去Zookeeper中查询表存在哪些节点上,将这些读写路由到Region Server上。client真的读写数据的时候,是和具体数据所在的节点上的Region Server打交道的,Region Server是真正支持读写请求的。HMaster,某节点上数据多了,能否切分到其他节点上,进行这些数据切分与合并的工作。

Zookeeper存储表在哪些节点的元信息,如果有多个HMaster,zookeeper决定哪个节点是真正的老大,哪些节点是备份。

HDFS,最终读写落实到磁盘上,是HDFS文件

1.细化

2.RDBMS行存储和Hbase列存储比较

关系型数据库:每一行对应多列,相邻行是连续存储的

HBase:列存储,各列簇可能不在一起

3.数据模型

table中的所有行都按照row key的字典序排列。
table 在行的方向上分割为多个Hregion。

region按大小分割的,每个表一开始只有一个region,随着数据不断插入表,region不断增大,当增大到一个阀值的时候,Hregion就会等分会两个新的Hregion。当table中的行不断增多,就会有越来越多的Hregion。

HRegion虽然是分布式存储的最小单元,但并不是存储的最小单元。
事实上,HRegion由一个或者多个Store组成,每个store保存一个columns family。
每个Strore又由一个memStore和0至多个StoreFile组成。
如图:StoreFile以HFile格式保存在HDFS上。

HDFS查看HBase相关数据

[root@k8s-node2 bin]# hadoop dfs -ls /user/hbase/
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 12 items
drwxr-xr-x   - root supergroup          0 2020-04-29 08:54 /user/hbase/.hbck
drwxr-xr-x   - root supergroup          0 2020-05-19 05:40 /user/hbase/.tmp
drwxr-xr-x   - root supergroup          0 2020-05-20 03:56 /user/hbase/MasterProcWALs
drwxr-xr-x   - root supergroup          0 2020-05-18 20:56 /user/hbase/WALs
drwxr-xr-x   - root supergroup          0 2020-05-19 15:26 /user/hbase/archive
drwxr-xr-x   - root supergroup          0 2020-04-29 08:54 /user/hbase/corrupt
drwxr-xr-x   - root supergroup          0 2020-04-29 09:05 /user/hbase/data
-rw-r--r--   3 root supergroup         42 2020-04-29 08:54 /user/hbase/hbase.id
-rw-r--r--   3 root supergroup          7 2020-04-29 08:53 /user/hbase/hbase.version
drwxr-xr-x   - root supergroup          0 2020-04-29 08:54 /user/hbase/mobdir
drwxr-xr-x   - root supergroup          0 2020-05-20 03:57 /user/hbase/oldWALs
drwx--x--x   - root supergroup          0 2020-04-29 08:54 /user/hbase/staging

hbase.id文件记录了hbase的id

hbase.version记录了hbase的版本,是一个二进制文件

.tmp是临时目录,是一个空目录

MasterProcWALs目录下含有一个HMaster主节点状态日志文件

WALs日志目录

oldWALs hbase操作相关的旧日志存放目录

data目录是最重要的目录,存储hbase数据,下面含有两个命名空间default和hbase,其中default是默认命名空间,如果创建的表未指定命名空间,将存放在该命名空间下,habse是系统命名空间,他们分别对应default和hbase目录,如果不创建表default目录为空

HBase的数据最终是以HFile的形式存储在HDFS中的,HBase中HFile有着自己的格式。一次memstore的flush会产生一个HFile,一次Compact会导致多个HFile合并成一个。


如上图所示,一个HFile内容是由一个个block组成的,按照block类型可分为:

  1. datablock 存放的key-value数据,一般一个datablock大小默认为64KB,
  2. data index block,其中存放的是datablock的index,索引可以是多级索引,中间索引,叶子索引一般会分布在HFile文件当中。root 索引位置见图中 root data-block-index
  3. bloom filter block, 保存了bloom过滤器的值
  4. meta data block,meta data block有多个,且连续分布,见图中meta data block
  5. Data Index和Meta Index块记录了每个Data块和Meta块的起始点
  6. file-info block, 其中记录了关于文件的一些信息,比如:HFile中最大的key、平均Key长度、HFile创建时间戳、data block使用的编码方式等等
  7. trailer block,每个HFile文件都会有的,对于不同版本的HFile(有V1,V2,V3三个版本,V2和V3相差不大)来说trailer长度可能不一样,但是同一个版本的所有HFile trailer的长度都是一样长的,并且trailer的最后4B一定是版本信息

从上图可以看出在meta data block之前,datablock、bloom filter block,叶子/中间层data block索引是相间分布的,meta block之后就不会再有data block了


每个Data Block除了开头的Magic以外就是一个个KeyValue对拼接而成, Magic内容就是一些随机数字,目的是防止数据损坏
HFile里面的每个KeyValue对就是一个简单的byte数组。这个byte数组里面包含了很多项,并且有固定的结构。

KeyLength和ValueLength:两个固定的长度,分别代表Key和Value的长度
Key部分:
Row Length是固定长度的数值,表示RowKey的长度

Row就是RowKey

Column Family Length是固定长度的数值,表示Family的长度

接着就是Column Family,再接着是Qualifier

然后是两个固定长度的数值,表示Time Stamp和Key Type(Put/Delete)

Value部分:

没有这么复杂的结构,就是纯粹的二进制数据

查看Hfile文件

该文件存储在HDFS中,找到存储路径

[root@k8s-node2 bin]# hadoop dfs -ls /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 6 items
-rw-r--r--   3 root supergroup         42 2020-05-17 02:49 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/.regioninfo
drwxr-xr-x   - root supergroup          0 2020-05-18 22:00 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/.tmp
drwxr-xr-x   - root supergroup          0 2020-05-19 15:20 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/contactinfo
drwxr-xr-x   - root supergroup          0 2020-05-19 15:20 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/name
drwxr-xr-x   - root supergroup          0 2020-05-19 15:20 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/personalinfo
drwxr-xr-x   - root supergroup          0 2020-05-18 20:56 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/recovered.edits
[root@k8s-node2 bin]# hadoop dfs -ls /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/name

上面name等标黄的都是列族名,里面是HFile文件

hadoop dfs -ls /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/name
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 1 items
-rw-r--r--   3 root supergroup       7221 2020-05-19 15:20 /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/name/779e4a6e0170419093d300a2911a95b5

该文件是二进制的无法直接查看,hbase为我们提供了工具

[root@k8s-node2 bin]# ./hbase org.apache.hadoop.hbase.io.hfile.HFile -e -p -f /user/hbase/data/default/peoples/1c200c07ab5a9d6001a8ab09930a05dd/name/779e4a6e0170419093d300a2911a95b5
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/bigdata/hadoop-2.9.2/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/bigdata/hbase-2.2.4/lib/client-facing-thirdparty/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

K: 0/name:first/1589857087753/Put/vlen=6/seqid=24 V: Marcel
K: 0/name:last/1589857087753/Put/vlen=6/seqid=24 V: Haddad
K: 1/name:first/1589857087803/Put/vlen=8/seqid=25 V: Franklin
K: 1/name:last/1589857087803/Put/vlen=5/seqid=25 V: Holtz
K: 1/name:name1/1589855012535/Put/vlen=2/seqid=23 V: ww
...

K代表key,V代表value

1/name:name1/1589855012535/Put/vlen=2/seqid=23

keyrow/family:column/timestamp/datatype/valuelength/seqid

 Java API中提供了Cell接口,每个cell实质都是keyvalue对,可以看成keyrow+family+column+timestamp,KeyValue类实现了Cell接口

 

4.KV存储引擎 LSM Tree

5.存储引擎组件及作用

6.写入与读取

客户端写keyvalue的数据,首先请求到regionserver,先落盘写Write Ahead log,成功则写入Memstore,写入完成后,会定期将Memstore信息通过compaction合并到HFile,写可能落到region1也可能落到region2

client根据master路由到不同的region上,先在Memstore里读,若有,直接返回,若没有,找对应的HFile,先从最新的HFile读,一层层读到最老的

HLog(WAL log)
WAL 意为Write ahead log(http://en.wikipedia.org/wiki/Write-ahead_logging),类似mysql中的binlog,用来做灾难恢复只用,Hlog记录数据的所有变更。在分布式系统环境中,无法避免系统出错或者宕机,一旦HRegionServer以外退出,MemStore中的内存数据就会丢失,引入HLog就是防止这种情况
每个Region Server维护一个Hlog,而不是每个Region一个。这样不同region(来自不同table)的日志会混在一起,这样做的目的是不断追加单个文件相对于同时写多个文件而言,可以减少磁盘寻址次数,因此可以提高对table的写性能。带来的麻烦是,如果一台region server下线,为了恢复其上的region,需要将region server上的log进行拆分,然后分发到其它region server上进行恢复。
每次用户操作写入Memstore的同时,也会写一份数据到HLog文件,HLog文件定期会滚动出新,并删除旧的文件(已持久化到StoreFile中的数据)。当HRegionServer意外终止后,HMaster会通过Zookeeper感知,HMaster首先处理遗留的HLog文件,将不同region的log数据拆分,分别放到相应region目录下,然后再将失效的region重新分配,领取到这些region的HRegionServer在Load Region的过程中,会发现有历史HLog需要处理,因此会Replay HLog中的数据到MemStore中,然后flush到StoreFiles,完成数据恢复。

HBase WAL解析

7.写入顺序

  1. WAL:数据被发出之后第一时间被写入WAL。由于WAL是基于HDFS来实现的,所以也可以说现在cell就已经被持久化了,但是WAL只是一个暂存的日志,它是不区分Store的。这些数据是不能被直接读取和使用
  2. Memstore:数据随后会立即被放入Memstore中进行整理。Memstore会负责按照LSM树的结构来存放数据。这个过程就像我们在打牌的时候,抓牌之后在手上对牌进行整理的过程。
  3. HFile:最后当Memstore到达一定的大小阈值,或者达到了刷写时间间隔阀值的时候,HBaes会把这个Memstore的内容刷写到HDFS上一个新的HFile文件。至此数据真正地被持久化到硬盘上,就算宕机、断电数据也不会丢失了。

HBase minor and major compaction

写的越来越多的时候,会出现写放大,不断写,不断compaction,磁盘上产生了大量的HFile,MemStore产生了太多的小的文件,于是,读的时候,大量的HFile文件都要读一遍,造成大量小文件读的问题。

通过合并小文件解决

Major Compaction最激进的是把所有HFile都合并成一个。这种合并过程中要大量归并排序,进行大量IO,造成在线服务一段时间不能运行,放在夜里等时间做

Minor Copaction,将部分HFile合并成部分大的HFile

8.Region Size过大Region Split

9.容错性

进程组件Master/Region Server

zookeeper replication持久化元数据信息副本

HA:zookeeper leader election of Master

数据

WAL 持久化操作,保证memstore数据可以通过replay恢复

HDFS replication持久化数据多副本,保证丢失File可以被恢复

10.Replication

11.Crash Recovery

12.Leader Election

13.RowKey设计

HBase设计适合读写Pattern的key,数据负载均衡与高效顺序(Scan)读取时常矛盾

  • Sequential Keys
  • Saulted Keys
  • Promoted field Keys
  • Hash Keys

三、分布式集群部属

HBase和Hadoop版本兼容性

主要目录结构

 bin目录主要脚本

*.rb

工具脚本

运行方式:hbase-jruby脚本 参数

hbase-cleanup.sh 删除zk或hdfs内容
hbase 最终被调用脚本
hbase-config.sh 启动环境配置脚本,一般不直接调用
hbase-daemon.sh 组件启动脚本

conf目录主要配置文件

hbase-env.sh 环境变量配置
hbase-site.xml 运行参数配置
log4j.properties log配置

Shell

 HBase shell example

 命令

alter 修改列族(column family)模式
count 统计表中行的数量
create 创建表
describe 显示表相关的详细信息
delete 删除指定对象的值(可以为表,行,列对应的值,另外也可以指定时间戳的值)
deleteall 删除指定行的所有元素值
disable 使表无效
drop 删除表
enable 使表有效
exists 测试表是否存在
exit 退出hbase shell
get 获取行或单元(cell)的值
incr 增加指定表,行或列的值
list 列出hbase中存在的所有表
put 向指向的表单元添加值
tools 列出hbase所支持的工具
scan 通过对表的扫描来获取对用的值
status 返回hbase集群的状态信息
shutdown 关闭hbase集群(与exit不同)
truncate 重新创建指定表
version 返回hbase版本信息

表的管理
1)通过list可以列出所有已创建的表(除-ROOT表和.META表(被过滤掉了))

hbase(main)> list

2)创建表,其中t1是表名,f1、f2是t1的列族。hbase中的表至少有一个列族.它们之中,列族直接影响hbase数据存储的物理特性。

# 语法:create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}
# 例如:创建表t1,有两个family name:f1,f2,且版本数均为2

hbase(main)> create 't1',{NAME => 'f1', VERSIONS => 2},{NAME => 'f2', VERSIONS => 2}

3)删除表
分两步:首先disable,然后drop
例如:删除表t1
hbase(main)> disable 't1'

hbase(main)> drop 't1'

4)查看表的结构 
# 语法:describe(desc) <table> (可以看到这个表的所有默认参数)
# 例如:查看表t1的结构

hbase(main)> describe 't1' / desc  't1'

5)修改表结构
修改表结构必须先disable

# 语法:alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
# 例如:修改表test1的cf的TTL为180天
hbase(main)> disable 'test1'
hbase(main)> alter 'test1',{NAME=>'body',TTL=>'15552000'},{NAME=>'meta', TTL=>'15552000'}

hbase(main)> enable 'test1'

权限管理

1)分配权限

# 语法 : grant <user> <permissions> <table> <column family> <column qualifier> 参数后面用逗号分隔

# 权限用五个字母表示: "RWXCA".

# READ('R'), WRITE('W'), EXEC('X'), CREATE('C'), ADMIN('A')
# 例如,给用户‘test'分配对表t1有读写的权限,
hbase(main)> grant 'test','RW','t1'
2)查看权限


# 语法:user_permission <table>
# 例如,查看表t1的权限列表
hbase(main)> user_permission 't1'
3)收回权限


# 与分配权限类似,语法:revoke <user> <table> <column family> <column qualifier>
# 例如,收回test用户在表t1上的权限
hbase(main)> revoke 'test','t1'
表数据的增删改查
1)添加数据
# 语法:put <table>,<rowkey>,<family:column>,<value>,<timestamp>
# 例如:给表t1的添加一行记录:rowkey是rowkey001,family name:f1,column name:col1,value:value01,timestamp:系统默认

hbase(main)> put 't1','rowkey001','f1:col1','value01'

用法比较单一。
2)查询数据
a)查询某行记录

# 语法:get <table>,<rowkey>,[<family:column>,....]
# 例如:查询表t1,rowkey001中的f1下的col1的值
hbase(main)> get 't1','rowkey001', 'f1:col1'
# 或者:
hbase(main)> get 't1','rowkey001', {COLUMN=>'f1:col1'}
# 查询表t1,rowke002中的f1下的所有列值

hbase(main)> get 't1','rowkey001'

b)扫描表


# 语法:scan <table>, {COLUMNS => [ <family:column>,.... ], LIMIT => num}
# 另外,还可以添加STARTROW、TIMERANGE和FITLER等高级功能
# 例如:扫描表t1的前5条数据
hbase(main)> scan 't1',{LIMIT=>5}

c)查询表中的数据行数
# 语法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}
# INTERVAL设置多少行显示一次及对应的rowkey,默认1000;CACHE每次去取的缓存区大小,默认是10,调整该参数可提高查询速度
# 例如,查询表t1中的行数,每100条显示一次,缓存区为500
hbase(main)> count 't1', {INTERVAL => 100, CACHE => 500}
3)删除数据
a )删除行中的某个列值

# 语法:delete <table>, <rowkey>,  <family:column> , <timestamp>,必须指定列名
# 例如:删除表t1,rowkey001中的f1:col1的数据
hbase(main)> delete 't1','rowkey001','f1:col1'
注:将删除改行f1:col1列所有版本的数据
b )删除行


# 语法:deleteall <table>, <rowkey>,  <family:column> , <timestamp>,可以不指定列名,删除整行数据
# 例如:删除表t1,rowk001的数据
hbase(main)> deleteall 't1','rowkey001'
c)删除表中的所有数据

# 语法: truncate <table>
# 其具体过程是:disable table -> drop table -> create table
# 例如:删除表t1的所有数据
hbase(main)> truncate 't1'
Region管理
1)移动region
# 语法:move 'encodeRegionName', 'ServerName'
# encodeRegionName指的regioName后面的编码,ServerName指的是master-status的Region Servers列表
# 示例
hbase(main)>move '4343995a58be8e5bbc739af1e91cd72d', 'db-41.xxx.xxx.org,60020,1390274516739'
2)开启/关闭region


# 语法:balance_switch true|false
hbase(main)> balance_switch
3)手动split


# 语法:split 'regionName', 'splitKey'
4)手动触发major compaction


#语法:
#Compact all regions in a table:
#hbase> major_compact 't1'
#Compact an entire region:
#hbase> major_compact 'r1'
#Compact a single column family within a region:
#hbase> major_compact 'r1', 'c1'
#Compact a single column family within a table:
#hbase> major_compact 't1', 'c1'
配置管理及节点重启
1)修改hdfs配置
hdfs配置位置:/etc/hadoop/conf
# 同步hdfs配置
cat /home/hadoop/slaves|xargs -i -t scp /etc/hadoop/conf/hdfs-site.xml hadoop@{}:/etc/hadoop/conf/hdfs-site.xml
#关闭:
cat /home/hadoop/slaves|xargs -i -t ssh hadoop@{} "sudo /home/hadoop/cdh4/hadoop-2.0.0-cdh4.2.1/sbin/hadoop-daemon.sh --config /etc/hadoop/conf stop datanode"
#启动:
cat /home/hadoop/slaves|xargs -i -t ssh hadoop@{} "sudo /home/hadoop/cdh4/hadoop-2.0.0-cdh4.2.1/sbin/hadoop-daemon.sh --config /etc/hadoop/conf start datanode"
2)修改hbase配置
hbase配置位置:


# 同步hbase配置
cat /home/hadoop/hbase/conf/regionservers|xargs -i -t scp /home/hadoop/hbase/conf/hbase-site.xml hadoop@{}:/home/hadoop/hbase/conf/hbase-site.xml
 
# graceful重启
cd ~/hbase
bin/graceful_stop.sh --restart --reload --debug inspurXXX.xxx.xxx.org

分布式HBase部属

三个节点:node01、node02、node03,都要进行如下步骤:

将HBase下载到/bigdata文件夹下

解压 tar -zxvf hbase-2.1.5-bin.tar.gz

进入HBase的conf文件夹

cd hbase-2.1.5/conf

配置hbase-env.sh

设置jdk路径:export JAVA_HOME=/usr/local/jdk

启用外部zookeeper:export HBASE_MANAGES_ZK=false

配置hbase-site.xml

<configuration>
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/usr/local/zookeeper/data</value>
    </property>
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://node02:9000/user/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>node01:2181,node02:2181,node03:2181</value>
    </property>
</configuration>

配置regionservers:node01

                                node03

创建backup-masters: node01

进入lib下,拷贝client-facing-thirdparty下的jar包到lib目录:

cp client-facing-thirdparty/htrace-core-3.1.0-incubating.jar

node02为master,在node02中启动hbase

查看日志出现了下面错误

 搜索说是hdfs的node02是standby的,不可以。将node02设置为active即可。

四、Java API

DDL操作

配置

public class HBaseConfigUtil {
    public static Configuration getHBaseConfiguration() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.addResource(new Path("/bigdata/hbase-2.1.5/conf/hbase-site.xml"));
        return configuration;
    }
}

创建表格

public class CreateTable {
    public static void main(String[] args) {
        Configuration conf = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();

            String tableName = "peoples";

            if(!admin.isTableAvailable(TableName.valueOf(tableName))) {
                HTableDescriptor hbaseTable = new HTableDescriptor(TableName.valueOf(tableName));
                hbaseTable.addFamily(new HColumnDescriptor("name"));
                hbaseTable.addFamily(new HColumnDescriptor("contactinfo"));
                hbaseTable.addFamily(new HColumnDescriptor("personalinfo"));
                admin.createTable(hbaseTable);
            }
        } catch (Exception e) {
          e.printStackTrace();
        } finally {
            try{
                if(admin != null) {
                    admin.close();
                }
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
 java -cp HBaseJava-jar-with-dependencies.jar com.hbasejava.example.CreateTable

进入shell

cd /bigdata/hbase-2.1.5/bin
./hbase shell

列出所有表,并扫描

hbase(main):001:0> list
TABLE
peoples
1 row(s)
Took 2.5868 seconds
=> ["peoples"]

hbase(main):002:0> scan 'peoples'
ROW                                        COLUMN+CELL
0 row(s)
Took 0.7385 seconds

向表中插入数据

public class InsertIntoTable {
    public static void main(String[] args) {
        InsertIntoTable object = new InsertIntoTable();
        object.insertRecords();
    }

    public void insertRecords() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));

            //            creating sample data that can be used to save into hbase table
            String[][] people = {
                    { "1", "Marcel", "Haddad", "marcel@xyz.com", "M", "26", "www.google.com" },
                    { "2", "Franklin", "Holtz", "franklin@xyz.com", "M", "24", "www.bing.com" },
                    { "3", "Dwayne", "McKee", "dwayne@xyz.com", "M", "27", "www.bing.com" },
                    { "4", "Rae", "Schroeder", "rae@xyz.com", "F", "31", "www.baidu.com" },
                    { "5", "Rosalie", "burton", "rosalie@xyz.com", "F", "25", "www.baidu.com" },
                    { "6", "Gabriela", "Ingram", "gabriela@xyz.com", "F", "24", "www.baidu.com" },
                    { "7", "Marcel", "Haddad", "marcel@xyz.com", "M", "26", "www.facebook.com" },
                    { "8", "Franklin", "Holtz", "franklin@xyz.com", "M", "24", "www.facebook.com" },
                    { "9", "Dwayne", "McKee", "dwayne@xyz.com", "M", "27", "www.google.com" },
                    { "10", "Rae", "Schroeder", "rae@xyz.com", "F", "31", "www.google.com" },
                    { "11", "Rosalie", "burton", "rosalie@xyz.com", "F", "25", "www.google.com" },
                    { "12", "Gabriela", "Ingram", "gabriela@xyz.com", "F", "24", "www.google.com" } };

            for (int i = 0; i < people.length; i ++) {
                Put person = new Put(Bytes.toBytes(i));
                person.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"), Bytes.toBytes(people[i][1]));
                person.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"), Bytes.toBytes(people[i][2]));
                person.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"), Bytes.toBytes(people[i][3]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"), Bytes.toBytes(people[i][4]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"), Bytes.toBytes(people[i][5]));
                person.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("web"), Bytes.toBytes(people[i][6]));
                table.put(person);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

列出所有表

public class ListTables {
    public static void main(String[] args) {
        ListTables object = new ListTables();
        object.listTables();
    }

    public void listTables() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            admin = connection.getAdmin();

            HTableDescriptor tableDescriptor[] = admin.listTables();

            for(int i = 0; i < tableDescriptor.length; i ++) {
                System.out.println(tableDescriptor[i].getNameAsString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(admin != null) {
                    admin.close();
                }
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

读取指定行

public class ReadTable {
    public static void main(String[] args) {
        ReadTable object = new ReadTable();
        object.readTableData(args[0]);
    }

    public void readTableData(String rowKey) {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));

            Get get = new Get(Bytes.toBytes(rowKey));

            Result result = table.get(get);

            byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
            byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
            byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

            String firstName = Bytes.toString(firstNameValue);
            String lastName = Bytes.toString(lastNameValue);
            String email = Bytes.toString(emailValue);
            String gender = Bytes.toString(genderValue);
            String age = Bytes.toString(ageValue);
            System.out.println("First Name:" + firstName);
            System.out.println("last Name:" + lastName);
            System.out.println("email:" + email);
            System.out.println("gender:" + gender);
            System.out.println("age:" + age);
            System.out.println("finished Get");

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

扫描全表

public class ScanTable {
    public static void main(String[] args) {
        ScanTable object = new ScanTable();
        object.scanTableData();
    }

    public void scanTableData() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;

        try {
            //建立连接
            connection = ConnectionFactory.createConnection(config);
            //取到table
            table = connection.getTable(TableName.valueOf("peoples"));

            Scan scan = new Scan();
            //增加想读出的列的名字
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"));
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"));
            scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

            resultScanner = table.getScanner(scan);
            //通过next遍历    
            for(Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
                //二进制数组
                byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
                byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
                byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
                byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
                byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));
                //转为字符串
                String firstName = Bytes.toString(firstNameValue);
                String lastName = Bytes.toString(lastNameValue);
                String email = Bytes.toString(emailValue);
                String gender = Bytes.toString(genderValue);
                String age = Bytes.toString(ageValue);

                System.out.println("First Name:" + firstName);
                System.out.println("last Name:" + lastName);
                System.out.println("email:" + email);
                System.out.println("gender:" + gender);
                System.out.println("age:" + age);
                System.out.println("finished Get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
public class FilterTable {
    public static void main(String[] args) {
        FilterTable object = new FilterTable();
        object.scanFilterTableData();
    }

    public void scanFilterTableData() {
        Configuration config = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Table table = null;
        ResultScanner resultScanner = null;

        try {
            connection = ConnectionFactory.createConnection(config);
            table = connection.getTable(TableName.valueOf("peoples"));

            SingleColumnValueFilter filter1 = new SingleColumnValueFilter(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"), CompareOp.EQUAL, Bytes.toBytes("F"));
            SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"), CompareOp.EQUAL, Bytes.toBytes("25"));

            FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);

            filterList.addFilter(filter1);
            filterList.addFilter(filter2);

            Scan scan = new Scan();
            scan.setFilter(filterList);
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("first"));
            scan.addColumn(Bytes.toBytes("name"), Bytes.toBytes("last"));
            scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
            scan.addColumn(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

            resultScanner = table.getScanner(scan);

            for(Result result = resultScanner.next(); result != null; result = resultScanner.next()) {
                byte[] firstNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("first"));
                byte[] lastNameValue = result.getValue(Bytes.toBytes("name"), Bytes.toBytes("last"));
                byte[] emailValue = result.getValue(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
                byte[] genderValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("gender"));
                byte[] ageValue = result.getValue(Bytes.toBytes("personalinfo"), Bytes.toBytes("age"));

                String firstName = Bytes.toString(firstNameValue);
                String lastName = Bytes.toString(lastNameValue);
                String email = Bytes.toString(emailValue);
                String gender = Bytes.toString(genderValue);
                String age = Bytes.toString(ageValue);

                System.out.println("First Name:" + firstName);
                System.out.println("last Name:" + lastName);
                System.out.println("email:" + email);
                System.out.println("gender:" + gender);
                System.out.println("age:" + age);
                System.out.println("finished Get");
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}
public class DeleteTable {
    public static void main(String[] args) {
        Configuration conf = HBaseConfigUtil.getHBaseConfiguration();
        Connection connection = null;
        Admin admin = null;

        try {
            connection = ConnectionFactory.createConnection(conf);
            admin = connection.getAdmin();

            TableName tableName = TableName.valueOf("peoples");

            if(admin.isTableAvailable(tableName)) {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try{
                if(admin != null) {
                    admin.close();
                }
                if(connection != null && !connection.isClosed()) {
                    connection.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }
}

五、使用MapReduce

1.环境配置

pom.xml配置Hadoop依赖和HBase依赖

 2.任务

学生数据分析

 参考资料

新建Maven项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example.hbasemr</groupId>
    <artifactId>HBaseMapReduce</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.1.0</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>${project.artifactId}</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.example.hbasemr.Driver</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

测试数据导入

hadoop jar HBaseMR-jar-with-dependencies.jar ImportFromHDFS 

MapReduce HBase数据分析

新建main函数,先空着

定义mapper和reducer

mapper

是TableMapper,只需keyout和valueout

TEXT 年龄 InWritable 1

重写map方法

判断是否有该列,如果有就读取

public class HBaseToHDFS {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("fs.defaultFS", "hdfs://node002:9000/");
        conf.set("hbase.zookeeper.quorum", "node001:2181,node003:2181,node003:2181");

        FileSystem fs = FileSystem.get(conf);

        Job job = Job.getInstance(conf);
        job.setJarByClass(HBaseToHDFS.class);

        Scan scan = new Scan();
        scan.addColumn("info".getBytes(), "age".getBytes());

        TableMapReduceUtil.initTableMapperJob(
                "student".getBytes(),
                scan,
                HBaseToHDFSMapper.class,
                Text.class,
                IntWritable.class,
                job,
                false
        );

        job.setReducerClass(HBaseToHDFSMapper.HBaseToHDFSReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        Path outputPath = new Path("/student/avgresult");

        if(fs.exists(outputPath)) {
            fs.delete(outputPath);
        }

        FileOutputFormat.setOutputPath(job, outputPath);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class HBaseToHDFSMapper extends TableMapper<Text, IntWritable> {
        Text outKey = new Text("age");
        IntWritable outValue = new IntWritable();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            boolean isContainsColumn = value.containsColumn("info".getBytes(), "age".getBytes());
            if(isContainsColumn) {
                List<Cell> listCells = value.getColumnCells("info".getBytes(), "age".getBytes());
                Cell cell = listCells.get(0);
                byte[] cloneValue = CellUtil.cloneValue(cell); //克隆数据
                String ageValue = Bytes.toString(cloneValue); //转为字符串
                outValue.set(Integer.parseInt(ageValue)); //将outValue设为当期的年龄
                context.write(outKey, outValue); //写到map的输出中
            }
        }

        public static class HBaseToHDFSReducer extends Reducer<Text, IntWritable, Text, DoubleWritable> {

            DoubleWritable outValue = new DoubleWritable();

            @Override
            //输入是map的输出
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count = 0;
                int sum = 0;

                for(IntWritable value: values) { //遍历当前的所有年龄
                    count ++;
                    sum += value.get();
                }

                double avgAge = sum * 1.0 / count;
                outValue.set(avgAge);
                context.write(key, outValue); //写入到HDFS
            }
        }
    }
}
原文地址:https://www.cnblogs.com/aidata/p/11570791.html