Flink基础(二十七):FLINK-SQL语法(三)DQL(三)查询语句(三)操作符(二)

5 OrderBy & Limit

操作符描述
Order By
批处理 流处理
注意: 流处理结果需主要根据 时间属性 按照升序进行排序。支持使用其他排序属性。
SELECT *
FROM Orders
ORDER BY orderTime
Limit
批处理
注意: LIMIT 查询需要有一个 ORDER BY 字句。
SELECT *
FROM Orders
ORDER BY orderTime
LIMIT 3

6 Top-N

目前仅 Blink 计划器支持 Top-N 。

Top-N 查询是根据列排序找到N个最大或最小的值。最大值集和最小值集都被视为是一种 Top-N 的查询。若在批处理或流处理的表中需要显示出满足条件的 N 个最底层记录或最顶层记录, Top-N 查询将会十分有用。得到的结果集将可以进行进一步的分析。

Flink 使用 OVER 窗口条件和过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。 以下是 TOP-N 表达式的语法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
   FROM table_name)
WHERE rownum <= N [AND conditions]

参数说明:

  • ROW_NUMBER(): 根据当前分区内的各行的顺序从第一行开始,依次为每一行分配一个唯一且连续的号码。目前,我们只支持 ROW_NUMBER 在 over 窗口函数中使用。未来将会支持 RANK() 和 DENSE_RANK()函数。
  • PARTITION BY col1[, col2...]: 指定分区列,每个分区都将会有一个 Top-N 结果。
  • ORDER BY col1 [asc|desc][, col2 [asc|desc]...]: 指定排序列,不同列的排序方向可以不一样。
  • WHERE rownum <= N: Flink 需要 rownum <= N 才能识别一个查询是否为 Top-N 查询。 其中, N 代表最大或最小的 N 条记录会被保留。
  • [AND conditions]: 在 where 语句中,可以随意添加其他的查询条件,但其他条件只允许通过 AND 与 rownum <= N 结合使用。

流处理模式需注意 TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有相同与 Top-N 查询相同的唯一键。

Top-N 的唯一键是分区列和 rownum 列的结合,另外 Top-N 查询也可以获得上游的唯一键。以下面的任务为例,product_id 是 ShopSales 的唯一键,然后 Top-N 的唯一键是 [categoryrownum] 和 [product_id] 。

下面的样例描述了如何指定带有 Top-N 的 SQL 查询。这个例子的作用是我们上面提到的“查询每个分类实时销量最大的五个产品”。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 读取外部数据源的 DataStream
val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
// 注册名为 “ShopSales” 的 DataStream
tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales")


// 选择每个分类中销量前5的产品
val result1 = tableEnv.sqlQuery(
    """
      |SELECT *
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
      |   FROM ShopSales)
      |WHERE row_num <= 5
    """.stripMargin)

7 无排名输出优化

如上文所描述,rownum 字段会作为唯一键的其中一个字段写到结果表里面,这会导致大量的结果写出到结果表。比如,当原始结果(名为 product-1001 )从排序第九变化为排序第一时,排名 1-9 的所有结果都会以更新消息的形式发送到结果表。若结果表收到太多的数据,将会成为 SQL 任务的瓶颈。

优化方法是在 Top-N 查询的外部 SELECT 子句中省略 rownum 字段。由于前N条记录的数量通常不大,因此消费者可以自己对记录进行快速排序,因此这是合理的。去掉 rownum 字段后,上述的例子中,只有变化了的记录( product-1001 )需要发送到下游,从而可以节省大量的对结果表的 IO 操作。

以下的例子描述了如何以这种方式优化上述的 Top-N 查询:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 从外部数据源读取 DataStream
val ds: DataStream[(String, String, String, Long)] = env.addSource(...)
// 注册名为 “ShopSales” 的数据源
tableEnv.createTemporaryView("ShopSales", ds, $"product_id", $"category", $"product_name", $"sales")


