Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5038df97
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5038df97
编写于
8月 11, 2022
作者:
W
wade zhang
提交者:
GitHub
8月 11, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15850 from taosdata/test/td-15880
docs: Add 06-stream.md for stream-computing
上级
123cd4f6
8a9e3ed7
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
128 addition
and
84 deletion
+128
-84
docs/zh/07-develop/06-stream.md
docs/zh/07-develop/06-stream.md
+128
-84
未找到文件。
docs/zh/07-develop/06-stream.md
浏览文件 @
5038df97
---
sidebar_label
:
连续查询
description
:
"
连续查询是一个按照预设频率自动执行的查询功能,提供按照时间窗口的聚合查询能力,是一种简化的时间驱动流式计算。"
title
:
"
连续查询(Continuous
Query)"
---
连续查询是 TDengine 定期自动执行的查询,采用滑动窗口的方式进行计算,是一种简化的时间驱动的流式计算。针对库中的表或超级表,TDengine 可提供定期自动执行的连续查询,用户可让 TDengine 推送查询的结果,也可以将结果再写回到 TDengine 中。每次执行的查询是一个时间窗口,时间窗口随着时间流动向前滑动。在定义连续查询的时候需要指定时间窗口(time window, 参数 interval)大小和每次前向增量时间(forward sliding times, 参数 sliding)。
TDengine 的连续查询采用时间驱动模式,可以直接使用 TAOS SQL 进行定义,不需要额外的操作。使用连续查询,可以方便快捷地按照时间窗口生成结果,从而对原始采集数据进行降采样(down sampling)。用户通过 TAOS SQL 定义连续查询以后,TDengine 自动在最后的一个完整的时间周期末端拉起查询,并将计算获得的结果推送给用户或者写回 TDengine。
TDengine 提供的连续查询与普通流计算中的时间窗口计算具有以下区别:
-
不同于流计算的实时反馈计算结果,连续查询只在时间窗口关闭以后才开始计算。例如时间周期是 1 天,那么当天的结果只会在 23:59:59 以后才会生成。
-
如果有历史记录写入到已经计算完成的时间区间,连续查询并不会重新进行计算,也不会重新将结果推送给用户。对于写回 TDengine 的模式,也不会更新已经存在的计算结果。
-
使用连续查询推送结果的模式,服务端并不缓存客户端计算状态,也不提供 Exactly-Once 的语义保证。如果用户的应用端崩溃,再次拉起的连续查询将只会从再次拉起的时间开始重新计算最近的一个完整的时间窗口。如果使用写回模式,TDengine 可确保数据写回的有效性和连续性。
## 连续查询语法
```
sql
[
CREATE
TABLE
AS
]
SELECT
select_expr
[,
select_expr
...]
FROM
{
tb_name_list
}
[
WHERE
where_condition
]
[
INTERVAL
(
interval_val
[,
interval_offset
])
[
SLIDING
sliding_val
]]
```
INTERVAL: 连续查询作用的时间窗口
SLIDING: 连续查询的时间窗口向前滑动的时间间隔
## 使用连续查询
下面以智能电表场景为例介绍连续查询的具体使用方法。假设我们通过下列 SQL 语句创建了超级表和子表:
```
sql
create
table
meters
(
ts
timestamp
,
current
float
,
voltage
int
,
phase
float
)
tags
(
location
binary
(
64
),
groupId
int
);
create
table
D1001
using
meters
tags
(
"California.SanFrancisco"
,
2
);
create
table
D1002
using
meters
tags
(
"California.LosAngeles"
,
2
);
...
```
可以通过下面这条 SQL 语句以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压。
```
sql
select
avg
(
voltage
)
from
meters
interval
(
1
m
)
sliding
(
30
s
);
```
每次执行这条语句,都会重新计算所有数据。 如果需要每隔 30 秒执行一次来增量计算最近一分钟的数据,可以把上面的语句改进成下面的样子,每次使用不同的
`startTime`
并定期执行:
```
sql
select
avg
(
voltage
)
from
meters
where
ts
>
{
startTime
}
interval
(
1
m
)
sliding
(
30
s
);
```
这样做没有问题,但 TDengine 提供了更简单的方法,只要在最初的查询语句前面加上
`create table {tableName} as`
就可以了,例如:
```
sql
create
table
avg_vol
as
select
avg
(
voltage
)
from
meters
interval
(
1
m
)
sliding
(
30
s
);
```
会自动创建一个名为
`avg_vol`
的新表,然后每隔 30 秒,TDengine 会增量执行
`as`
后面的 SQL 语句,并将查询结果写入这个表中,用户程序后续只要从
`avg_vol`
中查询数据即可。例如:
```
sql
taos
>
select
*
from
avg_vol
;
ts
|
avg_voltage_
|
===================================================
2020
-
07
-
29
13
:
37
:
30
.
000
|
222
.
0000000
|
2020
-
07
-
29
13
:
38
:
00
.
000
|
221
.
3500000
|
2020
-
07
-
29
13
:
38
:
30
.
000
|
220
.
1700000
|
2020
-
07
-
29
13
:
39
:
00
.
000
|
223
.
0800000
|
```
需要注意,查询时间窗口的最小值是 10 毫秒,没有时间窗口范围的上限。
此外,TDengine 还支持用户指定连续查询的起止时间。如果不输入开始时间,连续查询将从第一条原始数据所在的时间窗口开始;如果没有输入结束时间,连续查询将永久运行;如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。比如使用下面的 SQL 创建的连续查询将运行一小时,之后会自动停止。
```
sql
create
table
avg_vol
as
select
avg
(
voltage
)
from
meters
where
ts
>
now
and
ts
<=
now
+
1
h
interval
(
1
m
)
sliding
(
30
s
);
```
需要说明的是,上面例子中的
`now`
是指创建连续查询的时间,而不是查询执行的时间,否则,查询就无法自动停止了。另外,为了尽量避免原始数据延迟写入导致的问题,TDengine 中连续查询的计算有一定的延迟。也就是说,一个时间窗口过去后,TDengine 并不会立即计算这个窗口的数据,所以要稍等一会(一般不会超过 1 分钟)才能查到计算结果。
## 管理连续查询
用户可在控制台中通过
`show streams`
命令来查看系统中全部运行的连续查询,并可以通过
`kill stream`
命令杀掉对应的连续查询。后续版本会提供更细粒度和便捷的连续查询管理命令。
---
sidebar_label
:
流式计算
description
:
"
TDengine
流式计算将数据的写入、预处理、复杂分析、实时计算、报警触发等功能融为一体,是一个能够降低用户部署成本、存储成本和运维成本的计算引擎。"
title
:
流式计算
---
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。
## 流式计算的创建
```
sql
CREATE
STREAM
[
IF
NOT
EXISTS
]
stream_name
[
stream_options
]
INTO
stb_name
AS
subquery
stream_options
:
{
TRIGGER
[
AT_ONCE
|
WINDOW_CLOSE
|
MAX_DELAY
time
]
WATERMARK
time
IGNORE
EXPIRED
}
```
详细的语法规则参考
[
流式计算
](
../../taos-sql/stream
)
## 示例一
企业电表的数据经常都是成百上千亿条的,那么想要将这些分散、凌乱的数据清洗或转换都需要比较长的时间,很难做到高效性和实时性,以下例子中,通过流计算可以将过去 12 小时电表电压大于 220V 的数据清洗掉,然后以小时为窗口整合并计算出每个窗口中电流的最大值,并将结果输出到指定的数据表中。
### 创建 DB 和原始数据表
首先准备数据,完成建库、建一张超级表和多张子表操作
```
sql
drop
database
if
exists
stream_db
;
create
database
stream_db
;
create
stable
stream_db
.
meters
(
ts
timestamp
,
current
float
,
voltage
int
)
TAGS
(
location
varchar
(
64
),
groupId
int
);
create
table
stream_db
.
d1001
using
stream_db
.
meters
tags
(
"beijing"
,
1
);
create
table
stream_db
.
d1002
using
stream_db
.
meters
tags
(
"guangzhou"
,
2
);
create
table
stream_db
.
d1003
using
stream_db
.
meters
tags
(
"shanghai"
,
3
);
```
### 创建流
```
sql
create
stream
stream1
into
stream_db
.
stream1_output_stb
as
select
_wstart
as
start
,
_wend
as
end
,
max
(
current
)
as
max_current
from
stream_db
.
meters
where
voltage
<=
220
and
ts
>
now
-
12
h
interval
(
1
h
);
```
### 写入数据
```
sql
insert
into
stream_db
.
d1001
values
(
now
-
14
h
,
10
.
3
,
210
);
insert
into
stream_db
.
d1001
values
(
now
-
13
h
,
13
.
5
,
216
);
insert
into
stream_db
.
d1001
values
(
now
-
12
h
,
12
.
5
,
219
);
insert
into
stream_db
.
d1002
values
(
now
-
11
h
,
14
.
7
,
221
);
insert
into
stream_db
.
d1002
values
(
now
-
10
h
,
10
.
5
,
218
);
insert
into
stream_db
.
d1002
values
(
now
-
9
h
,
11
.
2
,
220
);
insert
into
stream_db
.
d1003
values
(
now
-
8
h
,
11
.
5
,
217
);
insert
into
stream_db
.
d1003
values
(
now
-
7
h
,
12
.
3
,
227
);
insert
into
stream_db
.
d1003
values
(
now
-
6
h
,
12
.
3
,
215
);
```
### 查询以观查结果
```
sql
taos
>
select
*
from
stream_db
.
stream1_output_stb
;
start
|
end
|
max_current
|
group_id
|
===================================================================================================
2022
-
08
-
09
14
:
00
:
00
.
000
|
2022
-
08
-
09
15
:
00
:
00
.
000
|
10
.
50000
|
0
|
2022
-
08
-
09
15
:
00
:
00
.
000
|
2022
-
08
-
09
16
:
00
:
00
.
000
|
11
.
20000
|
0
|
2022
-
08
-
09
16
:
00
:
00
.
000
|
2022
-
08
-
09
17
:
00
:
00
.
000
|
11
.
50000
|
0
|
2022
-
08
-
09
18
:
00
:
00
.
000
|
2022
-
08
-
09
19
:
00
:
00
.
000
|
12
.
30000
|
0
|
Query
OK
,
4
rows
in
database
(
0
.
012033
s
)
```
## 示例二
某运营商平台要采集机房所有服务器的系统资源指标,包含 cpu、内存、网络延迟等,采集后需要对数据进行四舍五入运算,将地域和服务器名以下划线拼接,然后将结果按时间排序并以服务器名分组输出到新的数据表中。
### 创建 DB 和原始数据表
首先准备数据,完成建库、建一张超级表和多张子表操作
```
sql
drop
database
if
exists
stream_db
;
create
database
stream_db
;
create
stable
stream_db
.
idc
(
ts
timestamp
,
cpu
float
,
mem
float
,
latency
float
)
TAGS
(
location
varchar
(
64
),
groupId
int
);
create
table
stream_db
.
server01
using
stream_db
.
idc
tags
(
"beijing"
,
1
);
create
table
stream_db
.
server02
using
stream_db
.
idc
tags
(
"shanghai"
,
2
);
create
table
stream_db
.
server03
using
stream_db
.
idc
tags
(
"beijing"
,
2
);
create
table
stream_db
.
server04
using
stream_db
.
idc
tags
(
"tianjin"
,
3
);
create
table
stream_db
.
server05
using
stream_db
.
idc
tags
(
"shanghai"
,
1
);
```
### 创建流
```
sql
create
stream
stream2
into
stream_db
.
stream2_output_stb
as
select
ts
,
concat_ws
(
"_"
,
location
,
tbname
)
as
server_location
,
round
(
cpu
)
as
cpu
,
round
(
mem
)
as
mem
,
round
(
latency
)
as
latency
from
stream_db
.
idc
partition
by
tbname
order
by
ts
;
```
### 写入数据
```
sql
insert
into
stream_db
.
server01
values
(
now
-
14
h
,
50
.
9
,
654
.
8
,
23
.
11
);
insert
into
stream_db
.
server01
values
(
now
-
13
h
,
13
.
5
,
221
.
2
,
11
.
22
);
insert
into
stream_db
.
server02
values
(
now
-
12
h
,
154
.
7
,
218
.
3
,
22
.
33
);
insert
into
stream_db
.
server02
values
(
now
-
11
h
,
120
.
5
,
111
.
5
,
5
.
55
);
insert
into
stream_db
.
server03
values
(
now
-
10
h
,
101
.
5
,
125
.
6
,
5
.
99
);
insert
into
stream_db
.
server03
values
(
now
-
9
h
,
12
.
3
,
165
.
6
,
6
.
02
);
insert
into
stream_db
.
server04
values
(
now
-
8
h
,
160
.
9
,
120
.
7
,
43
.
51
);
insert
into
stream_db
.
server04
values
(
now
-
7
h
,
240
.
9
,
520
.
7
,
54
.
55
);
insert
into
stream_db
.
server05
values
(
now
-
6
h
,
190
.
9
,
320
.
7
,
55
.
43
);
insert
into
stream_db
.
server05
values
(
now
-
5
h
,
110
.
9
,
600
.
7
,
35
.
54
);
```
### 查询以观查结果
```
sql
taos
>
select
ts
,
server_location
,
cpu
,
mem
,
latency
from
stream_db
.
stream2_output_stb
;
ts
|
server_location
|
cpu
|
mem
|
latency
|
================================================================================================================================
2022
-
08
-
09
21
:
24
:
56
.
785
|
beijing_server01
|
51
.
00000
|
655
.
00000
|
23
.
00000
|
2022
-
08
-
09
22
:
24
:
56
.
795
|
beijing_server01
|
14
.
00000
|
221
.
00000
|
11
.
00000
|
2022
-
08
-
09
23
:
24
:
56
.
806
|
shanghai_server02
|
155
.
00000
|
218
.
00000
|
22
.
00000
|
2022
-
08
-
10
00
:
24
:
56
.
815
|
shanghai_server02
|
121
.
00000
|
112
.
00000
|
6
.
00000
|
2022
-
08
-
10
01
:
24
:
56
.
826
|
beijing_server03
|
102
.
00000
|
126
.
00000
|
6
.
00000
|
2022
-
08
-
10
02
:
24
:
56
.
838
|
beijing_server03
|
12
.
00000
|
166
.
00000
|
6
.
00000
|
2022
-
08
-
10
03
:
24
:
56
.
846
|
tianjin_server04
|
161
.
00000
|
121
.
00000
|
44
.
00000
|
2022
-
08
-
10
04
:
24
:
56
.
853
|
tianjin_server04
|
241
.
00000
|
521
.
00000
|
55
.
00000
|
2022
-
08
-
10
05
:
24
:
56
.
866
|
shanghai_server05
|
191
.
00000
|
321
.
00000
|
55
.
00000
|
2022
-
08
-
10
06
:
24
:
57
.
301
|
shanghai_server05
|
111
.
00000
|
601
.
00000
|
36
.
00000
|
Query
OK
,
10
rows
in
database
(
0
.
022950
s
)
```
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录