hbase

一. Hbase的逻辑模型与物理模型

  1. 逻辑结构
    (1)行健 : 用于唯一标识一行记录 , 数据的内容只能通过行健获得 (行健本身也是字段,相当于PK主键)
    (2)列族 : 一系列列的集合 , 每个列族以单独的文件形式存在 . 不同的列分到不同的列族, 使得不同的列写往不同的文件。 可以提高并发写的效率(关系型数据库是行式存储, 不存在列族的概念)
    (3)列标识符 : 列就是字段
    (4)单元格 : 行健, 列族, 列形成了单元格 , 里面存储数据
    (5)时间戳 : 单元格中不只存储一行中一个字段的数据 , 而是存储了该字段的多个版本数据. eg:搜索引擎要记录下网页的变化 , 所以进行采用多个版本的数据存储

  2. 物理结构
    (1)行健 : hbase中的每条数据都按行健的递增顺序排列 (rdbms中数据是按照插入的顺序存储的)
    (2) Hfile : Hfile是hbase用于存储列族的二进制文件 , 一个列族会有多个Hfile存储(划分region) . 而一个HFile只能存储一个列族
    (3)region : 一系列row key的集合. 一个表可以拆成多个region. 因为单机的存储容量有限. 让每个节点值存储几个region. 而region就是所有列族的部分数据的集合,这样就形成了行的分布式存储;又由于hbase的所有region存在与不同的机器,在查询时形成并发查询

  3. hbase的存储模式 - 数据坐标
    (1)关系型数据库采用2维坐标存储数据, 行坐标 , 列坐标
    (2)hbase采用四维坐标确定一个value, 行健:列族:列:时间戳. hbase也可看做一个<k,v>数据库, 四维坐标为key,值是value。单元格的维度越少,value的值范围越广

二.Hbase的体系结构

源码 : HRegionServer -> HRegion -> HStore (一个Region的一个列族) -> MemStore -> StoreFile -> HFile

  1. HRegionServer :
    HregionServer是用户访问的对象 , 每个HRegion都在HregionServer中注册,HRegionServer是一台节点上的进程,这个进程管理着region这个逻辑范围,region中包含很多的hfile,hfile被上传到hdfs中分块
  2. Hregion
    Hregion是一些行的集合, 包含rowkey的起始结束标识,用来快速找到rowkey在哪个hregion中 HRegion由HStore组成.
  3. HStore :
    一个HStore只能存放一个Region中的一个列族 , 一个完整的列族会存放在多个HStore中。HStore由1个memstore和0个或多个StoreFile组成
  4. StoreFile :
    一个Hstore是一个Region中一个列族的数据 , 当客户端向HStore(列族)写入数据时 , HStore中的数据先放在内存
    当数据增多时, 会刷一部分数据到磁盘中. 这部分刷到磁盘中的数据就是StoreFile
  5. MemStore :
    memsdtore的方法不能被并行调用 , 操作memstore必须同时拥有读锁写锁
  6. HFile :
    StoreFile只是对memstore的一种"虚拟内存",StoreFile还有负责查询等存储管理功能,StoreFile再把数据写到hdfs中, 真正存储数据的是Hfile这种二进制文件,HFile只是文件没有其他职能
    【注】:所有HBase的优化, 都是对HFile的优化 . HFile是键值对的文件 , <行健:列值>
  7. Hlog
    HRegionServer除了把数据分发到每个Region外, 还有一个叫做Hlog的东西写(HLog写入成功后,再向内存写) , 是为了确保HStore宕机时, 数据可以从HLog找回, 但是HLog中的数据是按照插入顺序写的, 不是按照行健排序 , 所以数据杂乱无章, 但是这种按写入顺序写的速度又比写Hfile这种按行健排序后再写hdfs的方式快 , 又因为MemStore 不安全, HFile写数据又慢,所以产生了hlog。 HLog在达到一定量后会自动删除;Hlog的数据结构:LSM(log structured merge) Tree : 适合磁盘大量随机读写的数据结构
    【注】:当Hlog和MemStore中都有数据时, Hbase写入数据成功

三.Hfile的结构


data是真实数据,meta元数据,这两个东西都有索引,索引以. data称作hbase的block , 这个block的结构如上图
每个block以magic魔数开头,用来区分block. block里面存储着很多<k,v>,键值对的结构如下

为了快速的随机存取, hbase把data(block块)的索引放到内存中 , 即hbase对block进行索引 ,而data中存放的是很多键值对,一个键值对就是一条记录如上图, 所以引入block是为了给记录分块(记录是按照行健的字典顺序排列的), 这样索引block,就是对一定范围的数据进行索引, 加快查询速度.    为什么不按照<k,v>记录索引? hbase中存储大量的<k,v>, 每条记录单独索引会增大索引的数量,索引数量太大会导致查询速度变慢, 索引block不能太大(索引找到data块后再次找到行健变慢), block块也不能太小, 太小会导致和索引每条记录没区别, 官网建议block为1m
【注】:为什么行健,列族,列标识符要尽量短 ?
行健,列族,列标识符名字的长度尽量短, 这样键值对中key的长度减少 , 很多条记录就会减少很多的存储空间.  
存储空间增大, 会导致region切分的频次增大. 可以用a,b,c作为列族, a1,a2,a3作为列标识符. 
为什么mysql的列名不用在意长短? 
mysql中的表结构和数据是分开存储的, 所以列名长短无所谓. hbase的列族,列标识符是根据每个记录都存一遍, 所以会占用很大空间

