Flink sql 写 Hbase 忽略空列

数仓场景下,经常有两个表 Join 后往一个宽表写数据。比如:埋点数据中只包含 user_id,但是下游计算的需要使用用户的其他属性,就需要将埋点数据和用户数据做关联。

实时场景,需要做流表 Join,用埋点数据的 user_id 去全量用户表中取用户属性。

如果两部分数据产生的顺序不确定,可能先生成A,也可能先生成B,并且先后的时间范围也不一定,可能是一起生成,也可能隔三五天。

两部分数据生成的时间间隔不确定,在 Flink 中无法使用 Interval join,如果用流表 Join 也可能实现 Join 的功能,但是比较麻烦。

在这种场景下,可以基于 Hbase 的主键覆盖策略将两部分数据分别写入 Hbase 表中,由于两部分数据都包含 user_id,直接起两个任务将两部分的数据以 user_id 做主键,直接写入 Hbase 表,两部分数据直接写自己对应的字段就可以达到 Join 的效果了。
(这是之前我们遇到的问题,双流Join的场景,A Left Join B 流(Interval Join)需要 Join 的任务中,如果没有关联上 B 流的数据,不覆盖了表中的对应列(一个单独的任务将B流的数据直接写入到对应Hbase 的 对应字段))

----- 原谅我表达能力不行,就是想改写 Hbase sink 源码,写入 Hbase 的时候忽略空列 ----

先看下测试环境:
Flink 1.13.2
Hadoop 3.1.1
Hbase 2.2.7

## 测试
看下测试 SQL

-- kafka source
CREATE TABLE user_log (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3)
  ,process_time as proctime()
  , WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka'
  ,'topic' = 'user_behavior1'
  ,'properties.bootstrap.servers' = 'localhost:9092'
  ,'properties.group.id' = 'user_log'
  ,'scan.startup.mode' = 'latest-offset'
  ,'format' = 'json'
);

drop table if exists hbase_user_log_sink ;
CREATE TABLE hbase_user_log_sink (
   user_id STRING
  ,cf ROW(item_id STRING
  ,category_id STRING
  ,behavior STRING
  ,ts TIMESTAMP(3))
) WITH (
   'connector' = 'hbase-2.2'
   ,'zookeeper.quorum' = 'localhost:12181'
   ,'zookeeper.znode.parent' = '/hbase'
   ,'table-name' = 'user_log'
   ,'null-string-literal' = '--'
   -- ,'lookup.cache.max-rows' = '10000'
   -- ,'lookup.cache.ttl' = '10 minute' -- ttl time 超过这么长时间无数据才行
   -- ,'lookup.async' = 'true'
);

insert into hbase_user_log_sink
select user_id, row(item_id, category_id, behavior, ts)
from user_log;

测试数据:

{"category_id":"category_id_107","user_id":"user_id_108","item_id":"item_id_107","behavior":"107","ts":"2021-09-07 15:02:55.110"}
{"category_id":"category_id_107","user_id":"user_id_108","behavior":"107","ts":"2021-09-07 15:02:55.110"}

任务如下: 

直接测试结果如下:

