diff --git a/docs/zh/07-develop/06-stream.md b/docs/zh/07-develop/06-stream.md index b2223d15e33114d263b9833df51e4201bc01c772..ca92b9098ea4b6db2b121d522941ab2097bb7d06 100644 --- a/docs/zh/07-develop/06-stream.md +++ b/docs/zh/07-develop/06-stream.md @@ -1,84 +1,179 @@ ---- -sidebar_label: 连续查询 -description: "连续查询是一个按照预设频率自动执行的查询功能,提供按照时间窗口的聚合查询能力,是一种简化的时间驱动流式计算。" -title: "连续查询(Continuous Query)" ---- - -连续查询是 TDengine 定期自动执行的查询,采用滑动窗口的方式进行计算,是一种简化的时间驱动的流式计算。针对库中的表或超级表,TDengine 可提供定期自动执行的连续查询,用户可让 TDengine 推送查询的结果,也可以将结果再写回到 TDengine 中。每次执行的查询是一个时间窗口,时间窗口随着时间流动向前滑动。在定义连续查询的时候需要指定时间窗口(time window, 参数 interval)大小和每次前向增量时间(forward sliding times, 参数 sliding)。 - -TDengine 的连续查询采用时间驱动模式,可以直接使用 TAOS SQL 进行定义,不需要额外的操作。使用连续查询,可以方便快捷地按照时间窗口生成结果,从而对原始采集数据进行降采样(down sampling)。用户通过 TAOS SQL 定义连续查询以后,TDengine 自动在最后的一个完整的时间周期末端拉起查询,并将计算获得的结果推送给用户或者写回 TDengine。 - -TDengine 提供的连续查询与普通流计算中的时间窗口计算具有以下区别: - -- 不同于流计算的实时反馈计算结果,连续查询只在时间窗口关闭以后才开始计算。例如时间周期是 1 天,那么当天的结果只会在 23:59:59 以后才会生成。 -- 如果有历史记录写入到已经计算完成的时间区间,连续查询并不会重新进行计算,也不会重新将结果推送给用户。对于写回 TDengine 的模式,也不会更新已经存在的计算结果。 -- 使用连续查询推送结果的模式,服务端并不缓存客户端计算状态,也不提供 Exactly-Once 的语义保证。如果用户的应用端崩溃,再次拉起的连续查询将只会从再次拉起的时间开始重新计算最近的一个完整的时间窗口。如果使用写回模式,TDengine 可确保数据写回的有效性和连续性。 - -## 连续查询语法 - -```sql -[CREATE TABLE AS] SELECT select_expr [, select_expr ...] - FROM {tb_name_list} - [WHERE where_condition] - [INTERVAL(interval_val [, interval_offset]) [SLIDING sliding_val]] - -``` - -INTERVAL: 连续查询作用的时间窗口 - -SLIDING: 连续查询的时间窗口向前滑动的时间间隔 - -## 使用连续查询 - -下面以智能电表场景为例介绍连续查询的具体使用方法。假设我们通过下列 SQL 语句创建了超级表和子表: - -```sql -create table meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int); -create table D1001 using meters tags ("California.SanFrancisco", 2); -create table D1002 using meters tags ("California.LosAngeles", 2); -... -``` - -可以通过下面这条 SQL 语句以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压。 - -```sql -select avg(voltage) from meters interval(1m) sliding(30s); -``` - -每次执行这条语句,都会重新计算所有数据。 如果需要每隔 30 秒执行一次来增量计算最近一分钟的数据,可以把上面的语句改进成下面的样子,每次使用不同的 `startTime` 并定期执行: - -```sql -select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s); -``` - -这样做没有问题,但 TDengine 提供了更简单的方法,只要在最初的查询语句前面加上 `create table {tableName} as` 就可以了,例如: - -```sql -create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s); -``` - -会自动创建一个名为 `avg_vol` 的新表,然后每隔 30 秒,TDengine 会增量执行 `as` 后面的 SQL 语句,并将查询结果写入这个表中,用户程序后续只要从 `avg_vol` 中查询数据即可。例如: - -```sql -taos> select * from avg_vol; - ts | avg_voltage_ | -=================================================== - 2020-07-29 13:37:30.000 | 222.0000000 | - 2020-07-29 13:38:00.000 | 221.3500000 | - 2020-07-29 13:38:30.000 | 220.1700000 | - 2020-07-29 13:39:00.000 | 223.0800000 | -``` - -需要注意,查询时间窗口的最小值是 10 毫秒,没有时间窗口范围的上限。 - -此外,TDengine 还支持用户指定连续查询的起止时间。如果不输入开始时间,连续查询将从第一条原始数据所在的时间窗口开始;如果没有输入结束时间,连续查询将永久运行;如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。比如使用下面的 SQL 创建的连续查询将运行一小时,之后会自动停止。 - -```sql -create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s); -``` - -需要说明的是,上面例子中的 `now` 是指创建连续查询的时间,而不是查询执行的时间,否则,查询就无法自动停止了。另外,为了尽量避免原始数据延迟写入导致的问题,TDengine 中连续查询的计算有一定的延迟。也就是说,一个时间窗口过去后,TDengine 并不会立即计算这个窗口的数据,所以要稍等一会(一般不会超过 1 分钟)才能查到计算结果。 - -## 管理连续查询 - -用户可在控制台中通过 `show streams` 命令来查看系统中全部运行的连续查询,并可以通过 `kill stream` 命令杀掉对应的连续查询。后续版本会提供更细粒度和便捷的连续查询管理命令。 +--- +sidebar_label: 流式计算 +description: "TDengine 流式计算将数据的写入、预处理、复杂分析、实时计算、报警触发等功能融为一体,是一个能够降低用户部署成本、存储成本和运维成本的计算引擎。" +title: 流式计算 +--- +在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。 +使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。 + +## 创建流 + +```sql +CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery +stream_options: { + TRIGGER [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time] + WATERMARK time +} + +其中 subquery 是 select 普通查询语法的子集: + +subquery: SELECT [DISTINCT] select_list + from_clause + [WHERE condition] + [PARTITION BY tag_list] + [window_clause] + [group_by_clause] + +不支持 order_by,limit,slimit,fill 语句 +``` +### 触发模式 + +在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。 + +对于非窗口计算,流式计算的触发是实时的; + +对于窗口计算,目前提供3种触发模式: + +1. AT_ONCE:写入立即触发 + +2. WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 WATERMARK 使用,详见《流式计算的乱序数据容忍策略》) + +3. MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 MAX_DELAY 指定的时间,则触发计算。 + +由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。 + +因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。 + +MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 MAX_DELAY 指定的时间,则立即触发计算。 + +### 乱序数据容忍策略 + +在创建流时,可以在 stream_option 中指定 WATERMARK + +流式计算通过 WATERMARK 来度量对乱序数据的容忍程度,WATERMARK 默认为 0。 + + T = 最新事件时间 - WATERMARK + +每批到来的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。 + +### 流式计算的过期数据处理策略 + +对于已关闭的窗口,再次落入该窗口中的数据被标记为过期数据,对于过期数据,流式计算提供两种处理方式: + +1. 直接丢弃:这是常见流式计算引擎提供的默认(甚至是唯一)计算模式 + +2. 重新计算:从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果 + +无论在哪种模式下,watermark 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。 + +### 流式计算与窗口切分查询 + +窗口子句语法如下: + +```sql +window_clause: { + SESSION(ts_col, tol_val) + | STATE_WINDOW(col) + | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] +} +``` + +其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 + + +## 流式计算的展示 + +```sql +SHOW STREAMS; +``` + +## 流式计算的删除 + +```sql +DROP STREAM [IF NOT EXISTS] stream_name; +``` + +## 使用案例 + +通过以下案例,进一步了解 TDengine 流计算的使用 + +### 创建 DB 和原始数据表 + +首先准备数据,完成建库、建一张超级表和多张子表操作: + +```sql +drop database if exists stream_db; +create database stream_db; + +create table stream_db.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16)); +create table stream_db.ctb0 using stream_db.stb tags(0, "subtable0"); +create table stream_db.ctb1 using stream_db.stb tags(1, "subtable1"); +create table stream_db.ctb2 using stream_db.stb tags(2, "subtable2"); +create table stream_db.ctb3 using stream_db.stb tags(3, "subtable3"); +``` + +### 创建流 +case1. 创建流实现数据过滤 + +```sql +create stream stream1 into stream_db.stream1_output_stb as select * from stream_db.stb where c1 > 0 and c2 != 10 and c3 is not Null; +``` + +case2. 创建流实现标量函数的数据转换 + +```sql +create stream stream2 into stream_db.stream2_output_stb as select ts, abs(c2), char_length(c3), cast(c1 as binary(16)),timezone() from stream_db.stb partition by tbname; +``` + +case3. 创建流实现数据降采样 + +```sql +create stream stream3 into stream_db.stream3_output_stb as select _wstart as start, min(c1), max(c2), count(c3) from stream_db.stb interval (10s); +``` + +case4. 通过 trigger window_close 控制流的触发频率 + +```sql +create stream stream4 trigger window_close into stream_db.stream4_output_stb as select _wstart as start, min(c1), max(c2), count(c3) from stream_db.stb interval (10s); +``` + +case4. 通过 trigger max_delay 控制流的触发频率 + +```sql +create stream stream5 trigger max_delay 3s into stream_db.stream5_output_stb as select _wstart as start, min(c1), max(c2), count(c3) from stream_db.stb interval (10s); +``` + +case6. 通过 watermark 实现乱序数据容忍 + +```sql +create stream stream6 trigger window_close watermark 15s into stream_db.stream6_output_stb as select _wstart as start, min(c1), max(c2), count(c3) from stream_db.stb interval (10s); +``` + +case7. 通过 ignore expired 实现乱序数据丢弃 + +```sql +create stream stream7 trigger at_once ignore expired into stream_db.stream7_output_stb as select _wstart as start, min(c1), max(c2), count(c3) from stream_db.stb interval (10s); +``` + +### 写入数据 +```sql +insert into stream_db.ctb0 values("2022-08-13 00:00:00", 0, 0, 'a0'); +insert into stream_db.ctb0 values("2022-08-13 00:00:01", 1, 1, 'a1'); +insert into stream_db.ctb0 values("2022-08-13 00:00:07", 7, 7, 'a7'); +insert into stream_db.ctb0 values("2022-08-13 00:00:10", 10, 10, 'a10'); +insert into stream_db.ctb0 values("2022-08-13 00:00:13", 13, 13, 'a13'); +insert into stream_db.ctb0 values("2022-08-13 00:00:20", 20, 20, 'a20'); +insert into stream_db.ctb0 values("2022-08-13 00:00:21", 21, 21, 'a21'); +insert into stream_db.ctb0 values("2022-08-13 00:00:25", 25, 25, 'a25'); +insert into stream_db.ctb0 values("2022-08-13 00:00:26", 26, 26, 'a26'); +``` +### 查询以观查结果 +```sql +select * from stream_db.stream1_output_stb; +select * from stream_db.stream2_output_stb; +select * from stream_db.stream3_output_stb; +select * from stream_db.stream4_output_stb; +select * from stream_db.stream5_output_stb; +select * from stream_db.stream6_output_stb; +select * from stream_db.stream7_output_stb; +``` \ No newline at end of file