返回介绍

Table API

发布于 2025-05-02 18:19:15 字数 34658 浏览 0 评论 0 收藏

Table API 是用于流和批处理的统一关系 API。 Table API 查询可以在批量或流式输入上运行而无需修改。 Table API 是 SQL 语言的超级集合,专门用于与 Apache Flink 一起使用。 Table API 是 Scala 和 Java 语言集成 API。 Table API 查询不是像 SQL 一样将字符串值指定为 SQL,而是在 Java 或 Scala 中以嵌入语言的样式定义,并支持自动完成和语法验证等 IDE 支持。

Table API 与 Flink 的 SQL 集成共享其 API 的许多概念和部分。查看 Common Concepts&API 以了解如何注册表或创建 Table 对象。该 流概念 页讨论流如动态表和时间属性,具体的概念。

以下示例假定 Orders 使用属性调用的已注册表 (a, b, c, rowtime) 。该 rowtime 字段是流式传输中的逻辑 时间属性 或批处理中的常规时间戳字段。

概述和示例

Table API 可用于 Scala 和 Java。Scala Table API 利用 Scala 表达式,Java Table API 基于字符串,这些字符串被解析并转换为等效表达式。

以下示例显示了 Scala 和 Java Table API 之间的差异。表程序在批处理环境中执行。它 Orders 按字段 Scan 表, a 并按组计算结果行。表程序的结果转换为 DataSet 类型 Row 并打印。

通过导入启用 Java Table API org.apache.flink.table.api.java.* 。以下示例显示如何构造 Java Table API 程序以及如何将表达式指定为字符串。

// environment configuration
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

// register Orders table in table environment
// ...

// specify table program
Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)

Table counts = orders
    .groupBy("a")
    .select("a, b.count as cnt");

// conversion to DataSet
DataSet<Row> result = tableEnv.toDataSet(counts, Row.class);
result.print();

The Scala Table API is enabled by importing org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ .

The following example shows how a Scala Table API program is constructed. Table attributes are referenced using Scala Symbols , which start with an apostrophe character ( ' ).

import org.apache.flink.api.scala._
import org.apache.flink.table.api.scala._

// environment configuration val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// register Orders table in table environment
// ... 
// specify table program val orders = tEnv.scan("Orders") // schema (a, b, c, rowtime) 
val result = orders
         .groupBy('a)
         .select('a, 'b.count as 'cnt)
         .toDataSet[Row] // conversion to DataSet
         .print()

下一个示例显示了一个更复杂的 Table API 程序。程序再次扫描 Orders 表格。它过滤空值,规范化 a String 类型的字段,并计算每小时和产品 a 的平均计费金额 b

// environment configuration
// ...

// specify table program
Table orders = tEnv.scan("Orders"); // schema (a, b, c, rowtime)

Table result = orders
    .filter("a.isNotNull && b.isNotNull && c.isNotNull")
    .select("a.lowerCase(), b, rowtime")
    .window(Tumble.over("1.hour").on("rowtime").as("hourlyWindow"))
    .groupBy("hourlyWindow, a")
    .select("a, hourlyWindow.end as hour, b.avg as avgBillingAmount");
// environment configuration
// ... 
// specify table program val orders: Table = tEnv.scan("Orders") // schema (a, b, c, rowtime) 
val result: Table = orders
    .filter('a.isNotNull && 'b.isNotNull && 'c.isNotNull)
    .select('a.lowerCase(), 'b, 'rowtime)
    .window(Tumble over 1.hour on 'rowtime as 'hourlyWindow)
    .groupBy('hourlyWindow, 'a)
    .select('a, 'hourlyWindow.end as 'hour, 'b.avg as 'avgBillingAmount)

由于 Table API 是批处理和流数据的统一 API,因此两个示例程序都可以在批处理和流输入上执行,而无需对表程序本身进行任何修改。在这两种情况下,程序产生相同的结果,因为流记录不会延迟(有关详细信息,请参阅 流式概念 )。

算子操作

Table API 支持以下 算子操作。请注意,并非所有 算子操作都可用于批处理和流式传输; 他们被相应地标记。

Scan,Projection 和过滤


算子: Scan 批量 流

描述:与 SQL 查询中的 FROM 子句类似。执行已注册表的扫描。

Table orders = tableEnv.scan("Orders");

算子: Select 批量 流

描述:与 SQL SELECT 语句类似。执行选择 算子操作。

Table orders = tableEnv.scan("Orders");
Table result = orders.select("a, c as d");

您可以使用 star( * )作为通配符,选择表中的所有列。

Table result = orders.select("*");

算子: As 批量 流

描述:重命名字段。

Table orders = tableEnv.scan("Orders");
Table result = orders.as("x, y, z, t");

算子: Where / Filter Batch Streaming

描述:与 SQL WHERE 子句类似。过滤掉未通过过滤谓词的行。

Table orders = tableEnv.scan("Orders");
Table result = orders.where("b === 'red'");

要么

Table orders = tableEnv.scan("Orders");
Table result = orders.filter("a % 2 === 0");

聚合


算子: GroupBy 聚合 批处理 流 结果更新

描述:与 SQL GROUP BY 子句类似。使用以下运行的聚合 算子对分组键上的行进行分组,以按组聚合行。

Table orders = tableEnv.scan("Orders");
Table result = orders.groupBy("a").select("a, b.sum as d");

注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于聚合类型和不同分组键的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。


算子: GroupBy 窗口聚合 批量 流

描述:组和聚合组 窗口 上的表以及可能的一个或多个分组键。

Table orders = tableEnv.scan("Orders");
Table result = orders
  .window(Tumble.over("5.minutes").on("rowtime").as("w")) // define window
  .groupBy("a, w") // group by key and window
  .select("a, w.start, w.end, w.rowtime, b.sum as d"); // access window properties and aggregate

算子: Over Windows 聚合

描述:与 SQL OVER 子句类似。基于前一行和后一行的窗口(范围)计算每行的窗口聚合。有关更多详细信息,请参阅 over windows 部分

Table orders = tableEnv.scan("Orders");
Table result = orders
  // define window
  .window(Over
    .partitionBy("a")
    .orderBy("rowtime")
    .preceding("UNBOUNDED_RANGE")
    .following("CURRENT_RANGE")
    .as("w"))
  .select("a, b.avg over w, b.max over w, b.min over w"); // sliding aggregate

注意: 必须在同一窗口中定义所有聚合,即相同的分区,排序和范围。目前,仅支持具有 PRREDING(UNBOUNDED 和有界)到 CURRENT ROW 范围的窗口。尚不支持使用 FOLLOWING 的范围。必须在单个 时间属性 上指定 ORDER BY 。

算子: Distinct 批量 流 结果更新

描述:与 SQL DISTINCT 子句类似。返回具有不同值组合的记录。

Table orders = tableEnv.scan("Orders");
Table result = orders.distinct();

注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同字段的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。

Join


算子: Inner Join 批量 流

描述:与 SQL JOIN 子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须通过连接 算子或使用 where 或 filter 算子定义至少一个相等连接谓词。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.join(right).where("a = d").select("a, b, e");

注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。


算子: Outer Join 批处理 流 结果更新

描述:与 SQL LEFT / RIGHT / FULL OUTER JOIN 子句类似。关联两张桌子。两个表必须具有不同的字段名称,并且必须至少定义一个等于连接谓词。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");

Table leftOuterResult = left.leftOuterJoin(right, "a = d").select("a, b, e");
Table rightOuterResult = left.rightOuterJoin(right, "a = d").select("a, b, e");
Table fullOuterResult = left.fullOuterJoin(right, "a = d").select("a, b, e");

注意: 对于流式查询,计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。


算子: Time-windowed Join 批量 流

描述: 注意: 时间窗口连接是可以以流方式处理的常规连接的子集。时间窗口连接需要至少一个等连接谓词和一个限制双方时间的连接条件。这样的条件可以由两个适当的范围谓词( &lt;, &lt;=, &gt;=, &gt; )或单个等式谓词来定义,该单个等式谓词比较两个输入表的相同类型的 时间属性 (即,处理时间或事件时间)。例如,以下谓词是有效的窗口连接条件:

  • ltime === rtime
  • ltime &gt;= rtime && ltime &lt; rtime + 10.minutes
Table left = tableEnv.fromDataSet(ds1, "a, b, c, ltime.rowtime");
Table right = tableEnv.fromDataSet(ds2, "d, e, f, rtime.rowtime");

Table result = left.join(right)
  .where("a = d && ltime &gt;= rtime - 5.minutes && ltime &lt; rtime + 10.minutes")
  .select("a, b, e, ltime");

算子: TableFunction Inner Join 批量 流

描述:使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果其表函数调用返回空结果,则删除左(外)表的一行。

// register function
TableFunction&lt;String&gt; split = new MySplitUDTF();
tEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
  .join(new Table(tEnv, "split(c)").as("s", "t", "v"))
  .select("a, b, s, t, v");

算子: TableFunction Left Outer Join Batch Streaming

描述:使用表函数的结果连接表。左(外)表的每一行与表函数的相应调用产生的所有行连接。如果表函数调用返回空结果,则保存相应的外部行,并使用空值填充结果。 注意: 目前,左外连接的表函数的谓词只能是空或文字 true

// register function
TableFunction&lt;String&gt; split = new MySplitUDTF();
tEnv.registerFunction("split", split);

// join
Table orders = tableEnv.scan("Orders");
Table result = orders
  .leftOuterJoin(new Table(tEnv, "split(c)").as("s", "t", "v"))
  .select("a, b, s, t, v");

设置 算子操作


算子: Union 批次

描述:与 SQL UNION 子句类似。联合两个表删除了重复记录。两个表必须具有相同的字段类型。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.union(right);

算子: UnionAll Batch Streaming

描述:与 SQL UNION ALL 子句类似。工会两张桌子。两个表必须具有相同的字段类型。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.unionAll(right);

算子: Intersect 批次

描述:类似于 SQL INTERSECT 子句。Intersect 返回两个表中存在的记录。如果一个或两个表不止一次出现记录,则只返回一次,即结果表没有重复记录。两个表必须具有相同的字段类型。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersect(right);

算子: IntersectAll Batch

描述:类似于 SQL INTERSECT ALL 子句。IntersectAll 返回两个表中存在的记录。如果两个表中的记录多次出现,则返回的值与两个表中的记录一样多,即生成的表可能具有重复记录。两个表必须具有相同的字段类型。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "d, e, f");
Table result = left.intersectAll(right);

算子: Minus

描述:与 SQL EXCEPT 子句类似。减号返回左表中右表中不存在的记录。左表中的重复记录只返回一次,即删除重复项。两个表必须具有相同的字段类型。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minus(right);

算子: MinusAll Batch

描述:类似于 SQL EXCEPT ALL 子句。MinusAll 返回右表中不存在的记录。在左表中出现 n 次并在右表中出现 m 次的记录返回(n-m)次,即,删除右表中存在的重复次数。两个表必须具有相同的字段类型。

Table left = tableEnv.fromDataSet(ds1, "a, b, c");
Table right = tableEnv.fromDataSet(ds2, "a, b, c");
Table result = left.minusAll(right);

算子: In 批量 流中

描述:与 SQL IN 子句类似。如果表达式存在于给定的表子查询中,则返回 true。子查询表必须包含一列。此列必须与表达式具有相同的数据类型。

Table left = ds1.toTable(tableEnv, "a, b, c");
Table right = ds2.toTable(tableEnv, "a");

// using implicit registration
Table result = left.select("a, b, c").where("a.in(" + right + ")");

// using explicit registration
tableEnv.registerTable("RightTable", right);
Table result = left.select("a, b, c").where("a.in(RightTable)");

注意: 对于流式查询, 算子操作将在连接和组 算子操作中重写。计算查询结果所需的状态可能会无限增长,具体取决于不同输入行的数量。请提供具有有效保存间隔的查询配置,以防止过大的状态。有关详细信息,请参阅 Streaming Concepts 。

OrderBy,Offset&Fetch


算子: Order By 批次

描述:与 SQL ORDER BY 子句类似。返回跨所有并行分区全局排序的记录。

Table in = tableEnv.fromDataSet(ds, "a, b, c");
Table result = in.orderBy("a.asc");

算子: Offset & Fetch 批次

描述:类似于 SQL OFFSET 和 FETCH 子句。偏移和提取限制从排序结果返回的记录数。Offset 和 Fetch 在技术上是 Order By 算子的一部分,因此必须以它为前缀。

Table in = tableEnv.fromDataSet(ds, "a, b, c");

// returns the first 5 records from the sorted result
Table result1 = in.orderBy("a.asc").fetch(5); 

// skips the first 3 records and returns all following records from the sorted result
Table result2 = in.orderBy("a.asc").offset(3);

// skips the first 10 records and returns the next 5 records from the sorted result
Table result3 = in.orderBy("a.asc").offset(10).fetch(5);

Insert


算子: Insert 批量 流处理

描述:类似于 SQL 查询中的 INSERT INTO 子句。执行插入已注册的输出表。输出表必须在 TableEnvironment 中 注册 (请参阅 注册 TableSink )。此外,已注册表的模式必须与查询的模式匹配。

Table orders = tableEnv.scan("Orders");
orders.insertInto("OutOrders");

GroupWindows

组窗口根据时间或行计数间隔将行组聚合为有限组,并按组评估聚合函数。对于批处理表,窗口是按时间间隔对记录进行分组的便捷快捷方式。

Windows 是使用该 window(w: Window) 子句定义的,需要使用该 as 子句指定的别名。为了按窗口对表进行分组,必须在 groupBy(...) 子句中引用窗口别名,就像常规分组属性一样。以下示例显示如何在表上定义窗口聚合。

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w")  // group the table by window w
  .select("b.sum");  // aggregate
val table = input
  .window([w: Window] as 'w)  // define window with alias w
  .groupBy('w)   // group the table by window w
  .select('b.sum)  // aggregate

在流式传输环境中,如果窗口聚合除了窗口之外还在一个或多个属性上进行分组,则它们只能并行计算,即 groupBy(...) 子句引用窗口别名和至少一个附加属性。一个 groupBy(...) 仅引用一个窗口别名(如在上面的例子)子句只能由一个单一的,非平行的任务进行评估。以下示例显示如何使用其他分组属性定义窗口聚合。

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, b.sum");  // aggregate
val table = input
  .window([w: Window] as 'w) // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w
  .select('a, 'b.sum)  // aggregate

窗口属性诸如开始,结束,或一个时间窗口的 rowtime 时间戳可以在选择语句被添加为窗口别名作为的一个属性 w.startw.end 以及 w.rowtime 分别。窗口开始和行时间戳是包含的低窗口和超窗口边界。相反,窗口结束时间戳是独占的上窗口边界。例如,从下午 2 点开始的 30 分钟的翻滚窗口将具有 14:00:00.000 开始时间戳, 14:29:59.999 行时间戳和 14:30:00.000 结束时间戳。

Table table = input
  .window([Window w].as("w"))  // define window with alias w
  .groupBy("w, a")  // group the table by attribute a and window w 
  .select("a, w.start, w.end, w.rowtime, b.count"); // aggregate and add window start, end, and rowtime timestamps
val table = input
  .window([w: Window] as 'w)  // define window with alias w
  .groupBy('w, 'a)  // group the table by attribute a and window w
  .select('a, 'w.start, 'w.end, 'w.rowtime, 'b.count) // aggregate and add window start, end, and rowtime timestamps

Window 参数定义行如何映射到窗口。 Window 不是用户可以实现的接口。相反, Table API 提供了一组 Window 具有特定语义的预定义类,这些类被转换为底层 DataStreamDataSet 算子操作。支持的窗口定义如下所示。

翻滚(翻滚的 Windows)

翻滚窗口将行分配给固定长度的非重叠连续窗口。例如,5 分钟的翻滚窗口以 5 分钟为间隔对行进行分组。可以在事件时间,处理时间或行数上定义翻滚窗口。

使用 Tumble 类定义翻滚窗口如下:

方法描述
over定义窗口的长度,可以是时间或行计数间隔。
on组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流式查询,这必须是 声明的事件时间或处理时间属性
as为窗口指定别名。别名用于引用以下 groupBy() 子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳 select()
// Tumbling Event-time Window
.window(Tumble.over("10.minutes").on("rowtime").as("w"));

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.minutes").on("proctime").as("w"));

// Tumbling Row-count Window (assuming a processing-time attribute "proctime")
.window(Tumble.over("10.rows").on("proctime").as("w"));
// Tumbling Event-time Window .window(Tumble over 10.minutes on 'rowtime as 'w)

// Tumbling Processing-time Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.minutes on 'proctime as 'w)

// Tumbling Row-count Window (assuming a processing-time attribute "proctime") .window(Tumble over 10.rows on 'proctime as 'w)

滑动(滑动窗口)

滑动窗口具有固定大小,并按指定的滑动间隔滑动。如果滑动间隔小于窗口大小,则滑动窗口重叠。因此,可以将行分配给多个窗口。例如,15 分钟大小和 5 分钟滑动间隔的滑动窗口将每行分配给 3 个不同的 15 分钟大小的窗口,这些窗口以 5 分钟的间隔进行评估。可以在事件时间,处理时间或行数上定义滑动窗口。

滑动窗口使用 Slide 如下类定义:

方法描述
over定义窗口的长度,可以是时间或行计数间隔。
every定义滑动间隔,可以是时间间隔也可以是行计数间隔。滑动间隔必须与大小间隔的类型相同。
on组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流式查询,这必须是 声明的事件时间或处理时间属性
as为窗口指定别名。别名用于引用以下 groupBy() 子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳 select()
// Sliding Event-time Window
.window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"));

// Sliding Processing-time window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"));

// Sliding Row-count window (assuming a processing-time attribute "proctime")
.window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"));
// Sliding Event-time Window .window(Slide over 10.minutes every 5.minutes on 'rowtime as 'w)

// Sliding Processing-time window (assuming a processing-time attribute "proctime") .window(Slide over 10.minutes every 5.minutes on 'proctime as 'w)

// Sliding Row-count window (assuming a processing-time attribute "proctime") .window(Slide over 10.rows every 5.rows on 'proctime as 'w)

会话(会话窗口)

会话窗口没有固定的大小,但它们的边界由不活动的间隔定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。例如,如果在 30 分钟不活动后观察到一行,则会开始一个 30 分钟间隙的会话窗口(否则该行将被添加到现有窗口中),如果在 30 分钟内未添加任何行,则会关闭。会话窗口可以在事件时间或处理时间上工作。

使用 Session 类定义会话窗口,如下所示:

方法描述
withGap将两个窗口之间的间隔定义为时间间隔。
on组的时间属性(时间间隔)或排序(行数)。对于批处理查询,这可能是任何 Long 或 Timestamp 属性。对于流式查询,这必须是 声明的事件时间或处理时间属性
as为窗口指定别名。别名用于引用以下 groupBy() 子句中的窗口,并可选择在子句中选择窗口属性,如窗口开始,结束或行时间戳 select()
// Session Event-time Window
.window(Session.withGap("10.minutes").on("rowtime").as("w"));

// Session Processing-time Window (assuming a processing-time attribute "proctime")
.window(Session.withGap("10.minutes").on("proctime").as("w"));
// Session Event-time Window .window(Session withGap 10.minutes on 'rowtime as 'w)

// Session Processing-time Window (assuming a processing-time attribute "proctime") .window(Session withGap 10.minutes on 'proctime as 'w)

OverWindows

从标准 SQL( OVER 子句)已知过窗口聚合,并在 SELECT 查询的子句中定义。与 GROUP BY 子句中指定的组窗口不同,在窗口上不会折叠行。而是通过窗口聚合计算每个输入行在其相邻行的范围内的聚合。

OverWindows 是使用 window(w: OverWindow*) 子句定义的,并通过 select() 方法中的别名引用。以下示例显示如何在表上定义过窗口聚合。

Table table = input
  .window([OverWindow w].as("w"))       // define over window with alias w
  .select("a, b.sum over w, c.min over w"); // aggregate over the over window w
val table = input
  .window([w: OverWindow] as 'w)        // define over window with alias w
  .select('a, 'b.sum over 'w, 'c.min over 'w) // aggregate over the over window w

OverWindow 限定范围在其聚集计算的行。 OverWindow 不是用户可以实现的接口。相反, Table API 提供了 Over 用于配置覆盖窗口属性的类。可以在事件时间或处理时间以及指定为时间间隔或行计数的范围上定义窗口。支持的窗口定义在 Over (和其他类)上公开为方法,如下所示:

方法Required描述
partitionByOptional定义一个或多个属性上的输入分区。每个分区都单独排序,聚合函数分别应用于每个分区。 注意: 在流式环境中,如果窗口包含 partition by 子句,则只能并行计算窗口聚合。no / not partitionBy(...) 流由单个非并行任务处理。
orderByRequired定义每个分区中行的顺序,从而定义聚合函数应用于行的顺序。 注意: 对于流式查询,这必须是 声明的事件时间或处理时间属性 。目前,仅支持单个排序属性。
precedingRequired定义窗口中包含的行的间隔,并在当前行之前。间隔可以指定为时间或行计数间隔。 在窗口 上限定具有间隔的大小,例如, 10.minutes 对于时间间隔或 10.rows 行计数间隔。使用常量(即, UNBOUNDED_RANGE 时间间隔或 UNBOUNDED_ROW 行计数间隔)指定 在窗口无界限 。在 Windows 上无限制地从分区的第一行开始。
followingOptional定义窗口中包含的行的窗口间隔,并跟随当前行。必须在与前一个间隔(时间或行计数)相同的单位中指定间隔。目前,不支持在当前行之后包含行的窗口。相反,您可以指定两个常量之一:
 CURRENT_ROW 将窗口的上限设置为当前行。
 CURRENT_RANGE 设置窗口的上限以对当前行的键进行排序,即窗口中包含与当前行具有相同排序键的所有行。
 如果 following 省略该子句,则将时间间隔窗口 CURRENT_RANGE 的上限定义为,并将行计数间隔窗口的上限定义为 CURRENT_ROW
asRequired为覆盖窗口指定别名。别名用于引用以下 select() 子句中的 over window 。

注意: 目前,同一 select() 调用中的所有聚合函数必须计算相同的窗口。

在 Windows 上无限制

// Unbounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_range").as("w"));

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_range").as("w"));

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("unbounded_row").as("w"));

// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("unbounded_row").as("w"));
// Unbounded Event-time over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE as 'w)

// Unbounded Processing-time over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_RANGE as 'w)

// Unbounded Event-time Row-count over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_ROW as 'w)

// Unbounded Processing-time Row-count over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding UNBOUNDED_ROW as 'w)

限制在 Windows 上

// Bounded Event-time over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("1.minutes").as("w"))

// Bounded Processing-time over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("1.minutes").as("w"))

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime")
.window(Over.partitionBy("a").orderBy("rowtime").preceding("10.rows").as("w"))

// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime")
.window(Over.partitionBy("a").orderBy("proctime").preceding("10.rows").as("w"))
// Bounded Event-time over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding 1.minutes as 'w)

// Bounded Processing-time over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding 1.minutes as 'w)

// Bounded Event-time Row-count over window (assuming an event-time attribute "rowtime") .window(Over partitionBy 'a orderBy 'rowtime preceding 10.rows as 'w)

// Bounded Processing-time Row-count over window (assuming a processing-time attribute "proctime") .window(Over partitionBy 'a orderBy 'proctime preceding 10.rows as 'w)

数据类型

Table API 建立在 Flink 的 DataSet 和 DataStream API 之上。在内部,它还使用 Flink TypeInformation 来定义数据类型。完全支持的类型列在 org.apache.flink.table.api.Types 。下表总结了 Table API 类型,SQL 类型和生成的 Java 类之间的关系。

Table APISQLJava 类型
Types.STRINGVARCHARjava.lang.String
Types.BOOLEANBOOLEANjava.lang.Boolean
Types.BYTETINYINTjava.lang.Byte
Types.SHORTSMALLINTjava.lang.Short
Types.INTINTEGER, INTjava.lang.Integer
Types.LONGBIGINTjava.lang.Long
Types.FLOATREAL, FLOATjava.lang.Float
Types.DOUBLEDOUBLEjava.lang.Double
Types.DECIMALDECIMALjava.math.BigDecimal
Types.SQL_DATEDATEjava.sql.Date
Types.SQL_TIMETIMEjava.sql.Time
Types.SQL_TIMESTAMPTIMESTAMP(3)java.sql.Timestamp
Types.INTERVAL_MONTHSINTERVAL YEAR TO MONTHjava.lang.Integer
Types.INTERVAL_MILLISINTERVAL DAY TO SECOND(3)java.lang.Long
Types.PRIMITIVE_ARRAYARRAY例如 int[]
Types.OBJECT_ARRAYARRAY例如 java.lang.Byte[]
Types.MAPMAPjava.util.HashMap
Types.MULTISETMULTISET例如, java.util.HashMap&lt;String, Integer&gt; 对于多重集合 String
Types.ROWROWorg.apache.flink.types.Row

通用类型和(嵌套)复合类型(例如,POJO,元组,Row,Scala 案例类)也可以是行的字段。

可以使用 值访问函数 访问具有任意嵌套的复合类型的字段。

通用类型被视为黑盒子,可以由 用户定义的函数 传递或处理。

表达式语法

前面部分中的一些 算子需要一个或多个表达式。可以使用嵌入式 Scala DSL 或字符串指定表达式。请参阅上面的示例以了解如何指定表达式。

这是表达式的 EBNF 语法:

expressionList = expression , { "," , expression } ;

expression = timeIndicator | overConstant | alias ;

alias = logic | ( logic , "as" , fieldReference ) | ( logic , "as" , "(" , fieldReference , { "," , fieldReference } , ")" ) ;

logic = comparison , [ ( "&&" | "||" ) , comparison ] ;

comparison = term , [ ( "=" | "==" | "===" | "!=" | "!==" | ">" | ">=" | "<" | "<=" ) , term ] ;

term = product , [ ( "+" | "-" ) , product ] ;

product = unary , [ ( "*" | "/" | "%") , unary ] ;

unary = [ "!" | "-" ] , composite ;

composite = over | nullLiteral | suffixed | atom ;

suffixed = interval | cast | as | if | functionCall ;

interval = timeInterval | rowInterval ;

timeInterval = composite , "." , ("year" | "years" | "quarter" | "quarters" | "month" | "months" | "week" | "weeks" | "day" | "days" | "hour" | "hours" | "minute" | "minutes" | "second" | "seconds" | "milli" | "millis") ;

rowInterval = composite , "." , "rows" ;

cast = composite , ".cast(" , dataType , ")" ;

dataType = "BYTE" | "SHORT" | "INT" | "LONG" | "FLOAT" | "DOUBLE" | "BOOLEAN" | "STRING" | "DECIMAL" | "SQL_DATE" | "SQL_TIME" | "SQL_TIMESTAMP" | "INTERVAL_MONTHS" | "INTERVAL_MILLIS" | ( "MAP" , "(" , dataType , "," , dataType , ")" ) | ( "PRIMITIVE_ARRAY" , "(" , dataType , ")" ) | ( "OBJECT_ARRAY" , "(" , dataType , ")" ) ;

as = composite , ".as(" , fieldReference , ")" ;

if = composite , ".?(" , expression , "," , expression , ")" ;

functionCall = composite , "." , functionIdentifier , [ "(" , [ expression , { "," , expression } ] , ")" ] ;

atom = ( "(" , expression , ")" ) | literal | fieldReference ;

fieldReference = "*" | identifier ;

nullLiteral = "Null(" , dataType , ")" ;

timeIntervalUnit = "YEAR" | "YEAR_TO_MONTH" | "MONTH" | "QUARTER" | "WEEK" | "DAY" | "DAY_TO_HOUR" | "DAY_TO_MINUTE" | "DAY_TO_SECOND" | "HOUR" | "HOUR_TO_MINUTE" | "HOUR_TO_SECOND" | "MINUTE" | "MINUTE_TO_SECOND" | "SECOND" ;

timePointUnit = "YEAR" | "MONTH" | "DAY" | "HOUR" | "MINUTE" | "SECOND" | "QUARTER" | "WEEK" | "MILLISECOND" | "MICROSECOND" ;

over = composite , "over" , fieldReference ;

overConstant = "current_row" | "current_range" | "unbounded_row" | "unbounded_row" ;

timeIndicator = fieldReference , "." , ( "proctime" | "rowtime" ) ;

这里 literal 是一个有效的 Java 文字, fieldReference 指定数据中的列(或使用的所有列 * ),并 functionIdentifier 指定支持的标量函数。列名和函数名遵循 Java 标识符语法。指定为字符串的表达式也可以使用前缀表示法而不是后缀表示法来调用 算子和函数。

如果需要使用精确数值或大小数, Table API 也支持 JavaBigDecimal 类型。在 Scala Table API 中,小数可以 BigDecimal("123456") 通过附加“p” 来定义,也可以在 Java 中定义,例如 123456p

为了使用时态值, Table API 支持 Java SQL 的 Date,Time 和 Timestamp 类型。在 Scala 的 Table API 文本可以通过使用来定义 java.sql.Date.valueOf("2016-06-27")java.sql.Time.valueOf("10:10:42")java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123") 。在 Java 和 Scala Table API 还支持调用 "2016-06-27".toDate()"10:10:42".toTime() 以及 "2016-06-27 10:10:42.123".toTimestamp() 对 String 转化成时间类型。 注意: 由于 Java 临时 SQL 类型与时区有关,因此请确保 Flink Client 和所有 TaskManagers 使用相同的时区。

时间间隔可以表示为月数( Types.INTERVAL_MONTHS )或毫秒数( Types.INTERVAL_MILLIS )。可以添加或减去相同类型的间隔(例如 1.hour + 10.minutes )。可以将毫秒的间隔添加到时间点(例如 "2016-08-10".toDate + 5.days )。

发布评论

需要 登录 才能够评论, 你可以免费 注册 一个本站的账号。
列表为空,暂无数据
    我们使用 Cookies 和其他技术来定制您的体验包括您的登录状态等。通过阅读我们的 隐私政策 了解更多相关信息。 单击 接受 或继续使用网站,即表示您同意使用 Cookies 和您的相关数据。