# 第一次把数据写入 hbase
hbase(main):010:0> get 'user_log','user_id_108'
COLUMN                                                CELL                                                                                                                                                         
 cf:behavior                                          timestamp=1630997629859, value=107                                                                                                                           
 cf:category_id                                       timestamp=1630997629859, value=category_id_107                                                                                                               
 cf:item_id                                           timestamp=1630997629859, value=item_id_107_11                                                                                                                
 cf:ts                                                timestamp=1630997629859, value=x00x00x01{xC0xBE<xA2                                                                                                    
1 row(s)
Took 0.0123 seconds   
# 第二次把数据写入 hbase,输入 json 中没有 item_id 字段
hbase(main):011:0> get 'user_log','user_id_108'
COLUMN                                                CELL                                                                                                                                                         
 cf:behavior                                          timestamp=1630997647868, value=107                                                                                                                           
 cf:category_id                                       timestamp=1630997647868, value=category_id_107                                                                                                               
 cf:item_id                                           timestamp=1630997647868, value=null                                                                                                                          
 cf:ts                                                timestamp=1630997647868, value=x00x00x01{xC0xBE<xA2                                                                                                    
1 row(s)
Took 0.0211 seconds

第一次发送包含全部字段的json 到 kafka,hbase 每个列都写入值
第一次发送不包含 item 字段的json 到 kafka,hbase item_id 列的值被 null 覆盖

## 修改源码

直接看源码,定位到 HbaseSinkFunction.invoke 方法

public void invoke(T value, Context context) throws Exception {
    checkErrorAndRethrow();
    // 输入输入转为 Mutation
    mutator.mutate(mutationConverter.convertToMutation(value));

    // flush when the buffer number of mutations greater than the configured max size.
    if (bufferFlushMaxMutations > 0
            && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
        flush();
    }
}

最终定位到组装 Put 的代码 HbaseSerde.createPutMutation 方法

public @Nullable Put createPutMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    // 获取主键
    byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    // rowkey 不能为空
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    // upsert
    Put put = new Put(rowkey);
    for (int i = 0; i < fieldLength; i++) {
        if (i != rowkeyIndex) {
            int f = i > rowkeyIndex ? i - 1 : i;
            // get family key
            byte[] familyKey = families[f];
            // 获取 column family 的 row 
            RowData familyRow = row.getRow(i, qualifiers[f].length);
            // 循环 qualifiers, 将 row 中的值填入到 put 中
            for (int q = 0; q < this.qualifiers[f].length; q++) {
                // get quantifier key
                byte[] qualifier = qualifiers[f][q];
                // serialize value
                byte[] value = qualifierEncoders[f][q].encode(familyRow, q);
                put.addColumn(familyKey, qualifier, value);
            }
        }
    }
    return put;
}

修改比较简单,就是在组装 put 的时候,判断一下对应 列的值是否为 null,null 的就不添加到 put 中就可以了

为了保留Hbase sink 有的功能,有不想改太多,直接借用 hbase sink 的 "null-string-literal" 属性,默认将数据中 null 的列转为字符串 "null"

官网 "null-string-literal" 属性介绍

Representation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.

借用 "null-string-literal" 属性,当值为 "--" 的时候,就忽略 null 的列,源码如下:

public class HBaseSerde {


private final byte[] nullStringBytes;
// add by venn,是否忽略 null 列
private final boolean ignoreNullColumn;


public HBaseSerde(HBaseTableSchema hbaseSchema, final String nullStringLiteral) {

...
    this.nullStringBytes = nullStringLiteral.getBytes(StandardCharsets.UTF_8);
    // 属性 null-string-literal 的值为 -- ignoreNullColumn = true
    ignoreNullColumn = "--".equals(nullStringLiteral);

...
}

/**
 * Returns an instance of Put that writes record to HBase table.
 *
 * @return The appropriate instance of Put for this use case.
 */
public @Nullable
Put createPutMutation(RowData row) {
    checkArgument(keyEncoder != null, "row key is not set.");
    byte[] rowkey = keyEncoder.encode(row, rowkeyIndex);
    if (rowkey.length == 0) {
        // drop dirty records, rowkey shouldn't be zero length
        return null;
    }
    // upsert
    Put put = new Put(rowkey);
    for (int i = 0; i < fieldLength; i++) {
        if (i != rowkeyIndex) {
            int f = i > rowkeyIndex ? i - 1 : i;
            // get family key
            byte[] familyKey = families[f];
            RowData familyRow = row.getRow(i, qualifiers[f].length);
            for (int q = 0; q < this.qualifiers[f].length; q++) {
                // add by venn, 如果 ignoreNullColumn 为 true,切 对应列为 null,忽略 列
                if (ignoreNullColumn && familyRow.isNullAt(q)) {
                    continue;
                }
                // get quantifier key
                byte[] qualifier = qualifiers[f][q];
                // serialize value
                byte[] value = qualifierEncoders[f][q].encode(familyRow, q);
                put.addColumn(familyKey, qualifier, value);
            }
        }
    }
    return put;
}

}

测试结果忽略

完整代码参考:https://github.com/springMoon/sqlSubmit

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

原文地址:https://www.cnblogs.com/Springmoon-venn/p/15239358.html