Flink SQL 流式写数据到 Hive

Flink 1.11 版本对SQL的优化是很多的,其中最重要的一点就是 hive 功能的完善,不再只是作为持久化的 Catalog,
而是可以用原生的 Flink SQL 流式的写数据到入 hive中

本文使用官网 “Streaming Writing” 案例 (https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html#streaming-writing),

流式写数据到 Hive (刚好之前有同学咨询官网的例子不能写入成功)

The Hive Streaming Sink re-use Filesystem Streaming Sink to integrate Hadoop OutputFormat/RecordWriter to streaming writing. Hadoop RecordWriters are Bulk-encoded Formats, Bulk Formats rolls files on every checkpoint.

Hive Streaming Sink 重用 Filesystem Streaming Sink,集成Hadoop OutputFormat / RecordWriter 流式写入。 Hadoop RecordWriters是 Bulk-encoded 格式,Bulk 格式在每个 checkpoint 上滚动文件。

环境:
  Flink 1.11.2
  Hive 2.3.6
  Hadoop 2.7
  sqlSubmit,我开源 Flink SQL 提交程序(Table Api 的方式提交 SQL,代码已提交 Github:https://github.com/springMoon/sqlSubmit)

官网SQL 如下:

SET table.sql-dialect=hive;  -- 要指定 hive 方言,不然 hive 表创建不成功
CREATE TABLE hive_table (
  user_id STRING,
  order_amount DOUBLE
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',   -- hive 分区提取器
  'sink.partition-commit.trigger'='partition-time',               -- 分区触发提交
  'sink.partition-commit.delay'='1 h',      -- 提交延迟
  'sink.partition-commit.policy.kind'='metastore,success-file'    -- 提交类型
);

SET table.sql-dialect=default;  -- 换回 default 方言
CREATE TABLE kafka_table (
  user_id STRING,
  order_amount DOUBLE,
  log_ts TIMESTAMP(3),
  WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);   -- kafka 表的 tblproperties

-- streaming sql, insert into hive table  写入的 sql, 最后两个字段是 是写入分区
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;

-- batch sql, select with partition pruning
SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

官网的案例默认是在 sql-client 中执行的,这里是用 Table Api,所以会有点不同,先看下完整的 SQL

drop table if exists user_log;
CREATE TABLE user_log (
  user_id VARCHAR
  ,item_id VARCHAR
  ,category_id VARCHAR
  ,behavior VARCHAR
  ,ts TIMESTAMP(3)
  ,WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka'
  ,'connector.version' = 'universal'
  ,'connector.topic' = 'user_behavior'
  ,'connector.properties.zookeeper.connect' = 'venn:2181'
  ,'connector.properties.bootstrap.servers' = 'venn:9092'
  ,'connector.properties.group.id' = 'user_log'
  ,'connector.startup-mode' = 'group-offsets'
  ,'connector.sink-partitioner' = 'fixed'
  ,'format.type' = 'json'
);

-- set table.sql-dialect=hive;
-- kafka sink  
drop table if exists hive_table;
CREATE TABLE hive_table (
  user_id STRING
  ,item_id STRING
  ,category_id STRING
  ,behavior STRING
) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.delay'='1 min',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);

-- streaming sql, insert into hive table
INSERT INTO TABLE hive_table
SELECT user_id, item_id, category_id, behavior, DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH')
FROM user_log;

跟官网基本一样,唯一的不同是,在指定 sql 方言的时候,Table Api 是这样的:

tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
tabEnv.getConfig().setSqlDialect(SqlDialect.HIVE)

flink 方言官网: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_dialect.html#table-api

注:
partition.time-extractor.timestamp-pattern 指定分区提取器提取时间戳的格式
sink.partition-commit.trigger 触发分区提交的类型可以指定 "process-time" 和 "partition-time" 处理时间和分区时间

如指定天、小时、分钟三级分区:
partition.time-extractor.timestamp-pattern = $dt $hr:$ms:00
分区字段则是: DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm')

hive 表数据如下:

hive> select * from hive_table limit 10;
OK
107025  1007359 4391936 pv      2017-11-26      01
334124  3904483 2520771 cart    2017-11-26      01
475192  3856358 2465336 pv      2017-11-26      01
475192  3856358 2465336 pv      2017-11-26      01
864482  3398512 1639158 pv      2017-11-26      01
987980  3225231 2578647 pv      2017-11-26      01
987980  3225231 2578647 pv      2017-11-26      01
563592  3377194 2131531 pv      2017-11-26      01
939115  241366  4756105 pv      2017-11-26      01
939115  241366  4756105 pv      2017-11-26      01
Time taken: 0.112 seconds, Fetched: 10 row(s)

hive 表对于目录文件情况:

[venn@venn ~]$ hadoop fs -ls /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/
Found 7 items
-rw-r--r--   1 venn supergroup          0 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/.part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-5.inprogress.42f75ffc-8c4d-4009-a00a-93482a96a2b8
-rw-r--r--   1 venn supergroup          0 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/_SUCCESS
-rw-r--r--   1 venn supergroup       7190 2020-09-24 16:56 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-0
-rw-r--r--   1 venn supergroup       3766 2020-09-24 16:58 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-1
-rw-r--r--   1 venn supergroup       3653 2020-09-24 17:00 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-2
-rw-r--r--   1 venn supergroup       3996 2020-09-24 17:02 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-3
-rw-r--r--   1 venn supergroup       3719 2020-09-24 17:04 /user/hive/warehouse/flink.db/hive_table/dt=2017-11-26/hr=01/part-35464358-55dc-4d76-a174-ba50b1f97c1b-0-4

.part-xxx 文件就是正在写的文件,下面几个就是已经提交的文件

在做官网的案例的过程,还算比较顺利,但是也遇到几个问题:

  • 1、jar 包问题,写 hive 需要 hadoop-mapreduce-client-core-2.7.7.jar
  • 2、参数 sink.partition-commit.delay 的单位支持: DAYS: (d | day | days), HOURS: (h | hour | hours), MINUTES: (min | minute | minutes), SECONDS: (s | sec | secs | second | seconds), MILLISECONDS: (ms | milli | millis | millisecond | milliseconds), MICROSECONDS: (µs | micro | micros | microsecond | microseconds), NANOSECONDS: (ns | nano | nanos | nanosecond | nanoseconds)
  • 3、hive 表日期类型字段,目前是只支持 TIMESTAMP(9),但是 flink 的 timestamp 是 3 位与 6 位(放弃 日期类型,反正 String 类型的日期 hive 也可以识别)

```java
java.time.format.DateTimeParseException: Text '2017-11-26-01 00:00:00' could not be parsed, unparsed text found at index 10
```

  • 4、web 页面 metrics,source 块 往 Sink 块写数 Records Sent 的问题, Records Sent 数对应 checkpoint 次数,因为只会在 checkpoint 的时候才会提交数据到 HDFS,这个消息应该是某个信号数据,而不是真实的数据条数 (上面贴的官网说明有讲,如果没有 checkpoint,数据会写到 hdfs,但是会出于 inprogress状态,并且是 "." 开头的文件,对 hive 来说是隐藏文件,查不到的)

再贴下flink lib:

flink-connector-hbase_2.11-1.11.2.jar       flink-json-1.11.2.jar                   hbase-common-2.1.4.jar                hive-exec-2.3.6.jar                log4j-slf4j-impl-2.12.1.jar
flink-connector-hive_2.11-1.11.2.jar        flink-shaded-zookeeper-3.4.14.jar       hbase-protocol-2.1.4.jar              htrace-core4-4.2.0-incubating.jar  metrics-core-3.2.1.jar
flink-connector-kafka_2.11-1.11.2.jar       flink-table_2.11-1.11.2.jar             hbase-protocol-shaded-2.1.4.jar       kafka-clients-2.2.0.jar
flink-connector-kafka-base_2.11-1.11.2.jar  flink-table-blink_2.11-1.11.2.jar       hbase-shaded-miscellaneous-2.1.0.jar  log4j-1.2-api-2.12.1.jar
flink-csv-1.11.2.jar                        hadoop-mapreduce-client-core-2.7.7.jar  hbase-shaded-netty-2.1.0.jar          log4j-api-2.12.1.jar
flink-dist_2.11-1.11.2.jar                  hbase-client-2.1.4.jar                  hbase-shaded-protobuf-2.1.0.jar       log4j-core-2.12.1.jar

欢迎关注Flink菜鸟公众号,会不定期更新Flink(开发技术)相关的推文

原文地址:https://www.cnblogs.com/Springmoon-venn/p/13726089.html