# 时态表 时态表表示改变的历史记录表上的(参数化)视图的概念,该表返回特定时间点的表的内容。 Flink 可以跟踪应用于追加表的更改,在查询中的特定时间点,允许访问表的内容。 ## 动机 假设我们有下表 `RatesHistory`。 ``` SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 10:45 Euro 116 11:15 Euro 119 11:49 Pounds 108 ``` `RatesHistory` 代表一个不断增长的仅附加货币汇率表,相对于 `Yen`(其汇率为`1`)。例如,从 `09:00` 到 `10:45` 期间的`欧元`到`日元`汇率为 `114`。从 `10:45` 到 `11:15`,汇率是 `116`。 鉴于我们希望在 `10:58` 时输出所有当前汇率,我们需要以下 SQL 查询来计算结果表: ``` SELECT * FROM RatesHistory AS r WHERE r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = r.currency AND r2.rowtime <= TIME '10:58'); ``` 相关子查询确定相应货币的最大时间小于或等于所需时间。外部查询列出具有最大时间戳的汇率。 下表显示了这种计算的结果。在我们的例子中,考虑了对 `10:45` 的 `Euro` 的更新,但是在时间 `10:58`,表格的版本中不会考虑在 `11:15` 对 `Euro`的更新以及新的 `Pounds` 条目。 ``` rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Yen 1 10:45 Euro 116 ``` _Temporal Tables_ 的概念旨在简化此类查询,加快执行速度并减少 Flink 的状态使用。 _Temporal Table_ 是仅附加表的参数化视图,它将仅附加(append-only)表的行解释为表的更改日志,并在特定时间点提供该表的版本。将仅附加表解释为更改日志(changgelog)需要指定主键属性和时间戳属性。主键确定覆盖哪些行,时间戳确定行有效的时间。 在上面的例子中,`currency` 将是 `RatesHistory` 表的主键,`rowtime` 将是 timestamp 属性。 在 Flink 中,时态表由 _时态表函数(Temporal Table Function)_ 表示。 ## 时态表函数 为了访问时态表中的数据,必须传递一个[时间属性](time_attributes.html),该属性确定将返回的表的版本。Flink 使用[表函数](../udfs.html#table-functions)的 SQL 语法来提供表达它的方法。 定义后,_Temporal Table Function_ 采用单个时间参数 `timeAttribute` 并返回一组 rows。返回内容包含与给定时间属性相关的所有现有主键的最新行版本。 假设我们基于 `RatesHistory` 表定义了一个时态表函数 `Rates(timeAttribute)`,我们可以通过以下方式查询这样的函数: ``` SELECT * FROM Rates('10:15'); rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 SELECT * FROM Rates('11:00'); rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 10:45 Euro 116 09:00 Yen 1 ``` 对于 `Rates(timeAttribute)` 的每个查询都将返回给定 `timeAttribute` 的 `Rates` 的状态。 **注意**:目前,Flink 不支持使用常量时间属性参数直接查询时态表函数。目前,时态表函数只能用于连接。上面的例子展示了函数 `Rates(timeAttribute)`的返回值。 有关如何连接时态表的更多信息,另请参阅有关[连续查询的连接](joins.html)的页面。 ### 定义时态表函数 以下代码段说明了如何从仅追加表创建时态表函数。 ``` import org.apache.flink.table.functions.TemporalTableFunction; (...) // Get the stream and table environments. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env); // Provide a static data set of the rates history table. List> ratesHistoryData = new ArrayList<>(); ratesHistoryData.add(Tuple2.of("US Dollar", 102L)); ratesHistoryData.add(Tuple2.of("Euro", 114L)); ratesHistoryData.add(Tuple2.of("Yen", 1L)); ratesHistoryData.add(Tuple2.of("Euro", 116L)); ratesHistoryData.add(Tuple2.of("Euro", 119L)); // Create and register an example table using above data set. // In the real setup, you should replace this with your own table. DataStream> ratesHistoryStream = env.fromCollection(ratesHistoryData); Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime"); tEnv.registerTable("RatesHistory", ratesHistory); // Create and register a temporal table function. // Define "r_proctime" as the time attribute and "r_currency" as the primary key. TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency"); // <==== (1) tEnv.registerFunction("Rates", rates); // <==== (2) ``` ``` // Get the stream and table environments. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) // Provide a static data set of the rates history table. val ratesHistoryData = new mutable.MutableList[(String, Long)] ratesHistoryData.+=(("US Dollar", 102L)) ratesHistoryData.+=(("Euro", 114L)) ratesHistoryData.+=(("Yen", 1L)) ratesHistoryData.+=(("Euro", 116L)) ratesHistoryData.+=(("Euro", 119L)) // Create and register an example table using above data set. // In the real setup, you should replace this with your own table. val ratesHistory = env .fromCollection(ratesHistoryData) .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime) tEnv.registerTable("RatesHistory", ratesHistory) // Create and register TemporalTableFunction. // Define "r_proctime" as the time attribute and "r_currency" as the primary key. val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1) tEnv.registerFunction("Rates", rates) // <==== (2) ``` 第`(1)`行创建了一个 `rates` [时态表函数](#temporal-table-functions),它允许我们使用[表API](../tableApi.html#joins)中的函数 `rates`。 第`(2)`行在我们的表环境中以名称 `Rates` 注册此函数,这允许我们在[SQL](../sql.html#joins)中使用 `Rates` 函数。