The Table API is a unified, relational API for stream and batch processing. Table API queries can be run on batch or streaming input without modifications. The Table API is a superset of the SQL language and is specially designed for working with Apache Flink. The Table API is a language-integrated API for Scala and Java. Instead of specifying queries as String values as is common with SQL, Table API queries are defined in a language-embedded style in Java or Scala with IDE support like autocompletion and syntax validation.
The Table API shares many concepts and parts of its API with Flink’s SQL integration. Have a look at the [Common Concepts & API](//ci.apache.org/projects/flink/flink-docs-release-1.7/dev/table/common.html) to learn how to register tables or to create a `Table` object. The [Streaming Concepts](./streaming) pages discuss streaming specific concepts such as dynamic tables and time attributes.
The following examples assume a registered table called `Orders` with attributes `(a, b, c, rowtime)`. The `rowtime` field is either a logical [time attribute](./streaming/time_attributes.html) in streaming or a regular timestamp field in batch.
## Overview & Examples

The Table API is available for Scala and Java. The Scala Table API leverages on Scala expressions; the Java Table API is based on strings which are parsed and converted into equivalent expressions.

The following example shows the differences between the Scala and Java Table API. The table program is executed in a batch environment. It scans the `Orders` table, groups by field `a`, and counts the resulting rows per group. The result of the table program is converted into a `DataSet` of type `Row` and printed.
The Java Table API is enabled by importing `org.apache.flink.table.api.java.*`. The following example shows how a Java Table API program is constructed and how expressions are specified as strings.
...
...
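Independently of Flink, the semantics of the grouped count in the example above can be sketched with plain Java collections; the helper method and sample values below are illustrative assumptions, not the Flink API:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupCountSketch {

    // Plain-Java equivalent of: orders.groupBy("a").select("a, a.count")
    public static Map<String, Long> countPerKey(List<String> aValues) {
        return aValues.stream()
                .collect(Collectors.groupingBy(a -> a, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Hypothetical values of field `a` from the Orders table.
        Map<String, Long> counts = countPerKey(List.of("x", "y", "x"));
        System.out.println(counts.get("x")); // 2
        System.out.println(counts.get("y")); // 1
    }
}
```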
The Scala Table API is enabled by importing `org.apache.flink.api.scala._` and `org.apache.flink.table.api.scala._`.
The following example shows how a Scala Table API program is constructed. Table attributes are referenced using [Scala Symbols](http://scala-lang.org/files/archive/spec/2.12/01-lexical-syntax.html#symbol-literals), which start with an apostrophe character (`'`).
...
...
The next example shows a more complex Table API program. The program again scans the `Orders` table. It filters null values, normalizes the field `a` of type String, and calculates for each hour and product `a` the average billing amount `b`.
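The described pipeline (whose code is elided here) can be sketched with plain Java collections; the `Order` record, field names, and lower-casing as the normalization are assumptions for illustration:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class AvgBillingSketch {

    // Hypothetical stand-in for a row of the Orders table.
    public record Order(String a, double b, long rowtimeMillis) {}

    // Drop null products, normalize `a`, then average `b` per (hour, product).
    public static Map<String, Double> avgPerHourAndProduct(List<Order> orders) {
        return orders.stream()
                .filter(o -> o.a() != null)
                .collect(Collectors.groupingBy(
                        o -> (o.rowtimeMillis() / 3_600_000L) + "/" + o.a().toLowerCase(),
                        Collectors.averagingDouble(Order::b)));
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order("Beer", 10.0, 0L),
                new Order("BEER", 20.0, 60_000L),  // same hour, same product after normalization
                new Order(null, 99.0, 120_000L));  // filtered out
        System.out.println(avgPerHourAndProduct(orders)); // {0/beer=15.0}
    }
}
```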
Since the Table API is a unified API for batch and streaming data, both example programs can be executed on batch and streaming inputs without any modification of the table program itself. In both cases, the program produces the same results given that streaming records are not late (see [Streaming Concepts](streaming) for details).
## Operations
The Table API supports the following operations. Please note that not all operations are available in both batch and streaming yet; they are tagged accordingly.
### Scan, Projection, and Filter
| Operators | Description |
| --- | --- |
| **Scan**<br>Batch Streaming | Similar to the FROM clause in a SQL query. Performs a scan of a registered table.
| **Where / Filter**<br>Batch Streaming | Similar to a SQL WHERE clause. Filters out rows that do not pass the filter predicate.
...
...
or
...
...
|
### Aggregations
| Operators | Description |
| --- | --- |
| **GroupBy Aggregation**<br>Batch Streaming<br>Result Updating | Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise.
...
...
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **GroupBy Window Aggregation**<br>Batch Streaming | Groups and aggregates a table on a [group window](#group-windows) and possibly one or more grouping keys.
| **Over Window Aggregation**<br>Streaming | Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the [over windows section](#over-windows) for more details.
...
...
**Note:** All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single [time attribute](streaming/time_attributes.html). |
| **Distinct Aggregation**<br>Batch Streaming<br>Result Updating | Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT a). Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to **GroupBy Aggregation**, **GroupBy Window Aggregation** and **Over Window Aggregation**.
User-defined aggregation functions can also be used with DISTINCT modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier to the aggregation function.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Distinct**<br>Batch Streaming<br>Result Updating | Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| Operators | Description |
| --- | --- |
| **GroupBy Aggregation**<br>Batch Streaming<br>Result Updating | Similar to a SQL GROUP BY clause. Groups the rows on the grouping keys with a following running aggregation operator to aggregate rows group-wise.
...
...
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Over Window Aggregation**<br>Streaming | Similar to a SQL OVER clause. Over window aggregates are computed for each row, based on a window (range) of preceding and succeeding rows. See the [over windows section](#over-windows) for more details.
**Note:** All aggregates must be defined over the same window, i.e., same partitioning, sorting, and range. Currently, only windows with PRECEDING (UNBOUNDED and bounded) to CURRENT ROW range are supported. Ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single [time attribute](streaming/time_attributes.html). |
| **Distinct Aggregation**<br>Batch Streaming<br>Result Updating | Similar to a SQL DISTINCT aggregation clause such as COUNT(DISTINCT a). Distinct aggregation declares that an aggregation function (built-in or user-defined) is only applied on distinct input values. Distinct can be applied to **GroupBy Aggregation**, **GroupBy Window Aggregation** and **Over Window Aggregation**.
User-defined aggregation functions can also be used with DISTINCT modifiers. To calculate the aggregate results only for distinct values, simply add the distinct modifier to the aggregation function.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Distinct**<br>Batch | Similar to a SQL DISTINCT clause. Returns records with distinct value combinations.
...
...
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct fields. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
### Joins

| Operators | Description |
| --- | --- |
| **Inner Join**<br>Batch Streaming | Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through the join operator or using a where or filter operator.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Outer Join**<br>Batch Streaming Result Updating | Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Time-windowed Join**<br>Batch Streaming | **Note:** Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion. A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (`<, <=, >=, >`) or a single equality predicate that compares [time attributes](streaming/time_attributes.html) of the same type (i.e., processing time or event time) of both input tables. For example, the following predicates are valid window join conditions:
|
| **Inner Join with Table Function**<br>Batch Streaming | Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped if its table function call returns an empty result.
| **Left Outer Join with Table Function**<br>Batch Streaming | Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values. **Note:** Currently, the predicate of a table function left outer join can only be empty or literal `true`.
| **Join with Temporal Table**<br>Streaming | [Temporal tables](streaming/temporal_tables.html) are tables that track changes over time. A [temporal table function](streaming/temporal_tables.html#temporal-table-functions) provides access to the state of a temporal table at a specific point in time. The syntax to join a table with a temporal table function is the same as in _Inner Join with Table Function_. Currently only inner joins with temporal tables are supported.
...
...
For more information please check the more detailed [temporal tables concept description](streaming/temporal_tables.html). |
| Operators | Description |
| --- | --- |
| **Inner Join**<br>Batch Streaming | Similar to a SQL JOIN clause. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined through the join operator or using a where or filter operator.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Outer Join**<br>Batch Streaming Result Updating | Similar to SQL LEFT/RIGHT/FULL OUTER JOIN clauses. Joins two tables. Both tables must have distinct field names and at least one equality join predicate must be defined.
**Note:** For streaming queries the required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| **Time-windowed Join**<br>Batch Streaming | **Note:** Time-windowed joins are a subset of regular joins that can be processed in a streaming fashion. A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. Such a condition can be defined by two appropriate range predicates (`<, <=, >=, >`) or a single equality predicate that compares [time attributes](streaming/time_attributes.html) of the same type (i.e., processing time or event time) of both input tables. For example, the following predicates are valid window join conditions:
| **Inner Join with Table Function**<br>Batch Streaming | Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. A row of the left (outer) table is dropped if its table function call returns an empty result.
|
| **Left Outer Join with Table Function**<br>Batch Streaming | Joins a table with the results of a table function. Each row of the left (outer) table is joined with all rows produced by the corresponding call of the table function. If a table function call returns an empty result, the corresponding outer row is preserved and the result padded with null values. **Note:** Currently, the predicate of a table function left outer join can only be empty or literal `true`.
|
| **Join with Temporal Table**<br>Streaming | [Temporal tables](streaming/temporal_tables.html) are tables that track their changes over time. A [temporal table function](streaming/temporal_tables.html#temporal-table-functions) provides access to the state of a temporal table at a specific point in time. The syntax to join a table with a temporal table function is the same as in _Inner Join with Table Function_. Currently only inner joins with temporal tables are supported.
...
...
For more information please check the more detailed [temporal tables concept description](streaming/temporal_tables.html). |
### Set Operations

| Operators | Description |
| --- | --- |
| **Union**<br>Batch | Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.
...
...
|
| **UnionAll**<br>Batch Streaming | Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.
...
...
|
| **Intersect**<br>Batch | Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.
|
| **IntersectAll**<br>Batch | Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.
...
...
|
| **Minus**<br>Batch | Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.
|
| **MinusAll**<br>Batch | Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.
...
...
|
| **In**<br>Batch Streaming | Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
...
...
**Note:** For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
| Operators | Description |
| --- | --- |
| **Union**<br>Batch | Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.
...
...
|
| **UnionAll**<br>Batch Streaming | Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.
...
...
|
| **Intersect**<br>Batch | Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present in one or both tables more than once, it is returned just once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.
|
| **IntersectAll**<br>Batch | Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it is present in both tables, i.e., the resulting table might have duplicate records. Both tables must have identical field types.
...
...
|
| **Minus**<br>Batch | Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.
|
| **MinusAll**<br>Batch | Similar to a SQL EXCEPT ALL clause. MinusAll returns the records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.
...
...
|
| **In**<br>Batch Streaming | Similar to a SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of one column. This column must have the same data type as the expression.
...
...
**Note:** For streaming queries the operation is rewritten in a join and group operation. The required state to compute the query result might grow infinitely depending on the number of distinct input rows. Please provide a query configuration with valid retention interval to prevent excessive state size. See [Query Configuration](streaming/query_configuration.html) for details. |
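The multiset semantics spelled out above (IntersectAll keeping min(n, m) copies of a record, MinusAll keeping n - m copies) can be sketched in plain Java. This illustrates the described semantics only; it is not the Flink implementation:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MultisetOps {

    private static Map<String, Long> counts(List<String> table) {
        Map<String, Long> m = new HashMap<>();
        for (String r : table) m.merge(r, 1L, Long::sum);
        return m;
    }

    private static List<String> repeat(Map<String, Long> perRecord) {
        List<String> out = new ArrayList<>();
        perRecord.forEach((rec, n) -> { for (long i = 0; i < n; i++) out.add(rec); });
        out.sort(String::compareTo); // relations are unordered; sort for a stable result
        return out;
    }

    // INTERSECT ALL: a record occurring n times left and m times right occurs min(n, m) times.
    public static List<String> intersectAll(List<String> left, List<String> right) {
        Map<String, Long> l = counts(left), r = counts(right), out = new HashMap<>();
        l.forEach((rec, n) -> out.put(rec, Math.min(n, r.getOrDefault(rec, 0L))));
        return repeat(out);
    }

    // EXCEPT ALL: a record occurring n times left and m times right occurs (n - m) times.
    public static List<String> minusAll(List<String> left, List<String> right) {
        Map<String, Long> l = counts(left), r = counts(right), out = new HashMap<>();
        l.forEach((rec, n) -> out.put(rec, Math.max(0L, n - r.getOrDefault(rec, 0L))));
        return repeat(out);
    }

    public static void main(String[] args) {
        List<String> left = List.of("a", "a", "a", "b");
        List<String> right = List.of("a", "a", "c");
        System.out.println(intersectAll(left, right)); // [a, a]
        System.out.println(minusAll(left, right));     // [a, b]
    }
}
```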
### OrderBy, Offset & Fetch
| Operators | Description |
| --- | --- |
| **Order By**<br>Batch | Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.
...
...
|
| **Offset & Fetch**<br>Batch | Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.
| Operators | Description |
| --- | --- |
| **Order By**<br>Batch | Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.
...
...
|
| **Offset & Fetch**<br>Batch | Similar to the SQL OFFSET and FETCH clauses. Offset and Fetch limit the number of records returned from a sorted result. Offset and Fetch are technically part of the Order By operator and thus must be preceded by it.
### Insert

| Operators | Description |
| --- | --- |
| **Insert Into**<br>Batch Streaming | Similar to the INSERT INTO clause in a SQL query. Performs an insertion into a registered output table. Output tables must be registered in the TableEnvironment (see [Register a TableSink](common.html#register-a-tablesink)). Moreover, the schema of the registered table must match the schema of the query.
### Group Windows

Group window aggregates group rows into finite groups based on time or row-count intervals and evaluate aggregation functions once per group. For batch tables, windows are a convenient shortcut to group records by time intervals.
Windows are defined using the `window(w: Window)` clause and require an alias, which is specified using the `as` clause. In order to group a table by a window, the window alias must be referenced in the `groupBy(...)` clause like a regular grouping attribute. The following example shows how to define a window aggregation on a table.
In streaming environments, window aggregates can only be computed in parallel if they group on one or more attributes in addition to the window, i.e., the `groupBy(...)` clause references a window alias and at least one additional attribute. A `groupBy(...)` clause that only references a window alias (such as in the example above) can only be evaluated by a single, non-parallel task. The following example shows how to define a window aggregation with additional grouping attributes.
Window properties such as the start, end, or rowtime timestamp of a time window can be added in the select statement as a property of the window alias as `w.start`, `w.end`, and `w.rowtime`, respectively. The window start and rowtime timestamps are the inclusive lower and upper window boundaries. In contrast, the window end timestamp is the exclusive upper window boundary. For example a tumbling window of 30 minutes that starts at 2pm would have `14:00:00.000` as start timestamp, `14:29:59.999` as rowtime timestamp, and `14:30:00.000` as end timestamp.
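The boundary arithmetic of the 30-minute example can be checked with `java.time` (a sketch of the described semantics, not Flink code):

```java
import java.time.Duration;
import java.time.LocalTime;

public class WindowBounds {

    public static void main(String[] args) {
        LocalTime start = LocalTime.of(14, 0);               // w.start: inclusive lower bound
        LocalTime end = start.plus(Duration.ofMinutes(30));  // w.end: exclusive upper bound
        LocalTime rowtime = end.minus(Duration.ofMillis(1)); // w.rowtime: inclusive upper bound

        System.out.println(start);   // 14:00
        System.out.println(end);     // 14:30
        System.out.println(rowtime); // 14:29:59.999
    }
}
```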
The `Window` parameter defines how rows are mapped to windows. `Window` is not an interface that users can implement. Instead, the Table API provides a set of predefined `Window` classes with specific semantics, which are translated into underlying `DataStream` or `DataSet` operations. The supported window definitions are listed below.
#### Tumble (Tumbling Windows)

A tumbling window assigns rows to non-overlapping, continuous windows of fixed length. For example, a tumbling window of 5 minutes groups rows in 5-minute intervals. Tumbling windows can be defined on event-time, processing-time, or on a row-count.
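Row-to-window assignment for time-based tumbling windows boils down to simple modulo arithmetic, sketched here in plain Java (an illustration, not the Flink implementation):

```java
public class TumbleAssign {

    // Start of the tumbling window of the given size that contains the timestamp.
    public static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - (timestampMillis % sizeMillis);
    }

    public static void main(String[] args) {
        long fiveMinutes = 5 * 60 * 1000L;
        System.out.println(windowStart(0L, fiveMinutes));       // 0      (first window)
        System.out.println(windowStart(299_999L, fiveMinutes)); // 0      (last ms of first window)
        System.out.println(windowStart(300_000L, fiveMinutes)); // 300000 (next window starts)
    }
}
```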
Tumbling windows are defined by using the `Tumble` class as follows:
| Method | Description |
| --- | --- |
| `over` | Defines the length of the window, either as time or row-count interval. |
| `on` | The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a [declared event-time or processing-time time attribute](streaming/time_attributes.html). |
| `as` | Assigns an alias to the window. The alias is used to reference the window in the following `groupBy()` clause and optionally to select window properties such as window start, end, or rowtime timestamps in the `select()` clause. |
#### Slide (Sliding Windows)
A sliding window has a fixed size and slides by a specified slide interval. If the slide interval is smaller than the window size, sliding windows are overlapping. Thus, rows can be assigned to multiple windows. For example, a sliding window of 15 minutes size and 5 minute slide interval assigns each row to 3 different windows of 15 minute size, which are evaluated in an interval of 5 minutes. Sliding windows can be defined on event-time, processing-time, or on a row-count.
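As described, each row of the 15-minute/5-minute example belongs to three windows. That assignment can be sketched in plain Java (an illustration, not the Flink implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SlideAssign {

    // Starts of all sliding windows (size, slide) that contain the timestamp.
    public static List<Long> windowStarts(long tsMillis, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        long latest = tsMillis - (tsMillis % slideMillis); // latest window start <= timestamp
        for (long s = latest; s > tsMillis - sizeMillis && s >= 0; s -= slideMillis) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 15 * 60_000L, slide = 5 * 60_000L;
        // A row at minute 16 falls into the windows starting at minutes 15, 10, and 5.
        System.out.println(windowStarts(16 * 60_000L, size, slide)); // [900000, 600000, 300000]
    }
}
```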
Sliding windows are defined by using the `Slide` class as follows:
| Method | Description |
| --- | --- |
| `over` | Defines the length of the window, either as time or row-count interval. |
| `every` | Defines the slide interval, either as time or row-count interval. The slide interval must be of the same type as the size interval. |
| `on` | The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a [declared event-time or processing-time time attribute](streaming/time_attributes.html). |
| `as` | Assigns an alias to the window. The alias is used to reference the window in the following `groupBy()` clause and optionally to select window properties such as window start, end, or rowtime timestamps in the `select()` clause. |
...
...
#### Session (Session Windows)
Session windows do not have a fixed size but their bounds are defined by an interval of inactivity, i.e., a session window closes if no event appears for a defined gap period. For example, a session window with a 30 minute gap starts when a row is observed after 30 minutes of inactivity (otherwise the row would be added to an existing window) and is closed if no row is added within 30 minutes. Session windows can work on event-time or processing-time.
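The gap-based grouping can be sketched over sorted event timestamps in plain Java (an illustration of the described semantics, not the Flink implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class SessionAssign {

    // Groups ascending timestamps into sessions, splitting wherever the inactivity gap is reached.
    public static List<List<Long>> sessions(List<Long> sortedTsMillis, long gapMillis) {
        List<List<Long>> out = new ArrayList<>();
        List<Long> current = null;
        long last = Long.MIN_VALUE;
        for (long ts : sortedTsMillis) {
            if (current == null || ts - last >= gapMillis) { // inactivity: start a new session
                current = new ArrayList<>();
                out.add(current);
            }
            current.add(ts);
            last = ts;
        }
        return out;
    }

    public static void main(String[] args) {
        long gap = 30 * 60_000L; // 30-minute gap
        // Events at minutes 0, 10, and 50: the 40-minute silence splits two sessions.
        List<List<Long>> s = sessions(List.of(0L, 600_000L, 3_000_000L), gap);
        System.out.println(s.size()); // 2
        System.out.println(s.get(0)); // [0, 600000]
    }
}
```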
A session window is defined by using the `Session` class as follows:
| Method | Description |
| --- | --- |
| `withGap` | Defines the gap between two windows as a time interval. |
| `on` | The time attribute to group (time interval) or sort (row count) on. For batch queries this might be any Long or Timestamp attribute. For streaming queries this must be a [declared event-time or processing-time time attribute](streaming/time_attributes.html). |
| `as` | Assigns an alias to the window. The alias is used to reference the window in the following `groupBy()` clause and optionally to select window properties such as window start, end, or rowtime timestamps in the `select()` clause. |
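The gap semantics can be sketched with a short, plain-Python helper (hypothetical, not Flink API): events are folded into a session as long as they arrive within `gap` of the previous event, and a session's bounds extend `gap` past its last event:

```python
from typing import List, Tuple

def session_windows(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Group event timestamps into session windows separated by
    inactivity gaps of at least `gap`; returns [start, end) bounds."""
    sessions: List[List[int]] = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] < gap:
            sessions[-1].append(ts)  # within the gap: extend the session
        else:
            sessions.append([ts])    # inactivity >= gap: start a new session
    return [(s[0], s[-1] + gap) for s in sessions]

# With a gap of 30, the pause between times 2 and 40 closes the first session:
print(session_windows([1, 2, 40, 41], 30))  # [(1, 32), (40, 71)]
```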
### Over Windows
Over window aggregates are known from standard SQL (`OVER` clause) and are defined in the `SELECT` clause of a query. Unlike group windows, which are specified in the `GROUP BY` clause, over windows do not collapse rows. Instead, over window aggregates compute an aggregate for each input row over a range of its neighboring rows.
Over windows are defined using the `window(w: OverWindow*)` clause and referenced via an alias in the `select()` method. The following example shows how to define an over window aggregation on a table.
...
...
The `OverWindow` defines a range of rows over which aggregates are computed. `OverWindow` is not an interface that users can implement. Instead, the Table API provides the `Over` class to configure the properties of the over window. Over windows can be defined on event-time or processing-time and on ranges specified as a time interval or row-count. The supported over window definitions are exposed as methods on `Over` (and other classes) and are listed below:
| Method | Required | Description |
| --- | --- | --- |
| `partitionBy` | Optional | Defines a partitioning of the input on one or more attributes. Each partition is individually sorted and aggregate functions are applied to each partition separately. **Note:** In streaming environments, over window aggregates can only be computed in parallel if the window includes a `partitionBy` clause. Without `partitionBy(...)` the stream is processed by a single, non-parallel task. |
| `orderBy` | Required | Defines the order of rows within each partition and thereby the order in which the aggregate functions are applied to rows. **Note:** For streaming queries this must be a [declared event-time or processing-time time attribute](streaming/time_attributes.html). Currently, only a single sort attribute is supported. |
| `preceding` | Required | Defines the interval of rows that are included in the window and precede the current row. The interval can either be specified as a time or row-count interval. [Bounded over windows](tableApi.html#bounded-over-windows) are specified with the size of the interval, e.g., `10.minutes` for a time interval or `10.rows` for a row-count interval. [Unbounded over windows](tableApi.html#unbounded-over-windows) are specified using a constant, i.e., `UNBOUNDED_RANGE` for a time interval or `UNBOUNDED_ROW` for a row-count interval. Unbounded over windows start with the first row of a partition. |
| `following` | Optional | Defines the interval of rows that are included in the window and follow the current row. The interval must be specified in the same unit as the preceding interval (time or row-count). At the moment, over windows with rows following the current row are not supported. Instead you can specify one of two constants: `CURRENT_ROW` sets the upper bound of the window to the current row; `CURRENT_RANGE` sets the upper bound of the window to the sort key of the current row, i.e., all rows with the same sort key as the current row are included in the window. If the `following` clause is omitted, the upper bound of a time interval window is defined as `CURRENT_RANGE` and the upper bound of a row-count interval window is defined as `CURRENT_ROW`. |
| `as` | Required | Assigns an alias to the over window. The alias is used to reference the over window in the following `select()` clause. |

**Note:** Currently, all aggregation functions in the same `select()` call must be computed over the same over window.
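The row-count variant of `preceding` can be illustrated with a plain-Python sketch (hypothetical helper, not Flink API): for every input row the aggregate is computed over the current row plus its `n` preceding rows, or over all rows from the start of the partition in the `UNBOUNDED_ROW` case:

```python
from typing import List, Optional

def over_sum(values: List[int], preceding: Optional[int]) -> List[int]:
    """Per-row sum over `preceding` rows plus the current row; a
    `preceding` of None mimics an UNBOUNDED_ROW over window."""
    out = []
    for i in range(len(values)):
        lo = 0 if preceding is None else max(0, i - preceding)
        out.append(sum(values[lo:i + 1]))  # one output row per input row
    return out

# Unlike a group window, no rows are collapsed: 4 inputs give 4 outputs.
print(over_sum([1, 2, 3, 4], 1))     # [1, 3, 5, 7]
print(over_sum([1, 2, 3, 4], None))  # [1, 3, 6, 10]
```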
#### Unbounded Over Windows
...
...
## Data Types
The Table API is built on top of Flink’s DataSet and DataStream APIs. Internally, it also uses Flink’s `TypeInformation` to define data types. Fully supported types are listed in `org.apache.flink.table.api.Types`. The following table summarizes the relation between Table API types, SQL types, and the resulting Java class.
| Table API | SQL | Java type |
| --- | --- | --- |
...
...
| `Types.MULTISET` | `MULTISET` | e.g. `java.util.HashMap<String, Integer>` for a multiset of `String` |
Generic types are treated as a black box and can be passed on or processed by [user-defined functions](udfs.html).
## Expression Syntax
Some of the operators in previous sections expect one or more expressions. Expressions can be specified using an embedded Scala DSL or as Strings. Please refer to the examples above to learn how expressions can be specified.
Here, `literal` is a valid Java literal. String literals can be specified using single or double quotes. Duplicate the quote for escaping (e.g. `'It''s me.'` or `"I ""like"" dogs."`).

The `fieldReference` specifies a column in the data (or all columns if `*` is used), and `functionIdentifier` specifies a supported scalar function. The column names and function names follow Java identifier syntax.
Expressions specified as strings can also use prefix notation instead of suffix notation to call operators and functions.

If working with exact numeric values or large decimals is required, the Table API also supports Java’s BigDecimal type. In the Scala Table API decimals can be defined by `BigDecimal("123456")` and in Java by appending a “p” for precise, e.g. `123456p`.

In order to work with temporal values the Table API supports Java SQL’s Date, Time, and Timestamp types. In the Scala Table API literals can be defined by using `java.sql.Date.valueOf("2016-06-27")`, `java.sql.Time.valueOf("10:10:42")`, or `java.sql.Timestamp.valueOf("2016-06-27 10:10:42.123")`. The Java and Scala Table API also support calling `"2016-06-27".toDate()`, `"10:10:42".toTime()`, and `"2016-06-27 10:10:42.123".toTimestamp()` for converting Strings into temporal types. _Note:_ Since Java’s temporal SQL types are time zone dependent, please make sure that the Flink Client and all TaskManagers use the same time zone.
Temporal intervals can be represented as a number of months (`Types.INTERVAL_MONTHS`) or a number of milliseconds (`Types.INTERVAL_MILLIS`). Intervals of the same type can be added or subtracted (e.g. `1.hour + 10.minutes`). Intervals of milliseconds can be added to time points (e.g. `"2016-08-10".toDate + 5.days`).
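The split between month-based and millisecond-based intervals exists because months are calendar units with no fixed millisecond length. A plain-Python analogue (the `add_months` helper is hypothetical, for illustration only, not Flink API) shows the difference:

```python
from datetime import date, timedelta

# Millisecond-style intervals behave like timedelta, mirroring the
# "2016-08-10".toDate + 5.days expression above:
print(date(2016, 8, 10) + timedelta(days=5))  # 2016-08-15

def add_months(d: date, months: int) -> date:
    """Calendar month arithmetic; naively assumes the day of month
    exists in the target month (e.g. not Jan 31 + 1 month)."""
    years, month0 = divmod(d.month - 1 + months, 12)
    return d.replace(year=d.year + years, month=month0 + 1)

# Crossing a year boundary shifts the calendar, not a fixed number of days:
print(add_months(date(2016, 12, 15), 2))  # 2017-02-15
```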