HBASE 过滤器

1. 过滤器

要完成一个过滤的操作,至少需要两个参数。一个是抽象的操作符,Hbase 提供了枚举类型的变量来表示这些抽象的操作符:LESS/LESS_OR_EQUAL/EQUAL/NOT_EUQAL等;另外一个就是具体的比较器(Comparator),代表具体的比较逻辑,如果可以提高字节级的比较、字符串级的比较等。有了这两个参数,我们就可以清晰的定义筛选的条件,过滤数据。

抽象操作符(比较运算符)

LESS <****
LESS_OR_EQUAL <=
EQUAL =
NOT_EQUAL <>
GREATER_OR_EQUAL >=
GREATER >
NO_OP 排除所有

比较器(指定比较机制)

BinaryComparator 按字节索引顺序比较指定字节数组,采用 Bytes.compareTo(byte[])

BinaryPrefixComparator 跟前面相同,只是比较左端的数据是否相同

NullComparator 判断给定的是否为空

BitComparator 按位比较

RegexStringComparator 提供一个正则的比较器,仅支持 EQUAL 和非 EQUAL

SubstringComparator 判断提供的子串是否出现在 value 中

2. 比较过滤器

2.1 行键过滤器

过滤出 rowkey 大于 10004 的数据:

// 过滤器
public static void scanFilterData(String tableName) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    // GREATER 大于、 BinaryComparator 按字节索引顺序比较指定字节数组
    Filter rowFilter = new RowFilter(CompareFilter.CompareOp.GREATER, new BinaryComparator(Bytes.toBytes("10004")));
    Scan scan = new Scan();
    scan.setFilter(rowFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result: resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell: cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()));
            System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));

        }
    }
    table.close();
}

测试:

// t2 表中所有数据
hbase(main):008:0> scan 't2'
ROW                                COLUMN+CELL                                                                                       
10004                             column=info:alias2, timestamp=1628383262854, value=jun2                                           
10011                             column=info:alias4, timestamp=1628383262854, value=jun4                                           
10016                             column=info:alias5, timestamp=1628383262854, value=jun5                                           
3 row(s) in 0.1140 seconds

// 过滤器
scanFilterData("t2");

行键: 10011
列族: info
列: alias4
值: jun4

行键: 10016
列族: info
列: alias5
值: jun5

2.2 列族过滤器

过滤 info 列族:

 // 列族过滤器
public static void scanFilterCf(String tableName, String cf) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    // 获取列族为 info 的记录
    Filter cfFilter = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(cf)));
    Scan scan = new Scan();
    scan.setFilter(cfFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()) +
                    "	列族: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "	列: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "	值: " + Bytes.toString(CellUtil.cloneValue(cell))
            );
        }
    }
    table.close();
}

测试:

scanFilterCf("t2", "info");
行键: 10004	列族: info	列: alias2	值: jun2
行键: 10011	列族: info	列: alias4	值: jun4
行键: 10016	列族: info	列: alias5	值: jun5

// 不存在的列族
scanFilterCf("t2", "info2");

2.3 列过滤器

// 获取列为 alias2 的记录
Filter qualifierFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(cn)));
Scan scan = new Scan();
scan.setFilter(qualifierFilter);

测试:

scanFilterCn("t2", "alias2");

行键: 10004	列族: info	列: alias2	值: jun2

2.4 值过滤器

// 获取值为 jun5 的记录
Filter valueFilter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(value)));
Scan scan = new Scan();
scan.setFilter(valueFilter);

测试:

scanFilterValue("t2", "jun5");

行键: 10016	列族: info	列: alias5	值: jun5

2.5 时间戳过滤器

public static void scanFilterTimestamp(String tableName, long Timestamp) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    List<Long> list = new ArrayList<>();
    list.add(Timestamp);

    // 获取时间戳为 1628383262854 的记录
    TimestampsFilter timestampsFilter  = new TimestampsFilter(list);
    Scan scan = new Scan();
    scan.setFilter(timestampsFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()) +
                    "	列族: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "	列: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "	值: " + Bytes.toString(CellUtil.cloneValue(cell)) +
                    "	时间戳: " + cell.getTimestamp()
            );
        }
    }
    table.close();
}

测试:

scanFilterTimestamp("t2", 1628383262854L);

行键: 10004	列族: info	列: alias2	值: jun2	时间戳: 1628383262854
行键: 10011	列族: info	列: alias4	值: jun4	时间戳: 1628383262854
行键: 10016	列族: info	列: alias5	值: jun5	时间戳: 1628383262854

3. 专用过滤器

3.1 单列值过滤器

public static void scanFilterSingleValue(String tableName, String cf, String cn, String value) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter(
            Bytes.toBytes(cf),
            Bytes.toBytes(cn),,
            CompareFilter.CompareOp.EQUAL,
            new SubstringComparator(value)
    );
    //如果不设置为 true,则那些不包含指定 column 的行也会返回
    singleColumnValueFilter.setFilterIfMissing(true);

    Scan scan = new Scan();
    scan.setFilter(singleColumnValueFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()) +
                    "	列族: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "	列: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "	值: " + Bytes.toString(CellUtil.cloneValue(cell)) +
                    "	时间戳: " + cell.getTimestamp()
            );
        }
    }
    table.close();
}

测试:

scanFilterSingleValue("t2", "info", "alias2", "jun2");

行键: 10004	列族: info	列: alias2	值: jun2	时间戳: 1628383262854

