HBase API代码示例

1.先配置hosts的机器名和IP映射

需要先在运行Java程序的机器的hosts文件加入机器名和IP的映射。否则程序运行时会卡死或连接失败。

192.168.100.105 c1
192.168.100.110 c2
192.168.100.115 c3
192.168.100.120 c4

2.修改Maven配置文件(pom.xml)

在<dependencies></dependencies>节点内加入:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.4.7</version>
</dependency>

* 其中三个version节点的2.4.7是对应HBase的版本。例如,安装的HBase是v2.4.0,那三个version节点就填2.4.0

3.代码实现

public class HBaseHandler
{
    public static void main(String[] args)
    {
        String quorum = "c1,c2,c3,c4"; //ZooKeeper的主机名或主机ip,默认端口是2181

        String tableName = "Student";
        String cf_info = "Info";
        String cf_score = "Score";
        String c_info_name = "Name", c_info_age = "Age";
        String c_score_Math = "Math", c_score_English = "English";

        // HBase API中要close的类实现了java.lang.AutoCloseable,可以直接在try里面创建实例
        try (Connection conn = connect(quorum))
        {
            System.out.println("------------------- create table -------------------");
            createTable(conn, tableName, new String[]{cf_info, cf_score});

            System.out.println("------------------- put -------------------");
            put(conn, tableName, "row1", cf_info, c_info_name, "sam");
            put(conn, tableName, "row1", cf_score, c_score_Math, "70");
            put(conn, tableName, "row1", cf_score, c_score_English, "75");
            put(conn, tableName, "row2", cf_score, c_score_English, "80");

            System.out.println("------------------- get -------------------");
            String value1 = getCell(conn, tableName, "row1", cf_score, c_score_Math);
            String value2 = getCell(conn, tableName, "row1", cf_score);

            System.out.println("------------------- scan -------------------");
            List<Map<String, String>> values1 = scan(conn, tableName, "row1", "row3"); //[row1,row3)
            System.out.println("------------------- scan all table -------------------");
            List<Map<String, String>> values2 = scan(conn, tableName); //[row1,row3)
            System.out.println("------------------- scan with filter -------------------");
            Filter filter = new ColumnPrefixFilter("Eng".getBytes());//过滤列族以Eng开头的
            List<Map<String, String>> values3 = scan(conn, tableName, filter); //[row1,row3)

            System.out.println("------------------- increment -------------------");
            long value3 = increase(conn, tableName, "row2", cf_score, c_score_Math, 80L);
            long value4 = increase(conn, tableName, "row2", cf_score, c_score_Math, -5);

            System.out.println("------------------- delete table -------------------");
            deleteTable(conn, tableName);
        }
        catch (Exception e)
        {
            System.out.println(e);
        }
    }

    /**
     * 连接HBase
     *
     * @param quorum
     * @return
     * @throws Exception
     */
    public static Connection connect(String quorum) throws Exception
    {
        Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", quorum);
        Connection connection = ConnectionFactory.createConnection(config);
        return connection;
    }

