Flink基础(二十六):FLINK-SQL语法(二)DQL(二)查询语句(二)操作符(一)

1 Scan、Projection 与 Filter

操作符描述
Scan / Select / As
批处理 流处理
SELECT * FROM Orders

SELECT a, c AS d FROM Orders
Where / Filter
批处理 流处理
SELECT * FROM Orders WHERE b = 'red'

SELECT * FROM Orders WHERE a % 2 = 0
用户定义标量函数(Scalar UDF)
批处理 流处理

自定义函数必须事先注册到 TableEnvironment 中。 可阅读 自定义函数文档 以获得如何指定和注册自定义函数的详细信息。

SELECT PRETTY_PRINT(user) FROM Orders

2 聚合

操作符描述
GroupBy 聚合
批处理 流处理
结果更新

注意: GroupBy 在流处理表中会产生更新结果(updating result)。详情请阅读 动态表流概念 。

SELECT a, SUM(b) as d
FROM Orders
GROUP BY a
GroupBy 窗口聚合
批处理 流处理

使用分组窗口对每个组进行计算并得到一个结果行。详情请阅读 分组窗口 章节

SELECT user, SUM(amount)
FROM Orders
GROUP BY TUMBLE(rowtime, INTERVAL '1' DAY), user
Over Window aggregation
流处理

注意: 所有的聚合必须定义到同一个窗口中,即相同的分区、排序和区间。当前仅支持 PRECEDING (无界或有界) 到 CURRENT ROW 范围内的窗口、FOLLOWING 所描述的区间并未支持,ORDER BY 必须指定于单个的时间属性

