HBase Mini Case Studies for Beginners

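1. HBase namespaces

namespace: a name space, the HBase counterpart of a database in MySQL.

2. What namespaces are for

  1) Quota management: limit the resources a namespace may use, such as the number of regions and tables.

  2) Namespace security management: another level of multi-tenant security.

  3) RegionServer groups: a namespace or a table can be pinned to a group of RegionServers, which isolates its data from other data.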

3. Basic namespace operations

  1) List all namespaces

list_namespace

  2) Create a namespace

create_namespace 'nametest'

  3) Describe a namespace

describe_namespace 'nametest'

  4) Create a table in a namespace

create 'nametest:testtable', 'fm1'

  5) List the tables in a namespace

list_namespace_tables 'nametest'

  6) Drop a namespace (the namespace must be empty; disable and drop its tables first)

drop_namespace 'nametest'
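The shell rules above, in particular that a table name without a `namespace:` prefix lands in the `default` namespace and that `drop_namespace` only accepts an empty namespace, can be modeled in a few lines. This is a minimal sketch using a hypothetical in-memory `NamespaceModel` class; it does not call HBase, and its method names only echo the shell commands.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the namespace rules; it does not talk to HBase.
class NamespaceModel {
    // namespace name -> table qualifiers in that namespace
    private final Map<String, List<String>> namespaces = new HashMap<>();

    NamespaceModel() {
        // HBase ships with "default" (user tables) and "hbase" (system tables)
        namespaces.put("default", new ArrayList<>());
        namespaces.put("hbase", new ArrayList<>());
    }

    // "ns:table" -> {ns, table}; a name without "ns:" belongs to the "default" namespace
    static String[] parse(String name) {
        int colon = name.indexOf(':');
        return colon < 0 ? new String[] {"default", name}
                         : new String[] {name.substring(0, colon), name.substring(colon + 1)};
    }

    private List<String> tablesOf(String ns) {
        List<String> tables = namespaces.get(ns);
        if (tables == null) {
            throw new IllegalStateException("unknown namespace: " + ns);
        }
        return tables;
    }

    // like create_namespace 'ns'
    void createNamespace(String ns) {
        if (namespaces.putIfAbsent(ns, new ArrayList<>()) != null) {
            throw new IllegalStateException("namespace already exists: " + ns);
        }
    }

    // like create 'ns:table', ...  (fails for an unknown namespace)
    void createTable(String name) {
        String[] p = parse(name);
        tablesOf(p[0]).add(p[1]);
    }

    // like disable + drop of one table
    void dropTable(String name) {
        String[] p = parse(name);
        tablesOf(p[0]).remove(p[1]);
    }

    // like list_namespace_tables 'ns'
    List<String> listNamespaceTables(String ns) {
        return new ArrayList<>(tablesOf(ns));
    }

    // like drop_namespace 'ns': refused while the namespace still contains tables
    void dropNamespace(String ns) {
        if (!tablesOf(ns).isEmpty()) {
            throw new IllegalStateException("namespace not empty: " + ns);
        }
        namespaces.remove(ns);
    }
}
```

On a real cluster the Java `Admin` interface offers the corresponding operations, for example `createNamespace` and `deleteNamespace`.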

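4. Version bounds and TTL (time to live) of HBase data

HBase keeps data in versions: a single cell can hold several versions, each identified by its timestamp.

For versions, HBase defines a lower bound and an upper bound.

1) Lower bound (MIN_VERSIONS): the minimum number of versions HBase keeps even after they have outlived the TTL, so it only matters together with a TTL. The default is 0, which means no expired version is retained.

2) Upper bound (MAX_VERSIONS): the maximum number of versions kept per cell. The default is 1.

TTL: how long data lives before it expires.

It can be set on a column family (in seconds) or on an individual write, i.e. the cells of one Put on a rowkey (in milliseconds).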
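To make the interaction of the two bounds with TTL concrete, here is a simplified model of how many versions of one cell a read can still see. It is an assumption-laden sketch, not HBase source code: timestamps are milliseconds sorted newest first, and deletes and compaction timing are ignored. The newest `minVersions` versions survive even when expired; beyond that, expired versions are hidden, and never more than `maxVersions` are returned.

```java
// Simplified model of one cell's versions (an assumption, not HBase source code).
class VersionTtlModel {
    // newestFirst: version timestamps in ms, newest first; returns how many versions a read still sees.
    // Assumes minVersions <= maxVersions.
    static int visibleVersions(long[] newestFirst, int maxVersions, int minVersions,
                               long ttlMillis, long now) {
        int kept = 0;
        for (long ts : newestFirst) {
            if (kept >= maxVersions) {
                break;                               // upper bound (MAX_VERSIONS) reached
            }
            boolean expired = now - ts > ttlMillis;
            if (expired && kept >= minVersions) {
                break;                               // lower bound met; older versions are expired too
            }
            kept++;                                  // within TTL, or needed for MIN_VERSIONS
        }
        return kept;
    }
}
```

For example, for four versions of which only the newest is younger than the TTL, MIN_VERSIONS = 3 and MAX_VERSIONS = 5 leave three versions visible, while MIN_VERSIONS = 0 leaves only the newest one.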

5. Setting versions and TTL in code

1) Create a Maven project and add the HBase client jars

2) Code

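import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseVersionAndTTL {
    public static void main(String[] args) throws IOException, InterruptedException {
        // Create the connection to HBase
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181,hadoop03:2181");
        // Pass the configuration in; otherwise the ZooKeeper quorum set above is not used
        Connection connection = ConnectionFactory.createConnection(configuration);
        Admin admin = connection.getAdmin();

        // Create the table if it does not exist yet
        if (!admin.tableExists(TableName.valueOf("version_hbase"))) {
            HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf("version_hbase"));
            HColumnDescriptor f1 = new HColumnDescriptor("f1");
            // Version bounds
            f1.setMinVersions(3);
            f1.setMaxVersions(5);
            // TTL for every column in this column family, in seconds
            f1.setTimeToLive(30);
            hTableDescriptor.addFamily(f1);
            admin.createTable(hTableDescriptor);
        }
        Table version_hbase = connection.getTable(TableName.valueOf("version_hbase"));
        Put put = new Put("1".getBytes());
        // TTL for this individual write, in milliseconds
        //put.setTTL(3000);
        // Write with an explicit timestamp (the insertion time)
        put.addColumn("f1".getBytes(), "name".getBytes(), System.currentTimeMillis(), "zhangsan".getBytes());
        version_hbase.put(put);
        Thread.sleep(1000);
        Put put2 = new Put("1".getBytes());
        put2.addColumn("f1".getBytes(), "name".getBytes(), System.currentTimeMillis(), "zhangsan2".getBytes());
        version_hbase.put(put2);
        // Read back all stored versions of the row (newest first)
        Get get = new Get("1".getBytes());
        get.setMaxVersions();
        Result result = version_hbase.get(get);
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        version_hbase.close();
        admin.close();
        connection.close();
    }
}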
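The two TTL settings in the code above use different units: `f1.setTimeToLive(30)` means 30 seconds, while the commented-out `put.setTTL(3000)` means 3000 milliseconds (3 seconds). In addition, a cell TTL can shorten a cell's life but cannot extend it beyond the column family TTL. The hypothetical helper below sketches that rule by computing when a cell stops being visible; it is plain Java and does not call HBase.

```java
// Hypothetical helper (not an HBase API): when does a cell expire, in epoch milliseconds?
class TtlUnits {
    // cfTtlSeconds:  the value given to HColumnDescriptor.setTimeToLive (seconds)
    // cellTtlMillis: the value given to Put.setTTL (milliseconds), or null if none was set
    static long expiresAt(long cellTimestampMillis, int cfTtlSeconds, Long cellTtlMillis) {
        long cfExpiry = cellTimestampMillis + cfTtlSeconds * 1000L;   // convert seconds to ms
        if (cellTtlMillis == null) {
            return cfExpiry;
        }
        // A cell TTL may only shorten the column family TTL, never extend it
        return Math.min(cfExpiry, cellTimestampMillis + cellTtlMillis);
    }
}
```

With the values used above, a cell written at time 0 expires at 30000 ms under the column family TTL alone, and at 3000 ms once `put.setTTL(3000)` is enabled.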

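6. Weibo (microblog) case study

1) Three tables: weibo:content (the posts), weibo:relations (who follows whom) and weibo:receive_content_email (each user's inbox of post rowkeys)

2) Create a Maven project and import the jars

3) Copy the configuration files into the Maven project

Copy the three configuration files core-site.xml, hdfs-site.xml and hbase-site.xml from /export/servers/hbase-1.2.0-cdh5.14.0/conf into the resources directory of the Maven project.

4) Create the namespace and define the table names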

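    private String WEIBO_CONTENT = "weibo:content";
    private String WEIBO_RELATIONS = "weibo:relations";
    private String WEIBO_CONTENT_EMAILS = "weibo:receive_content_email";

    private Connection connection;
    private Admin admin;

    @Before
    public void init() throws IOException {
        // Build the configuration object
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181");
        // Connect to the database
        connection = ConnectionFactory.createConnection(conf);
        admin = connection.getAdmin();
    }

    @After
    public void close() throws IOException {
        admin.close();
        connection.close();
    }

    // Opens a new connection for the business methods below; the caller closes it
    private Connection getConnection() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "hadoop01:2181,hadoop02:2181");
        return ConnectionFactory.createConnection(conf);
    }

    // Create the namespace
    @Test
    public void createNameSpace() throws IOException {
        // Namespace descriptor with a "creator" property
        NamespaceDescriptor namespaceDescriptor = NamespaceDescriptor.create("weibo").addConfiguration("creator", "wang").build();
        // Create the namespace
        admin.createNamespace(namespaceDescriptor);
    }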

5) Create the Weibo content table

    // Create the weibo:content table
    @Test
    public void createTableContent() throws IOException {
        if(!admin.tableExists(TableName.valueOf(WEIBO_CONTENT))){
            // Table descriptor
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(WEIBO_CONTENT));
            HColumnDescriptor info = new HColumnDescriptor("info");
            info.setMinVersions(1);
            info.setMaxVersions(1);
            info.setBlockCacheEnabled(true); // enable the block cache
            info.setBlocksize(2048*1024);
            tableDescriptor.addFamily(info);
            admin.createTable(tableDescriptor);
        }
    }

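6) Create the user relations table

/**
 * Create the user relations table
 * Method name   createTableRelations
 * Table Name    weibo:relations
 * RowKey        user ID
 * ColumnFamily  attends, fans
 * ColumnLabel   followed user ID, fan user ID
 * ColumnValue   user ID
 * Version       1
 */
    // Create the relations table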
    @Test
    public void createTableRelations() throws IOException {
        if (!admin.tableExists(TableName.valueOf(WEIBO_RELATIONS))) {
            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(WEIBO_RELATIONS));
            // Add the column families
            HColumnDescriptor attends = new HColumnDescriptor("attends");
            attends.setMinVersions(1);
            attends.setMaxVersions(1);
            attends.setBlockCacheEnabled(true);
            attends.setBlocksize(2048*1024);
            HColumnDescriptor fans = new HColumnDescriptor("fans");
            fans.setMinVersions(1);
            fans.setMaxVersions(1);
            fans.setBlockCacheEnabled(true);
            fans.setBlocksize(2048*1024);

            tableDescriptor.addFamily(attends);
            tableDescriptor.addFamily(fans);
            admin.createTable(tableDescriptor);
        }
    }
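A follow relationship is stored twice in `weibo:relations`: once in the follower's `attends` family and once in the followed user's `fans` family, each time with the other user's ID as the column qualifier. The sketch below models only that layout with hypothetical in-memory maps (no HBase calls), so the two-row write can be seen in isolation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical in-memory weibo:relations table: rowkey (uid) -> column family -> qualifiers (user IDs)
class RelationsModel {
    private final Map<String, Map<String, Set<String>>> rows = new HashMap<>();

    private Set<String> cells(String uid, String fam) {
        return rows.computeIfAbsent(uid, k -> new HashMap<>())
                   .computeIfAbsent(fam, k -> new TreeSet<>());
    }

    // One follow writes two rows: the follower's "attends" family and the followed user's "fans" family
    void follow(String uid, String attend) {
        cells(uid, "attends").add(attend);
        cells(attend, "fans").add(uid);
    }

    // Unfollowing removes both entries again
    void unfollow(String uid, String attend) {
        cells(uid, "attends").remove(attend);
        cells(attend, "fans").remove(uid);
    }

    // Copy of the qualifiers in one family of one row
    Set<String> get(String uid, String fam) {
        return new TreeSet<>(cells(uid, fam));
    }
}
```

Because HBase guarantees atomicity only within a single row, the two writes of a real follow are not atomic; a failure between them can leave `attends` and `fans` out of sync.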

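7) Create the Weibo inbox table

/**
 * Table structure:
 * Method name   createTableReceiveContentEmails
 * Table Name    weibo:receive_content_email
 * RowKey        user ID (the reader)
 * ColumnFamily  info
 * ColumnLabel   user ID (the publisher)
 * ColumnValue   rowkey of the Weibo post
 * Version       1000
 */

    // Create the inbox (email) table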
    @Test
    public void createTableEmile() throws IOException {
        if(!admin.tableExists(TableName.valueOf(WEIBO_CONTENT_EMAILS))){

            HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(WEIBO_CONTENT_EMAILS));
            // Create the column family
            HColumnDescriptor info = new HColumnDescriptor("info");
            info.setMinVersions(1000);
            info.setMaxVersions(1000);
            info.setBlockCacheEnabled(true);
            info.setBlocksize(2048*1024);
            // Add the column family to the table
            tableDescriptor.addFamily(info);
            // Create the table
            admin.createTable(tableDescriptor);

        }
    }
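The inbox table keeps up to 1000 versions because every post from the same publisher lands in the same cell (row = reader, column = publisher ID) and is told apart only by its timestamp; with a single version, each new post would replace the previous one. The sketch below models one inbox row with such a cap. It is a simplified, hypothetical model: real HBase hides surplus versions on reads and physically drops them later, rather than at write time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of one inbox row: publisher uid -> versions (timestamp -> post rowkey)
class InboxCellModel {
    private final int maxVersions;
    private final Map<String, TreeMap<Long, String>> columns = new TreeMap<>();

    InboxCellModel(int maxVersions) {
        this.maxVersions = maxVersions;
    }

    // Same column and same timestamp overwrites; beyond maxVersions the oldest version is dropped
    void put(String publisherUid, long timestamp, String postRowKey) {
        TreeMap<Long, String> versions = columns.computeIfAbsent(publisherUid, k -> new TreeMap<>());
        versions.put(timestamp, postRowKey);
        while (versions.size() > maxVersions) {
            versions.pollFirstEntry();   // smallest timestamp = oldest version
        }
    }

    // Versions of one column, newest first, like the cells a Get returns
    List<String> versions(String publisherUid) {
        TreeMap<Long, String> v = columns.get(publisherUid);
        return v == null ? new ArrayList<>() : new ArrayList<>(v.descendingMap().values());
    }
}
```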

8) Publish a Weibo post

  a. Add one row to the Weibo content table

  b. Add an entry to the inbox table of every fan of the publisher

    // A user publishes a Weibo post
    public void publishWeibo(String uid, String content) throws IOException {
        Connection connection = getConnection();
        // 1. Save the post in weibo:content
        Table weibo_content = connection.getTable(TableName.valueOf(WEIBO_CONTENT));
        // Rowkey = <uid>_<publish time in ms>
        long time = System.currentTimeMillis();
        String rowkey = uid + "_" + time;
        Put put = new Put(rowkey.getBytes());
        // Bytes.toBytes encodes as UTF-8, matching Bytes.toString when the post is read back
        put.addColumn("info".getBytes(), "content".getBytes(), time, Bytes.toBytes(content));
        weibo_content.put(put);

        // 2. Look up all of the user's fans
        Table weibo_relations = connection.getTable(TableName.valueOf(WEIBO_RELATIONS));
        Get get = new Get(uid.getBytes());
        // Only the fans column family is needed
        get.addFamily("fans".getBytes());
        Result result = weibo_relations.get(get);
        // No fans: nothing more to do
        if (result.isEmpty()) {
            weibo_content.close();
            weibo_relations.close();
            connection.close();
            return;
        }
        ArrayList<String> uids = new ArrayList<>();
        // The qualifiers of the fans family are the fans' uids
        List<Cell> cells = result.listCells();
        for (Cell cell : cells) {
            String fans_uid = Bytes.toString(CellUtil.cloneQualifier(cell));
            uids.add(fans_uid);
        }
        // 3. Write the post's rowkey into every fan's inbox:
        //    row = fan uid, column = info:<publisher uid>, version timestamp = publish time
        Table weibo_receive_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
        List<Put> puts = new ArrayList<>();
        for (String fanid : uids) {
            Put put1 = new Put(fanid.getBytes());
            put1.addColumn("info".getBytes(), uid.getBytes(), time, rowkey.getBytes());
            puts.add(put1);
        }
        // Send all inbox writes in one batch
        weibo_receive_content_emails.put(puts);
        weibo_content.close();
        weibo_relations.close();
        weibo_receive_content_emails.close();
        connection.close();
    }
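`publishWeibo` follows a fan-out-on-write design: one post costs one write to `weibo:content` plus one write per fan, so that reading a user's timeline later only needs that user's inbox row. Below is a minimal sketch of the same data flow, with hypothetical in-memory maps standing in for the three tables; no HBase API is involved.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical in-memory stand-ins for the three tables (no HBase calls)
class WeiboFanOutModel {
    // weibo:content: post rowkey -> post text
    final Map<String, String> content = new TreeMap<>();
    // fans family of weibo:relations: uid -> uids of its fans
    final Map<String, Set<String>> fans = new HashMap<>();
    // weibo:receive_content_email: reader uid -> publisher uid -> (version timestamp -> post rowkey)
    final Map<String, Map<String, TreeMap<Long, String>>> inbox = new HashMap<>();

    String publish(String uid, String text, long time) {
        // Step 1: store the post under "<uid>_<time>", the rowkey scheme of publishWeibo
        String rowKey = uid + "_" + time;
        content.put(rowKey, text);
        // Steps 2 and 3: push the rowkey into every fan's inbox, versioned by the publish time
        for (String fan : fans.getOrDefault(uid, Collections.emptySet())) {
            inbox.computeIfAbsent(fan, k -> new HashMap<>())
                 .computeIfAbsent(uid, k -> new TreeMap<>())
                 .put(time, rowKey);
        }
        return rowKey;
    }
}
```

The trade-off is that a user with many fans makes publishing expensive, in exchange for cheap reads.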

9) Follow users

    // Follow users
    public void addAttends(String uid, String... attends) throws IOException {
        Connection connection = getConnection();
        Table weibo_relations = connection.getTable(TableName.valueOf(WEIBO_RELATIONS));
        // 1. Record the followed users in the current user's attends family
        Put put = new Put(uid.getBytes());
        for (String attend : attends) {
            put.addColumn("attends".getBytes(), attend.getBytes(), attend.getBytes());
        }
        weibo_relations.put(put);
        // 2. Each followed user gains the current user as a fan
        for (String attend : attends) {
            Put put1 = new Put(attend.getBytes());
            put1.addColumn("fans".getBytes(), uid.getBytes(), uid.getBytes());
            weibo_relations.put(put1);
        }
        // 3. Let the current user see the followed users' posts: if user 1 follows users 2, 3 and M,
        //    the rowkeys of the posts already published by 2, 3 and M go into user 1's inbox
        Table weibo_content = connection.getTable(TableName.valueOf(WEIBO_CONTENT));
        Table weibo_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
        ArrayList<String> strings = new ArrayList<>(); // collects post rowkeys
        for (String attend : attends) {
            Scan scan = new Scan();
            // Rowkeys of this user's posts start with "<uid>_"
            PrefixFilter prefixFilter = new PrefixFilter((attend + "_").getBytes());
            scan.setFilter(prefixFilter);
            ResultScanner scanner = weibo_content.getScanner(scan);
            // Collect the rowkey of each post
            for (Result result : scanner) {
                strings.add(Bytes.toString(result.getRow()));
            }
            scanner.close();
        }

        // Save the post rowkeys in the current user's inbox row
        Put put2 = new Put(uid.getBytes());
        for (String weiborowkey : strings) {
            // Rowkeys look like 2_13235345436, i.e. <publisher uid>_<publish time>
            String[] split = weiborowkey.split("_");
            // Use the publish time as the version so posts by the same user do not overwrite each other
            put2.addColumn("info".getBytes(), split[0].getBytes(), Long.parseLong(split[1]), weiborowkey.getBytes());
        }
        // A Put without columns is rejected, so only write when posts were found
        if (!put2.isEmpty()) {
            weibo_content_emails.put(put2);
        }
        weibo_content.close();
        weibo_relations.close();
        weibo_content_emails.close();
        connection.close();
    }
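Step 3 of `addAttends` relies on the rowkey layout `<uid>_<timestamp>`. Because rows are sorted lexicographically, all posts of one user form a contiguous range, and the trailing `_` in the prefix keeps user `1` from matching user `12`. The sketch below reproduces the prefix lookup on a `TreeMap`, used here as a stand-in for the sorted table rather than an HBase call, and files the results by publish time as the inbox does. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of addAttends step 3; a TreeMap stands in for the sorted weibo:content table
class FollowBackfillModel {
    // Rowkeys of all posts by uid, in row order
    static List<String> postsOf(TreeMap<String, String> content, String uid) {
        String prefix = uid + "_";   // the "_" keeps "1_" from matching "12_..."
        List<String> rowKeys = new ArrayList<>();
        for (String rowKey : content.tailMap(prefix, true).keySet()) {
            if (!rowKey.startsWith(prefix)) {
                break;               // keys are sorted, so we have left the prefix range
            }
            rowKeys.add(rowKey);
        }
        return rowKeys;
    }

    // Follower's new inbox entries: publisher uid -> (publish time -> post rowkey)
    static Map<String, TreeMap<Long, String>> backfill(TreeMap<String, String> content, String... attends) {
        Map<String, TreeMap<Long, String>> inboxRow = new HashMap<>();
        for (String attend : attends) {
            for (String rowKey : postsOf(content, attend)) {
                // The publish time after the last "_" becomes the version timestamp
                long ts = Long.parseLong(rowKey.substring(rowKey.lastIndexOf('_') + 1));
                inboxRow.computeIfAbsent(attend, k -> new TreeMap<>()).put(ts, rowKey);
            }
        }
        return inboxRow;
    }
}
```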

10) Unfollow users

    // Unfollow users
    public void cancelAttends(String uid, String... cancelattends) throws IOException {

        Connection connection = getConnection();
        Table weibo_relations = connection.getTable(TableName.valueOf(WEIBO_RELATIONS));

        Delete delete = new Delete(uid.getBytes());
        // 1. Remove the unfollowed users from the current user's attends family (e.g. user 1 unfollows 3 and M)
        for (String cancelattend : cancelattends) {
            delete.addColumns("attends".getBytes(), cancelattend.getBytes());
        }
        weibo_relations.delete(delete);

        // 2. Each unfollowed user loses the current user as a fan
        for (String cancelattend : cancelattends) {
            Delete delete1 = new Delete(cancelattend.getBytes());
            delete1.addColumns("fans".getBytes(), uid.getBytes());
            weibo_relations.delete(delete1);
        }

        Table weibo_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));
        // 3. Remove the unfollowed users' post rowkeys from the current user's inbox
        for (String cancelattend : cancelattends) {
            Delete delete2 = new Delete(uid.getBytes());
            // addColumns deletes all versions of the column, not just the newest one
            delete2.addColumns("info".getBytes(), cancelattend.getBytes());
            weibo_content_emails.delete(delete2);
        }

        weibo_content_emails.close();
        weibo_relations.close();
        connection.close();

    }
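Step 3 of `cancelAttends` depends on `Delete.addColumns`, which removes every version of the column up to the delete timestamp (the current time when none is given), whereas `Delete.addColumn` removes only the newest version and lets the previous one become visible again. The sketch below contrasts the two on one hypothetical inbox column held in a `TreeMap`; it models the semantics and does not call HBase.

```java
import java.util.TreeMap;

// Hypothetical model of one inbox column: version timestamp -> post rowkey
class DeleteVersionsModel {
    // Like Delete.addColumn(family, qualifier): only the newest version goes away,
    // so the next-newest version becomes visible again
    static TreeMap<Long, String> deleteLatest(TreeMap<Long, String> versions) {
        TreeMap<Long, String> left = new TreeMap<>(versions);
        left.pollLastEntry();   // does nothing on an empty map
        return left;
    }

    // Like Delete.addColumns(family, qualifier, ts): every version with timestamp <= ts goes away;
    // without an explicit ts the current time is used, which removes all existing versions
    static TreeMap<Long, String> deleteUpTo(TreeMap<Long, String> versions, long ts) {
        return new TreeMap<>(versions.tailMap(ts, false));
    }
}
```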

11) Pull the posts of followed users

    // Pull the posts of the users this user follows
    public void getAttendsContent(String uid) throws IOException {
        Connection connection = getConnection();
        Table weibo_content_emails = connection.getTable(TableName.valueOf(WEIBO_CONTENT_EMAILS));

        // Read the post rowkeys from the user's inbox (at most the 5 newest per followed user)
        Get get = new Get(uid.getBytes());
        get.setMaxVersions(5);
        Result result = weibo_content_emails.get(get);
        // Empty inbox: listCells() would return null, so stop here
        if (result.isEmpty()) {
            weibo_content_emails.close();
            connection.close();
            return;
        }

        // One Get per post, to be sent to the content table as a batch
        List<Get> gets = new ArrayList<Get>();
        for (Cell cell : result.listCells()) {
            // The cell value is the post's rowkey
            byte[] weiborowkey = CellUtil.cloneValue(cell);
            gets.add(new Get(weiborowkey));
        }

        // Fetch the posts from the content table
        Table weibo_content = connection.getTable(TableName.valueOf(WEIBO_CONTENT));
        Result[] results = weibo_content.get(gets);
        for (Result result1 : results) {
            // Skip posts that no longer exist
            if (result1.isEmpty()) {
                continue;
            }
            for (Cell cell : result1.listCells()) {
                // Print the post text
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }

        weibo_content.close();
        weibo_content_emails.close();
        connection.close();
    }
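Reading a timeline is thus a two-step lookup: the inbox row yields post rowkeys (at most five per followed user here, because of `get.setMaxVersions(5)`), and a batch of Gets fetches the post texts. The sketch below mirrors those two steps with hypothetical in-memory maps; posts missing from the content map are skipped, as empty Results are in the code above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of getAttendsContent on in-memory maps (no HBase calls)
class PullModel {
    // inboxRow: publisher uid -> (version timestamp -> post rowkey); content: post rowkey -> text
    static List<String> pull(Map<String, TreeMap<Long, String>> inboxRow,
                             Map<String, String> content, int maxVersions) {
        // Step 1: newest maxVersions rowkeys per publisher, like get.setMaxVersions(5)
        List<String> rowKeys = new ArrayList<>();
        for (TreeMap<Long, String> versions : inboxRow.values()) {
            int taken = 0;
            for (String rowKey : versions.descendingMap().values()) {
                if (taken++ >= maxVersions) {
                    break;
                }
                rowKeys.add(rowKey);
            }
        }
        // Step 2: look up each post; the real code sends these as one batch of Gets
        List<String> texts = new ArrayList<>();
        for (String rowKey : rowKeys) {
            String text = content.get(rowKey);
            if (text != null) {          // a deleted post comes back as an empty Result
                texts.add(text);
            }
        }
        return texts;
    }
}
```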
Original article: https://www.cnblogs.com/haojia/p/12386214.html