    /**
     * 创建表
     *
     * @param connection
     * @param tableName   表名
     * @param familyNames 列族
     * @throws Exception
     */
    public static void createTable(Connection connection, String tableName, String[] familyNames) throws Exception
    {
        try (Admin admin = connection.getAdmin())
        {
            TableName tb = TableName.valueOf(tableName);
            if (!admin.tableExists(tb))
            {
                TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tb);
                for (String name : familyNames)
                {
                    ColumnFamilyDescriptor cfd = ColumnFamilyDescriptorBuilder.of(name);
                    builder.setColumnFamily(cfd);
                }
                TableDescriptor desc = builder.build();
                admin.createTable(desc);
            }
        }
    }

    /**
     * 插入数据
     *
     * @param connection
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @param data
     * @throws Exception
     */
    public static void put(Connection connection, String tableName, String rowKey, String columnFamily, String column, long data) throws Exception
    {
        try (Table table = connection.getTable(TableName.valueOf(tableName)))
        {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
            table.put(put);
        }
    }

    /**
     * 插入数据
     *
     * @param connection
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @param data
     * @throws Exception
     */
    public static void put(Connection connection, String tableName, String rowKey, String columnFamily, String column, String data) throws Exception
    {
        try (Table table = connection.getTable(TableName.valueOf(tableName)))
        {
            Put put = new Put(Bytes.toBytes(rowKey));
            put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
            table.put(put);
        }
    }

    /**
     * 根据rowkey获取一行数据
     *
     * @param connection
     * @param tableName
     * @param rowKey
     * @return
     * @throws IOException
     */
    public static Map<String, String> getRow(Connection connection, String tableName, String rowKey) throws IOException
    {
        try (Table table = connection.getTable(TableName.valueOf(tableName)))
        {
            Get get = new Get(Bytes.toBytes(rowKey));

            Result result = table.get(get);
            List<Cell> cells = result.listCells();

            if (CollectionUtils.isEmpty(cells))
            {
                return Collections.emptyMap();
            }

            Map<String, String> objectMap = new HashMap<>();
            for (Cell cell : cells)
            {
                String qualifier = new String(CellUtil.cloneQualifier(cell));
                String value = new String(CellUtil.cloneValue(cell), "UTF-8");
                objectMap.put(qualifier, value);
            }
            return objectMap;
        }
    }

    /**
     * 获取指定列族的数据
     *
     * @param connection
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @return
     * @throws IOException
     */
    public static String getCell(Connection connection, String tableName, String rowKey, String columnFamily) throws IOException
    {
        return getCell(connection, tableName, rowKey, columnFamily, null);
    }

    /**
     * 获取指定行列的数据
     *
     * @param connection
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @return
     * @throws IOException
     */
    public static String getCell(Connection connection, String tableName, String rowKey, String columnFamily, String column) throws IOException
    {
        try (Table table = connection.getTable(TableName.valueOf(tableName)))
        {
            String value = null;
            Get get = new Get(Bytes.toBytes(rowKey));
            get.addFamily(columnFamily.getBytes());
            if (column != null)
            {
                get.addColumn(columnFamily.getBytes(), column.getBytes());
            }
//            get.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column));

            Result result = table.get(get);
            List<Cell> cells = result.listCells();
            if (!CollectionUtils.isEmpty(cells))
            {
                value = new String(CellUtil.cloneValue(cells.get(0)), "UTF-8");
            }
            return value;
        }
    }

    /**
     * 获取游标内容(start row和end row)
     *
     * @param connection
     * @param tableName
     * @param rowkeyStart
     * @param rowkeyEnd
     * @return
     * @throws IOException
     */
    public static List<Map<String, String>> scan(Connection connection, String tableName, String rowkeyStart, String rowkeyEnd) throws IOException
    {
        return scan(connection, tableName, rowkeyStart, rowkeyEnd, null);
    }

    /**
     * 获取游标内容(filter)
     *
     * @param connection
     * @param tableName
     * @return
     * @throws IOException
     */
    public static List<Map<String, String>> scan(Connection connection, String tableName) throws IOException
    {
        return scan(connection, tableName, null, null, null);
    }

    /**
     * 获取游标内容(filter)
     *
     * @param connection
     * @param tableName
     * @param filter
     * @return
     * @throws IOException
     */
    public static List<Map<String, String>> scan(Connection connection, String tableName, Filter filter) throws IOException
    {
        return scan(connection, tableName, null, null, filter);
    }

    /**
     * 获取游标内容
     *
     * @param connection
     * @param tableName
     * @param rowkeyStart
     * @param rowkeyEnd
     * @throws IOException
     */
    public static List<Map<String, String>> scan(Connection connection, String tableName, String rowkeyStart, String rowkeyEnd, Filter filter) throws IOException
    {
        try (Table table = connection.getTable(TableName.valueOf(tableName)))
        {
            Scan scan = new Scan();
            if (!StringUtils.isEmpty(rowkeyStart))
            {
                scan.withStartRow(Bytes.toBytes(rowkeyStart));
            }
            if (!StringUtils.isEmpty(rowkeyEnd))
            {
                scan.withStopRow(Bytes.toBytes(rowkeyEnd));
            }
            if (filter != null)
            {
                scan.setFilter(filter);
            }

            try (ResultScanner rs = table.getScanner(scan);)
            {
                List<Map<String, String>> dataList = new ArrayList<>();
                for (Result r : rs)
                {
                    Map<String, String> objectMap = new HashMap<>();
                    for (Cell cell : r.listCells())
                    {
                        String qualifier = new String(CellUtil.cloneQualifier(cell));
                        String value = new String(CellUtil.cloneValue(cell), "UTF-8");
                        objectMap.put(qualifier, value);
                    }
                    dataList.add(objectMap);
                }
                return dataList;
            }
        }
    }

    /**
     * 对指定行列的数据加减值
     *
     * @param connection
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param column
     * @param incValue
     * @throws IOException
     */
    public static long increase(Connection connection, String tableName, String rowKey, String columnFamily, String column, long incValue) throws IOException
    {
        Long value = null;
        try (Table table = connection.getTable(TableName.valueOf(tableName)))
        {
            Increment increment = new Increment(rowKey.getBytes());
            increment.addColumn(columnFamily.getBytes(), column.getBytes(), incValue);
            Result result = table.increment(increment);
            for (Cell cell : result.rawCells())
            {
                value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
//                System.out.println("cell: " + cell + " -  incValue: " + Bytes.toLong(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
        }
        return value;
    }

    /**
     * 删除表
     *
     * @param connection
     * @param tableName
     * @throws IOException
     */
    public static void deleteTable(Connection connection, String tableName) throws IOException
    {
        TableName tn = TableName.valueOf(tableName);
        try (Admin admin = connection.getAdmin())
        {
            if (admin.tableExists(tn))
            {
                admin.disableTable(tn);
                admin.deleteTable(tn);
            }
        }
    }
}

参考资料:https://my.oschina.net/Endless2010/blog/1492121

原文地址:https://www.cnblogs.com/live41/p/15541838.html