SELECT COUNT(amount) OVER (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
FROM Orders

SELECT COUNT(amount) OVER w, SUM(amount) OVER w
FROM Orders 
WINDOW w AS (
  PARTITION BY user
  ORDER BY proctime
  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)  
Distinct
批处理 流处理
结果更新
SELECT DISTINCT users FROM Orders

注意: 对于流处理查询,根据不同字段的数量,计算查询结果所需的状态可能会无限增长。请提供具有有效保留间隔的查询配置,以防止出现过多的状态。请阅读 查询配置 以获取详细的信息

Grouping sets, Rollup, Cube
批处理 流处理 结果更新
SELECT SUM(amount)
FROM Orders
GROUP BY GROUPING SETS ((user), (product))

Note: 流式 Grouping sets、Rollup 以及 Cube 只在 Blink planner 中支持。

Having
批处理 流处理
SELECT SUM(amount)
FROM Orders
GROUP BY users
HAVING SUM(amount) > 50
用户自定义聚合函数 (UDAGG)
批处理 流处理

UDAGG 必须注册到 TableEnvironment. 参考自定义函数文档 以了解如何指定和注册 UDAGG 。

SELECT MyAggregate(amount)
FROM Orders
GROUP BY users

3 Joins

操作符描述
Inner Equi-join
批处理 流处理

目前仅支持 equi-join ,即 join 的联合条件至少拥有一个相等谓词。不支持任何 cross join 和 theta join。

注意: Join 的顺序没有进行优化,join 会按照 FROM 中所定义的顺序依次执行。请确保 join 所指定的表在顺序执行中不会产生不支持的 cross join (笛卡儿积)以至查询失败。

SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id

注意: 流查询中可能会因为不同行的输入数量导致计算结果的状态无限增长。请提供具有有效保留间隔的查询配置,以防止出现过多的状态。详情请参考 查询配置 页面.

Outer Equi-join
批处理 流处理 结果更新

目前仅支持 equi-join ,即 join 的联合条件至少拥有一个相等谓词。不支持任何 cross join 和 theta join。

注意: Join 的顺序没有进行优化,join 会按照 FROM 中所定义的顺序依次执行。请确保 join 所指定的表在顺序执行中不会产生不支持的 cross join (笛卡儿积)以至查询失败。

SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id

注意: 流查询中可能会因为不同行的输入数量导致计算结果的状态无限增长。请提供具有有效保留间隔的查询配置,以防止出现过多的状态。详情请参考 查询配置 页面.

Interval Join
批处理 流处理

注意:Interval join (时间区间关联)是常规 join 的子集,可以使用流的方式进行处理。

Interval join需要至少一个 equi-join 谓词和一个限制了双方时间的 join 条件。例如使用两个适当的范围谓词(<, <=, >=, >),一个 BETWEEN 谓词或一个比较两个输入表中相同类型的 时间属性 (即处理时间和事件时间)的相等谓词

比如,以下谓词是合法的 interval join 条件:

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
  • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
以上示例中,所有在收到后四小时内发货的 order 会与他们相关的 shipment 进行 join。
Expanding arrays into a relation
批处理 流处理

目前尚未支持非嵌套的 WITH ORDINALITY 。

SELECT users, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Join 表函数 (UDTF)
批处理 流处理

将表与表函数的结果进行 join 操作。左表(outer)中的每一行将会与调用表函数所产生的所有结果中相关联行进行 join 。

用户自定义表函数( User-defined table functions,UDTFs ) 在执行前必须先注册。请参考 UDF 文档 以获取更多关于指定和注册UDF的信息

Inner Join

若表函数返回了空结果,左表(outer)的行将会被删除。

SELECT users, tag
FROM Orders, LATERAL TABLE(unnest_udtf(tags)) AS t(tag)

Left Outer Join

若表函数返回了空结果,将会保留相对应的外部行并用空值填充结果。

SELECT users, tag
FROM Orders LEFT JOIN LATERAL TABLE(unnest_udtf(tags)) AS t(tag) ON TRUE

注意: 当前仅支持文本常量 TRUE 作为针对横向表的左外部联接的谓词。

Join Temporal Table Function
流处理

Temporal Tables 是跟随时间变化而变化的表。

Temporal Table Function 提供访问 Temporal Tables 在某一时间点的状态的能力。 Join Temporal Table Function 的语法与 Join Table Function 一致。

注意: 目前仅支持在 Temporal Tables 上的 inner join 。

假如 Rates 是一个 Temporal Table Function, join 可以使用 SQL 进行如下的表达:

SELECT
  o_amount, r_rate
FROM
  Orders,
  LATERAL TABLE (Rates(o_proctime))
WHERE
  r_currency = o_currency

请查看 Temporal Tables 概念描述 以了解详细信息。

Join Temporal Tables
批处理 流处理

Temporal Tables 是随时间变化而变化的表。 Temporal Table 提供访问指定时间点的 temporal table 版本的功能。

仅支持带有处理时间的 temporal tables 的 inner 和 left join。

下述示例中,假设 LatestRates 是一个根据最新的 rates 物化的 Temporal Table 。

SELECT
  o.amout, o.currency, r.rate, o.amount * r.rate
FROM
  Orders AS o
  JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
  ON r.currency = o.currency

请阅读 Temporal Tables 概念描述以了解详细信息。

仅 Blink planner 支持。

4 集合操作

操作符描述
Union
批处理
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION
    (SELECT user FROM Orders WHERE b = 0)
)
UnionAll
批处理 流处理
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  UNION ALL
    (SELECT user FROM Orders WHERE b = 0)
)
Intersect / Except
批处理
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  INTERSECT
    (SELECT user FROM Orders WHERE b = 0)
)
SELECT *
FROM (
    (SELECT user FROM Orders WHERE a % 2 = 0)
  EXCEPT
    (SELECT user FROM Orders WHERE b = 0)
)
In
批处理 流处理

若表达式在给定的表子查询中存在,则返回 true 。子查询表必须由单个列构成,且该列的数据类型需与表达式保持一致。

SELECT user, amount
FROM Orders
WHERE product IN (
    SELECT product FROM NewProducts
)

注意: 在流查询中,这一操作将会被重写为 join 和 group 操作。该查询所需要的状态可能会由于不同的输入行数而导致无限增长。请在查询配置中提合理的保留间隔以避免产生状态过大。请阅读 查询配置 以了解详细信息

Exists
批处理 流处理

若子查询的结果多于一行,将返回 true 。仅支持可以被通过 join 和 group 重写的操作。

SELECT user, amount
FROM Orders
WHERE product EXISTS (
    SELECT product FROM NewProducts
)

注意: 在流查询中,这一操作将会被重写为 join 和 group 操作。该查询所需要的状态可能会由于不同的输入行数而导致无限增长。请在查询配置中提合理的保留间隔以避免产生状态过大。请阅读 查询配置 以了解详细信息

本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/14053085.html

原文地址:https://www.cnblogs.com/qiu-hua/p/14053085.html