// 注释  singleColumnValueFilter.setFilterIfMissing(true);
行键: 10004	列族: info	列: alias2	值: jun2	时间戳: 1628383262854
行键: 10011	列族: info	列: alias4	值: jun4	时间戳: 1628383262854
行键: 10016	列族: info	列: alias5	值: jun5	时间戳: 1628383262854

3.2 单列值排除器

public static void scanFilterSingleExcludeValue(String tableName, String cf, String cn, String value) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    SingleColumnValueExcludeFilter singleColumnValueExcludeFilter = new SingleColumnValueExcludeFilter(
            Bytes.toBytes(cf),
            Bytes.toBytes(cn),
            CompareFilter.CompareOp.EQUAL,
            Bytes.toBytes(value)

//                new SubstringComparator(value)
    );
    //如果不设置为 true,则那些不包含指定 column 的行也会返回
    singleColumnValueExcludeFilter.setFilterIfMissing(true);

    Scan scan = new Scan();
    scan.setFilter(singleColumnValueExcludeFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()) +
                    "	列族: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "	列: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "	值: " + Bytes.toString(CellUtil.cloneValue(cell)) +
                    "	时间戳: " + cell.getTimestamp()
            );
        }
    }
    table.close();
}

测试:

scanFilterSingleExcludeValue("t2", "info", "alias2", "jun2");

3.3 前缀过滤器(针对行键)

public static void scanFilterPrefix(String tableName, String rowKeyPrefix) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    PrefixFilter prefixFilter = new PrefixFilter(Bytes.toBytes(rowKeyPrefix));
    Scan scan = new Scan();
    scan.setFilter(prefixFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()) +
                    "	列族: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "	列: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "	值: " + Bytes.toString(CellUtil.cloneValue(cell)) +
                    "	时间戳: " + cell.getTimestamp()
            );
        }
    }
    table.close();
}

测试:

scanFilterPrefix("t2", "1001");

行键: 10011	列族: info	列: alias4	值: jun4	时间戳: 1628383262854
行键: 10016	列族: info	列: alias5	值: jun5	时间戳: 1628383262854

3.4 列前缀过滤器

public static void scanFilterColumnPrefix(String tableName, String CnPrefix) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    ColumnPrefixFilter columnPrefixFilter = new ColumnPrefixFilter(Bytes.toBytes(CnPrefix));
    Scan scan = new Scan();
    scan.setFilter(columnPrefixFilter);

    ResultScanner resultScanner = table.getScanner(scan);
    for (Result result : resultScanner) {
        Cell[] cells = result.rawCells();
        for (Cell cell : cells) {
            System.out.println("行键: " + Bytes.toString(result.getRow()) +
                    "	列族: " + Bytes.toString(CellUtil.cloneFamily(cell)) +
                    "	列: " + Bytes.toString(CellUtil.cloneQualifier(cell)) +
                    "	值: " + Bytes.toString(CellUtil.cloneValue(cell)) +
                    "	时间戳: " + cell.getTimestamp()
            );
        }
    }
    table.close();
}

测试:

scanFilterColumnPrefix("t2", "ali");

行键: 10004	列族: info	列: alias2	值: jun2	时间戳: 1628383262854
行键: 10011	列族: info	列: alias4	值: jun4	时间戳: 1628383262854
行键: 10016	列族: info	列: alias5	值: jun5	时间戳: 1628383262854

3.5 分页过滤器

public static void scanFilterPage(String tableName, int pageNum) throws IOException {
    Table table = connection.getTable(TableName.valueOf(tableName));

    PageFilter pageFilter = new PageFilter(pageNum);
    Scan scan = new Scan();
    scan.setFilter(pageFilter);

    ResultScanner resultScanner = table.getScanner(scan);

//        // 获取最后一行的 rowkey,为lastRowkey加上了一个0字节(byte数组初始化
//        //后默认填入的就是0字节),不希望第二次的Scan结果集把第一次的最后一条记录包含进去
//        byte[] lastRowKey = getLastRowKey(resultScanner);
//        System.out.println("lastRowKey: " + Bytes.toString(lastRowKey));
//
//        // 获取第 2 页
//        byte[] startRowKey = Bytes.add(lastRowKey, new byte[1]);
//        scan.setStartRow(startRowKey);
//        ResultScanner rs2 = table.getScanner(scan);
//
//        getLastRowKey(rs2);

    // 循环获取所有数据
    while (true) {
        byte[] lastRowKey = getLastRowKey(resultScanner);
        if (lastRowKey == null) {
            break;
        }

        // 获取下一页
        byte[] startRowKey = Bytes.add(lastRowKey, new byte[1]);
        scan.setStartRow(startRowKey);
        resultScanner = table.getScanner(scan);
    }
    table.close();
}

// 获取最后记录的 rowKey
private static byte[] getLastRowKey(ResultScanner rs) {
    byte[] lastRowKey = null;
    for (Result r : rs) {
        byte[] rowkey = r.getRow();
        lastRowKey = rowkey;
        System.out.println("rowkey: " + Bytes.toString(rowkey));
    }
    return lastRowKey;
}

测试:

// 设置分页数量为 2,总共有 3 行
scanFilterPage("t2", 2);

rowkey: 10004
rowkey: 10011
rowkey: 10016

注意:PageFilte 不能实现翻页,如果想翻页就得记录上一次翻页的最后一个 rowkey

原文地址:https://www.cnblogs.com/midworld/p/15143558.html