// 选择每个分类中销量前5的产品
val result1 = tableEnv.sqlQuery(
    """
      |SELECT product_id, category, product_name, sales  -- omit row_num field in the output
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY category ORDER BY sales DESC) as row_num
      |   FROM ShopSales)
      |WHERE row_num <= 5
    """.stripMargin)

使用流处理模式时需注意 为了使上述查询输出可以输出到外部存储并且结果正确,外部存储需要拥有与 Top-N 查询一致的唯一键。在上述的查询例子中,若 product_id 是查询的唯一键,那么外部表必须要有 product_id 作为其唯一键。

8 去重

注意 仅 Blink planner 支持去重。

去重是指对在列的集合内重复的行进行删除,只保留第一行或最后一行数据。 在某些情况下,上游的 ETL 作业不能实现精确一次的端到端,这将可能导致在故障恢复 时,sink 中有重复的记录。 由于重复的记录将影响下游分析作业的正确性(例如,SUMCOUNT), 所以在进一步分析之前需要进行数据去重。

与 Top-N 查询相似,Flink 使用 ROW_NUMBER() 去除重复的记录。理论上来说,去重是一个特殊的 Top-N 查询,其中 N 是 1 ,记录则是以处理时间或事件事件进行排序的。

以下代码展示了去重语句的语法:

SELECT [column_list]
FROM (
   SELECT [column_list],
     ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
       ORDER BY time_attr [asc|desc]) AS rownum
   FROM table_name)
WHERE rownum = 1

参数说明:

  • ROW_NUMBER(): 从第一行开始,依次为每一行分配一个唯一且连续的号码。
  • PARTITION BY col1[, col2...]: 指定分区的列,例如去重的键。
  • ORDER BY time_attr [asc|desc]: 指定排序的列。所制定的列必须为 时间属性。目前仅支持 proctime attribute,在未来版本中将会支持 Rowtime atttribute 。升序( ASC )排列指只保留第一行,而降序排列( DESC )则指保留最后一行。
  • WHERE rownum = 1: Flink 需要 rownum = 1 以确定该查询是否为去重查询。

以下的例子描述了如何指定 SQL 查询以在一个流计算表中进行去重操作。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// 从外部数据源读取 DataStream
val ds: DataStream[(String, String, String, Int)] = env.addSource(...)
// 注册名为 “Orders” 的 DataStream
tableEnv.createTemporaryView("Orders", ds, $"order_id", $"user", $"product", $"number", $"proctime".proctime)

// 由于不应该出现两个订单有同一个order_id,所以根据 order_id 去除重复的行,并保留第一行
val result1 = tableEnv.sqlQuery(
    """
      |SELECT order_id, user, product, number
      |FROM (
      |   SELECT *,
      |       ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY proctime DESC) as row_num
      |   FROM Orders)
      |WHERE row_num = 1
    """.stripMargin)

9 分组窗口

SQL 查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:

分组窗口函数描述
TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
HOP(time_attr, interval, interval) 定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。
SESSION(time_attr, interval) 定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。

10 时间属性

在流处理表中的 SQL 查询中,分组窗口函数的 time_attr 参数必须引用一个合法的时间属性,且该属性需要指定行的处理时间或事件时间。可参考 时间属性文档 以了解如何定义时间属性。

对于批处理的 SQL 查询,分组窗口函数的 time_attr 参数必须是一个 TIMESTAMP 类型的属性。

11 选择分组窗口的开始和结束时间戳

可以使用以下辅助函数选择组窗口的开始和结束时间戳以及时间属性:

辅助函数描述
TUMBLE_START(time_attr, interval)
HOP_START(time_attr, interval, interval)
SESSION_START(time_attr, interval)

返回相对应的滚动、滑动和会话窗口范围内的下界时间戳。

TUMBLE_END(time_attr, interval)
HOP_END(time_attr, interval, interval)
SESSION_END(time_attr, interval)

返回相对应的滚动、滑动和会话窗口范围以外的上界时间戳。

注意: 范围以外的上界时间戳不可以 在随后基于时间的操作中,作为 行时间属性 使用,比如 interval join 以及 分组窗口或分组窗口上的聚合