四. Hbase导入导出数据

  1. MR导出
    public class HBase2MR{
        private static class MyMapper extends TableMapper<Text,Text>{
            Text k2 = new Text();
            Test v2 = new Text();
            @Override
            protected void map(ImmutableBytesWritable key,Result value,Context context){
                String colummnName = context.getConfiguration.get("colummnName");  // 导出需要的列
                Cell cell1 = value.getColumnLatestCell("f1".getBytes(),colummnName.getBytes());
                Cell cell2 = value.getColumnLatestCell("f1".getBytes(),"c2".getBytes());
    
                k2.set(key.get());
                v2.set(new String(cell1.getValue())+"	"+new String(cell2.getValue())); 
                context.write(k2,v2);
            }
        }    
        public static void main(Sttring[] args){
            Configuration conf = HbaseConfiguration.create();
            conf.set("columnName",args[0]);   // 设置需要导出的列名 , 集群共享的信息放到configuration中
            Job job =Job.getInstance(conf,Hbase2MR.class.getSimplename());
    
            TableMapReduceUtil.initTableMapperJob("user2",new Scan(),MyMapper.class,Text.class,Text.class,job);
            FileOutputformat.setOutputPath(job,"hdfs://10.1.198.60:8020");
    
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(0);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            job.waitForCompletion(true);
        }
    }
    

【注】mapreduce导出数据步骤:
(1)添加hbase jar包到hadoop的classpath中
(2)将脚本加入到~/bashrc
shell HADOOP_CLASSPATH='.' for jar in `ls $HBASE_HOMElib*.jar` do HADOOP_CLASSPATH= $HADOOP_CLASSPATH:$jar done exprt HADOOP_CLASSPATH
【注】:Hbase的export类导出数据 (看该类的源码对比自己的)
shell bin/hbase org.apache.hadoop.hbase.mapreduce.Export 't3' /t31 (表明,hdfs目录)
2. MR导入
```java
static class BatchImportMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
Text v2 = new Text();

  protected void map(LongWritable key, Text value, Context context) throws java.io.IOException ,InterruptedException {
  final String[] splited = value.toString().split("	");
  try {
  final Date date = new Date(Long.parseLong(splited[0].trim()));
  final String dateFormat = dateformat1.format(date);
  String rowKey = splited[1]+":"+dateFormat;
  v2.set(rowKey+"	"+value.toString());
  context.write(key, v2);
  } catch (NumberFormatException e) {
  final Counter counter = context.getCounter("BatchImport", "ErrorFormat");
  counter.increment(1L);
  System.out.println("出错了"+splited[0]+" "+e.getMessage());
  }
  };
}
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
  protected void reduce(LongWritable key, java.lang.Iterable<Text> values,   Context context) throws java.io.IOException ,InterruptedException {
  for (Text text : values) {
  final String[] splited = text.toString().split("	");
 
  public static void main(String[] args) throws Exception {
  final Configuration configuration = new Configuration();
  //设置zookeeper
  configuration.set("hbase.zookeeper.quorum", "hadoop0");
  //设置hbase表名称
  configuration.set(TableOutputFormat.OUTPUT_TABLE, "wlan_log");
  //将该值改大,防止hbase超时退出
  configuration.set("dfs.socket.timeout", "180000");
 
  final Job job = new Job(configuration, "HBaseBatchImport");
    TableMapReduceUtil.addDependencyJars(job);
  job.setJarByClass(BatchImport.class);
  job.setMapperClass(BatchImportMapper.class);
  job.setReducerClass(BatchImportReducer.class);
  //设置map的输出,不设置reduce的输出类型
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);
 
  job.setInputFormatClass(TextInputFormat.class);
  //不再设置输出路径,而是设置输出格式类型
  job.setOutputFormatClass(TableOutputFormat.class);
 
  FileInputFormat.setInputPaths(job, "hdfs://hadoop0:9000/input");
 
  job.waitForCompletion(true);
  }
  final Put put = new Put(Bytes.toBytes(splited[0]));
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
  //省略其他字段,调用put.add(....)即可
  context.write(NullWritable.get(), put);
  }
  };
}
static class BatchImportReducer extends TableReducer<LongWritable, Text, NullWritable>{
  protected void reduce(LongWritable key, java.lang.Iterable<Text> values,   Context context) throws java.io.IOException ,InterruptedException {
  for (Text text : values) {
  final String[] splited = text.toString().split("	");
 
  final Put put = new Put(Bytes.toBytes(splited[0]));
  put.add(Bytes.toBytes("cf"), Bytes.toBytes("date"), Bytes.toBytes(splited[1]));
  //省略其他字段,调用put.add(....)即可
  context.write(NullWritable.get(), put);
  }
  };
}
```

【注】:import导入
(1)import导入的文件, 必须是export导出的文件
(2)hbae的数据导入导出为了实现类似于create table ... as select * from ...的语法功能
shell bin/hbase org.apache.hadoop.hbase.mapreduce.Import 't1' /t31
(3)该文件会导出sequencefile,下载到linux,用more查看

原文地址:https://www.cnblogs.com/72808ljup/p/5322981.html