--- sidebar_label: 流式计算 title: 流式计算 description: 流式计算的相关 SQL 的详细语法 --- ## 创建流式计算 ```sql CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery stream_options: { TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time] WATERMARK time } ``` 其中 subquery 是 select 普通查询语法的子集: ```sql subquery: SELECT select_list from_clause [WHERE condition] [PARTITION BY tag_list] [window_clause] ``` 支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用 ```sql window_clause: { SESSION(ts_col, tol_val) | STATE_WINDOW(col) | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] } ``` 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 ```sql CREATE STREAM avg_vol_s INTO avg_vol AS SELECT _wstartts, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s); ``` ## 流式计算的 partition 可以使用 PARTITION BY TBNAME 或 PARTITION BY tag,对一个流进行多分区的计算,每个分区的时间线与时间窗口是独立的,会各自聚合,并写入到目的表中的不同子表。 不带 PARTITION BY 选项时,所有的数据将写入到一张子表。 流式计算创建的超级表有唯一的 tag 列 groupId,每个 partition 会被分配唯一 groupId。与 schemaless 写入一致,我们通过 MD5 计算子表名,并自动创建它。 ## 删除流式计算 ```sql DROP STREAM [IF NOT EXISTS] stream_name; ``` 仅删除流式计算任务,由流式计算写入的数据不会被删除。 ## 展示流式计算 ```sql SHOW STREAMS; ``` 若要展示更详细的信息,可以使用: ```sql SELECT * from performance_schema.`perf_streams`; ``` ## 流式计算的触发模式 在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。 对于非窗口计算,流式计算的触发是实时的;对于窗口计算,目前提供 3 种触发模式: 1. AT_ONCE:写入立即触发 2. WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 watermark 使用) 3. MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 max delay 指定的时间,则触发计算。 由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。 因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。 MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 max delay 指定的时间,则立即触发计算 ## 流式计算的窗口关闭 流式计算以事件时间(插入记录中的时间戳主键)为基准计算窗口关闭,而非以 TDengine 服务器的时间,以事件时间为基准,可以避免客户端与服务器时间不一致带来的问题,能够解决乱序数据写入等等问题。流式计算还提供了 watermark 来定义容忍的乱序程度。 在创建流时,可以在 stream_option 中指定 watermark,它定义了数据乱序的容忍上界。 流式计算通过 watermark 来度量对乱序数据的容忍程度,watermark 默认为 0。 T = 最新事件时间 - watermark 每次写入的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。 ![TDengine 流式计算窗口关闭示意图](./watermark.webp) 图中,纵轴表示不同时刻,对于不同时刻,我们画出其对应的 TDengine 收到的数据,即为横轴。 横轴上的数据点表示已经收到的数据,其中蓝色的点表示事件时间(即数据中的时间戳主键)最后的数据,该数据点减去定义的 watermark 时间,得到乱序容忍的上界 T。 所有结束时间小于 T 的窗口都将被关闭(图中以灰色方框标记)。 T2 时刻,乱序数据(黄色的点)到达 TDengine,由于有 watermark 的存在,这些数据进入的窗口并未被关闭,因此可以被正确处理。 T3 时刻,最新事件到达,T 向后推移超过了第二个窗口关闭的时间,该窗口被关闭,乱序数据被正确处理。 在 window_close 或 max_delay 模式下,窗口关闭直接影响推送结果。在 at_once 模式下,窗口关闭只与内存占用有关。 ## 流式计算的过期数据处理策略 对于已关闭的窗口,再次落入该窗口中的数据被标记为过期数据. TDengine 对于过期数据提供两种处理方式,由 IGNORE EXPIRED 选项指定: 1. 重新计算,即 IGNORE EXPIRED 0:默认配置,从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果 2. 直接丢弃, 即 IGNORE EXPIRED 1:忽略过期数据 无论在哪种模式下,watermark 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。