TUMBLE_ROWTIME(time_attr, interval)
HOP_ROWTIME(time_attr, interval, interval)
SESSION_ROWTIME(time_attr, interval)

返回相对应的滚动、滑动和会话窗口范围以内的上界时间戳。

返回的是一个可用于后续需要基于时间的操作的时间属性(rowtime attribute),比如interval join 以及 分组窗口或分组窗口上的聚合

TUMBLE_PROCTIME(time_attr, interval)
HOP_PROCTIME(time_attr, interval, interval)
SESSION_PROCTIME(time_attr, interval)

返回一个可用于后续需要基于时间的操作的 处理时间参数,比如interval join 以及 分组窗口或分组窗口上的聚合.

注意: 辅助函数必须使用与 GROUP BY 子句中的分组窗口函数完全相同的参数来调用.

以下的例子展示了如何在流处理表中指定使用分组窗口函数的 SQL 查询。

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

// 从外部数据源读取 DataSource
val ds: DataStream[(Long, String, Int)] = env.addSource(...)
// 计算每日(使用处理时间)的 SUM(amount) 
tableEnv.createTemporaryView("Orders", ds, $"user", $"product", $"amount", $"proctime".proctime, $"rowtime".rowtime)

// 计算每日的 SUM(amount) (使用事件时间)
val result1 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
    """.stripMargin)

// 计算每日的 SUM(amount) (使用处理时间)
val result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user")

// 使用事件时间计算过去24小时中每小时的 SUM(amount) 
val result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product")

// 计算每个以12小时(事件时间)作为不活动时间的会话的 SUM(amount) 
val result4 = tableEnv.sqlQuery(
    """
      |SELECT
      |  user,
      |  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart,
      |  SESSION_END(rowtime, INTERVAL '12' HOUR) AS sEnd,
      |  SUM(amount)
      | FROM Orders
      | GROUP BY SESSION(rowtime(), INTERVAL '12' HOUR), user
    """.stripMargin)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// ingest a DataStream from an external source
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
// register the DataStream as table "Orders"
tableEnv.createTemporaryView("Orders", ds, $("user"), $("product"), $("amount"), $("proctime").proctime(), $("rowtime").rowtime());

// compute SUM(amount) per day (in event-time)
Table result1 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  TUMBLE_START(rowtime, INTERVAL '1' DAY) as wStart,  " +
  "  SUM(amount) FROM Orders " +
  "GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user");

// compute SUM(amount) per day (in processing-time)
Table result2 = tableEnv.sqlQuery(
  "SELECT user, SUM(amount) FROM Orders GROUP BY TUMBLE(proctime, INTERVAL '1' DAY), user");

// compute every hour the SUM(amount) of the last 24 hours in event-time
Table result3 = tableEnv.sqlQuery(
  "SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product");

// compute SUM(amount) per session with 12 hour inactivity gap (in event-time)
Table result4 = tableEnv.sqlQuery(
  "SELECT user, " +
  "  SESSION_START(rowtime, INTERVAL '12' HOUR) AS sStart, " +
  "  SESSION_ROWTIME(rowtime, INTERVAL '12' HOUR) AS snd, " +
  "  SUM(amount) " +
  "FROM Orders " +
  "GROUP BY SESSION(rowtime, INTERVAL '12' HOUR), user");

12 模式匹配

操作符描述
MATCH_RECOGNIZE
流处理

根据 MATCH_RECOGNIZE ISO 标准在流处理表中搜索给定的模式。 这样就可以在SQL查询中描述复杂的事件处理(CEP)逻辑。

更多详情请参考 检测表中的模式.

SELECT T.aid, T.bid, T.cid
FROM MyTable
MATCH_RECOGNIZE (
  PARTITION BY userid
  ORDER BY proctime
  MEASURES
    A.id AS aid,
    B.id AS bid,
    C.id AS cid
  PATTERN (A B C)
  DEFINE
    A AS name = 'a',
    B AS name = 'b',
    C AS name = 'c'
) AS T

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14053131.html

原文地址:https://www.cnblogs.com/qiu-hua/p/14053131.html