在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。
## 流式计算的创建
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
......@@ -16,96 +16,13 @@ stream_options: {
其中 subquery select 普通查询语法的子集:
subquery: SELECT [DISTINCT] select_list
[WHERE condition]
[PARTITION BY tag_list]
不支持 order_bylimitslimitfill 语句
### 流式计算与窗口切分查询
window_clause: {
SESSION(ts_col, tol_val)
| INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口;
STATE_WINDOW 是状态窗口,产生的连续记录如果具有相同的状态量数值则归属于同一个状态窗口,数值改变后该窗口关闭;
INTERVAL 是时间窗口,用于产生相等时间周期的窗口,查询过滤、聚合等操作按照每个时间窗口为独立的单位执行;
为了便于理解,下面以 interval(10s) 为例,假设事件时间为 "2022-08-06 00:00:01",对应的窗口为 "2022-08-06 00:00:00" ~ "2022-08-06 00:00:10"(窗口 A),那么下一个窗口为 "2022-08-06 00:00:10" ~ "2022-08-06 00:00:20"(窗口 B)。
### 流式计算的触发模式
在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。
对于窗口计算,目前提供 3 种触发模式:
1. AT_ONCE:写入立即触发,窗口 A 和窗口 B 的数据写入后均可以立即触发计算;
2. WINDOW_CLOSE:窗口关闭时触发(可配合 WATERMARK 使用,详见[流式计算的乱序数据容忍策略](#流式计算的乱序数据容忍策略),当窗口 B 的数据到达时,窗口 A 才会关闭并触发 WINDOW_CLOSE;
3. MAX_DELAY time:若窗口 A 关闭,则触发计算。若窗口 B 未关闭,且未关闭时长超过 MAX_DELAY 指定的时间,则触发计算。
因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式;
MAX_DELAY 模式在窗口关闭时或者数据写入后计算触发的时间超过 MAX_DELAY 指定的时间,会立即触发计算。
### 流式计算的乱序数据容忍策略
在创建流时,可以在 stream_options 中指定 WATERMARK;
流式计算通过 WATERMARK 来度量对乱序数据的容忍程度,WATERMARK 默认为 0。
T = 最新事件时间 - WATERMARK
每批到来的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。
以上面的 WINDOW_CLOSE 为例,如果设置了 WATERMARK 15s,那么窗口 A 和 B 均会延迟推送计算结果,当最新事件时间为 "2022-08-06 00:00:25" 时会推送窗口 A 的结果,当最新事件时间为 "2022-08-06 00:00:35" 时会推送窗口 B 的结果。
### 流式计算的过期数据处理策略
1. 直接丢弃:这是常见流式计算引擎提供的默认(甚至是唯一)计算模式;
2. 重新计算:从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果;
模式 1 创建流时需要在 stream_options 中配置 IGNORE EXPIRED,对于已经关闭的窗口,再次落入该窗口的乱序数据会被直接丢弃;
详细的语法规则参考 [流式计算](/taos-sql/stream/)
无论在哪种模式下,WATERMARK 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。
查找过去 12 小时电表电压大于 220V 的记录条数和电流的最大值,并对采集的数据按时间窗口聚合。
### 创建 DB 和原始数据表
......@@ -115,79 +32,94 @@ DROP STREAM [IF EXISTS] stream_name;
drop database if exists stream_db;
create database stream_db;
create table stream_db.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16));
create table stream_db.ctb0 using stream_db.stb tags(0, "subtable0");
create table stream_db.ctb1 using stream_db.stb tags(1, "subtable1");
create table stream_db.ctb2 using stream_db.stb tags(2, "subtable2");
create table stream_db.ctb3 using stream_db.stb tags(3, "subtable3");
create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int);
create table stream_db.d1001 using stream_db.meters tags("beijing", 1);
create table stream_db.d1002 using stream_db.meters tags("guangzhou", 2);
create table stream_db.d1003 using stream_db.meters tags("shanghai", 3);
### 创建流
create stream stream1 into stream_db.stream1_output_stb as select _wstart as start, count(voltage), max(current) from stream_db.meters where voltage > 220 and ts > now - 12h interval (1h);
### 写入数据
insert into stream_db.d1001 values(now-14h, 10.3, 210);
insert into stream_db.d1001 values(now-13h, 13.5, 226);
insert into stream_db.d1001 values(now-12h, 12.5, 221);
insert into stream_db.d1002 values(now-11h, 14.7, 221);
insert into stream_db.d1002 values(now-10h, 10.5, 219);
insert into stream_db.d1002 values(now-9h, 11.2, 217);
insert into stream_db.d1003 values(now-8h, 11.5, 222);
insert into stream_db.d1003 values(now-7h, 12.3, 227);
insert into stream_db.d1003 values(now-6h, 12.3, 215);
### 查询以观查结果
taos> select * from stream_db.stream1_output_stb;
start | count(voltage) | max(current) | group_id |
2022-08-08 08:00:00.000 | 1 | 14.70000 | 0 |
2022-08-08 11:00:00.000 | 1 | 11.50000 | 0 |
2022-08-08 12:00:00.000 | 1 | 12.30000 | 0 |
Query OK, 3 rows in database (0.008239s)
drop database if exists stream_db;
create database stream_db;
create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int);
create table stream_db.d1001 using stream_db.meters tags("beijing", 1);
create table stream_db.d1002 using stream_db.meters tags("shanghai", 2);
create table stream_db.d1003 using stream_db.meters tags("beijing", 2);
create table stream_db.d1004 using stream_db.meters tags("tianjin", 3);
create table stream_db.d1005 using stream_db.meters tags("shanghai", 1);
### 创建流
create stream stream2 into stream_db.stream2_output_stb as select ts,cast(ts as bigint),round(current),location from stream_db.meters where voltage=220 partition by tbname;
insert into stream_db.d1001 values(now-14h, 10.3, 210);
insert into stream_db.d1001 values(now-13h, 13.5, 220);
insert into stream_db.d1002 values(now-12h, 14.7, 218);
insert into stream_db.d1002 values(now-11h, 10.5, 220);
insert into stream_db.d1003 values(now-10h, 11.5, 220);
insert into stream_db.d1003 values(now-9h, 12.3, 215);
insert into stream_db.d1004 values(now-8h, 11.5, 220);
insert into stream_db.d1004 values(now-7h, 15.3, 217);
insert into stream_db.d1005 values(now-6h, 16.5, 216);
insert into stream_db.d1005 values(now-5h, 12.3, 220);
taos> select * from stream_db.stream2_output_stb;
ts | cast(ts as bigint) | round(current) | location | group_id |
2022-08-08 09:29:55.557 | 1659922195557 | 12.00000 | beijing | 7226905450883977166 |
2022-08-08 11:29:55.570 | 1659929395570 | 12.00000 | tianjin | 7226905450884501455 |
2022-08-08 08:29:55.549 | 1659918595549 | 11.00000 | shanghai | 7226905450883452877 |
2022-08-08 06:29:55.534 | 1659911395534 | 14.00000 | beijing | 7226905450882928588 |
2022-08-08 14:29:56.175 | 1659940196175 | 12.00000 | shanghai | 7226905450895708112 |
Query OK, 5 rows in database (0.015235s)
## 示例三
