akka-persistence-hbase-master源码分析之创建Journal表

 akka-persistence-hbase-master是负责akka持久化的组件,采用async方式来操作hbase,讲akka的日志持久化到hbase中。

 akka-persistence-hbase-master持久化之前首先要创建Journal表,关于表的配置存放在application.conf文件中,

akka.persistence.hbase.journal.HBaseJournalInit对象负责关于Journal表的管理,获取hbase配置的代码如下:
  def getHBaseConfig(config: Config): Configuration = {
    val c = new Configuration()

    val journalConfig = config.getConfig("hbase-journal")//获取hbase-journal标签的配置
    val hbaseConfig = journalConfig.getConfig("hadoop-pass-through")//获取hadoop-pass-through下的配置
  //将获取到的配置按照key:value的形式返回(与hbase-site.xml的配置相同)
    hbaseConfig.entrySet().asScala foreach { e =>
      c.set(e.getKey, e.getValue.unwrapped.toString)
    }
    c
  }

 获取到配置后,还需要表名和列族名才可以创建表,akka-persistence-hbase-master默认创建的表名为"akka_messages",列族名为"message",如需修改,可在application.conf文件的hbase-journal标签下添加如下配置:

table = "tablename"

family = "familyname"

 配置,表名,列族名都拿到之后,就会执行doInitTable方法(创建表依旧采用hbase自带的hbaseadmin来创建),代码如下:

private def doInitTable(admin: HBaseAdmin, tableName: String, familyName: String): Boolean = {
    if (admin.tableExists(tableName)) {
      val tableDesc = admin.getTableDescriptor(toBytes(tableName))
      if (tableDesc.getFamily(toBytes(familyName)) == null) {
        // target family does not exists, will add it.
        admin.addColumn(familyName, new HColumnDescriptor(familyName))
        true
      } else {
        // existing table is OK, no modifications run.
        false
      }
    } else {
      val tableDesc = new HTableDescriptor(toBytes(tableName))
      tableDesc.addFamily(new HColumnDescriptor(familyName))

      admin.createTable(tableDesc)
      true
    }
  }

创建Journal表之后还会创建snapshot表,步骤与Journal类似。

之后还会执行Thread.sleep(2000)

转载请注明出处,期待共同进步...
原文地址:https://www.cnblogs.com/zhangyukun/p/4168790.html