Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7e5fa0dd
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7e5fa0dd
编写于
8月 07, 2022
作者:
J
jiajingbin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
doc: add 06-stream.md
上级
ef849670
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
179 addition
and
84 deletion
+179
-84
docs/zh/07-develop/06-stream.md
docs/zh/07-develop/06-stream.md
+179
-84
未找到文件。
docs/zh/07-develop/06-stream.md
浏览文件 @
7e5fa0dd
---
sidebar_label
:
连续查询
description
:
"
连续查询是一个按照预设频率自动执行的查询功能,提供按照时间窗口的聚合查询能力,是一种简化的时间驱动流式计算。"
title
:
"
连续查询(Continuous
Query)"
---
连续查询是 TDengine 定期自动执行的查询,采用滑动窗口的方式进行计算,是一种简化的时间驱动的流式计算。针对库中的表或超级表,TDengine 可提供定期自动执行的连续查询,用户可让 TDengine 推送查询的结果,也可以将结果再写回到 TDengine 中。每次执行的查询是一个时间窗口,时间窗口随着时间流动向前滑动。在定义连续查询的时候需要指定时间窗口(time window, 参数 interval)大小和每次前向增量时间(forward sliding times, 参数 sliding)。
TDengine 的连续查询采用时间驱动模式,可以直接使用 TAOS SQL 进行定义,不需要额外的操作。使用连续查询,可以方便快捷地按照时间窗口生成结果,从而对原始采集数据进行降采样(down sampling)。用户通过 TAOS SQL 定义连续查询以后,TDengine 自动在最后的一个完整的时间周期末端拉起查询,并将计算获得的结果推送给用户或者写回 TDengine。
TDengine 提供的连续查询与普通流计算中的时间窗口计算具有以下区别:
-
不同于流计算的实时反馈计算结果,连续查询只在时间窗口关闭以后才开始计算。例如时间周期是 1 天,那么当天的结果只会在 23:59:59 以后才会生成。
-
如果有历史记录写入到已经计算完成的时间区间,连续查询并不会重新进行计算,也不会重新将结果推送给用户。对于写回 TDengine 的模式,也不会更新已经存在的计算结果。
-
使用连续查询推送结果的模式,服务端并不缓存客户端计算状态,也不提供 Exactly-Once 的语义保证。如果用户的应用端崩溃,再次拉起的连续查询将只会从再次拉起的时间开始重新计算最近的一个完整的时间窗口。如果使用写回模式,TDengine 可确保数据写回的有效性和连续性。
## 连续查询语法
```
sql
[
CREATE
TABLE
AS
]
SELECT
select_expr
[,
select_expr
...]
FROM
{
tb_name_list
}
[
WHERE
where_condition
]
[
INTERVAL
(
interval_val
[,
interval_offset
])
[
SLIDING
sliding_val
]]
```
INTERVAL: 连续查询作用的时间窗口
SLIDING: 连续查询的时间窗口向前滑动的时间间隔
## 使用连续查询
下面以智能电表场景为例介绍连续查询的具体使用方法。假设我们通过下列 SQL 语句创建了超级表和子表:
```
sql
create
table
meters
(
ts
timestamp
,
current
float
,
voltage
int
,
phase
float
)
tags
(
location
binary
(
64
),
groupId
int
);
create
table
D1001
using
meters
tags
(
"California.SanFrancisco"
,
2
);
create
table
D1002
using
meters
tags
(
"California.LosAngeles"
,
2
);
...
```
可以通过下面这条 SQL 语句以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压。
```
sql
select
avg
(
voltage
)
from
meters
interval
(
1
m
)
sliding
(
30
s
);
```
每次执行这条语句,都会重新计算所有数据。 如果需要每隔 30 秒执行一次来增量计算最近一分钟的数据,可以把上面的语句改进成下面的样子,每次使用不同的
`startTime`
并定期执行:
```
sql
select
avg
(
voltage
)
from
meters
where
ts
>
{
startTime
}
interval
(
1
m
)
sliding
(
30
s
);
```
这样做没有问题,但 TDengine 提供了更简单的方法,只要在最初的查询语句前面加上
`create table {tableName} as`
就可以了,例如:
```
sql
create
table
avg_vol
as
select
avg
(
voltage
)
from
meters
interval
(
1
m
)
sliding
(
30
s
);
```
会自动创建一个名为
`avg_vol`
的新表,然后每隔 30 秒,TDengine 会增量执行
`as`
后面的 SQL 语句,并将查询结果写入这个表中,用户程序后续只要从
`avg_vol`
中查询数据即可。例如:
```
sql
taos
>
select
*
from
avg_vol
;
ts
|
avg_voltage_
|
===================================================
2020
-
07
-
29
13
:
37
:
30
.
000
|
222
.
0000000
|
2020
-
07
-
29
13
:
38
:
00
.
000
|
221
.
3500000
|
2020
-
07
-
29
13
:
38
:
30
.
000
|
220
.
1700000
|
2020
-
07
-
29
13
:
39
:
00
.
000
|
223
.
0800000
|
```
需要注意,查询时间窗口的最小值是 10 毫秒,没有时间窗口范围的上限。
此外,TDengine 还支持用户指定连续查询的起止时间。如果不输入开始时间,连续查询将从第一条原始数据所在的时间窗口开始;如果没有输入结束时间,连续查询将永久运行;如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。比如使用下面的 SQL 创建的连续查询将运行一小时,之后会自动停止。
```
sql
create
table
avg_vol
as
select
avg
(
voltage
)
from
meters
where
ts
>
now
and
ts
<=
now
+
1
h
interval
(
1
m
)
sliding
(
30
s
);
```
需要说明的是,上面例子中的
`now`
是指创建连续查询的时间,而不是查询执行的时间,否则,查询就无法自动停止了。另外,为了尽量避免原始数据延迟写入导致的问题,TDengine 中连续查询的计算有一定的延迟。也就是说,一个时间窗口过去后,TDengine 并不会立即计算这个窗口的数据,所以要稍等一会(一般不会超过 1 分钟)才能查到计算结果。
## 管理连续查询
用户可在控制台中通过
`show streams`
命令来查看系统中全部运行的连续查询,并可以通过
`kill stream`
命令杀掉对应的连续查询。后续版本会提供更细粒度和便捷的连续查询管理命令。
---
sidebar_label
:
流式计算
description
:
"
TDengine
流式计算将数据的写入、预处理、复杂分析、实时计算、报警触发等功能融为一体,是一个能够降低用户部署成本、存储成本和运维成本的计算引擎。"
title
:
流式计算
---
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。
## 创建流
```
sql
CREATE
STREAM
[
IF
NOT
EXISTS
]
stream_name
[
stream_options
]
INTO
stb_name
AS
subquery
stream_options
:
{
TRIGGER
[
AT_ONCE
|
WINDOW_CLOSE
|
MAX_DELAY
time
]
WATERMARK
time
}
其中
subquery
是
select
普通查询语法的子集
:
subquery
:
SELECT
[
DISTINCT
]
select_list
from_clause
[
WHERE
condition
]
[
PARTITION
BY
tag_list
]
[
window_clause
]
[
group_by_clause
]
不支持
order_by
,
limit
,
slimit
,
fill
语句
```
### 触发模式
在创建流时,可以通过 TRIGGER 指令指定流式计算的触发模式。
对于非窗口计算,流式计算的触发是实时的;
对于窗口计算,目前提供3种触发模式:
1.
AT_ONCE:写入立即触发
2.
WINDOW_CLOSE:窗口关闭时触发(窗口关闭由事件时间决定,可配合 WATERMARK 使用,详见《流式计算的乱序数据容忍策略》)
3.
MAX_DELAY time:若窗口关闭,则触发计算。若窗口未关闭,且未关闭时长超过 MAX_DELAY 指定的时间,则触发计算。
由于窗口关闭是由事件时间决定的,如事件流中断、或持续延迟,则事件时间无法更新,可能导致无法得到最新的计算结果。
因此,流式计算提供了以事件时间结合处理时间计算的 MAX_DELAY 触发模式。
MAX_DELAY 模式在窗口关闭时会立即触发计算。此外,当数据写入后,计算触发的时间超过 MAX_DELAY 指定的时间,则立即触发计算。
### 乱序数据容忍策略
在创建流时,可以在 stream_option 中指定 WATERMARK
流式计算通过 WATERMARK 来度量对乱序数据的容忍程度,WATERMARK 默认为 0。
T = 最新事件时间 - WATERMARK
每批到来的数据都会以上述公式更新窗口关闭时间,并将窗口结束时间 < T 的所有打开的窗口关闭,若触发模式为 WINDOW_CLOSE 或 MAX_DELAY,则推送窗口聚合结果。
### 流式计算的过期数据处理策略
对于已关闭的窗口,再次落入该窗口中的数据被标记为过期数据,对于过期数据,流式计算提供两种处理方式:
1.
直接丢弃:这是常见流式计算引擎提供的默认(甚至是唯一)计算模式
2.
重新计算:从 TSDB 中重新查找对应窗口的所有数据并重新计算得到最新结果
无论在哪种模式下,watermark 都应该被妥善设置,来得到正确结果(直接丢弃模式)或避免频繁触发重算带来的性能开销(重新计算模式)。
### 流式计算与窗口切分查询
窗口子句语法如下:
```
sql
window_clause
:
{
SESSION
(
ts_col
,
tol_val
)
|
STATE_WINDOW
(
col
)
|
INTERVAL
(
interval_val
[,
interval_offset
])
[
SLIDING
(
sliding_val
)]
}
```
其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。
## 流式计算的展示
```
sql
SHOW
STREAMS
;
```
## 流式计算的删除
```
sql
DROP
STREAM
[
IF
NOT
EXISTS
]
stream_name
;
```
## 使用案例
通过以下案例,进一步了解 TDengine 流计算的使用
### 创建 DB 和原始数据表
首先准备数据,完成建库、建一张超级表和多张子表操作:
```
sql
drop
database
if
exists
stream_db
;
create
database
stream_db
;
create
table
stream_db
.
stb
(
ts
timestamp
,
c1
int
,
c2
float
,
c3
varchar
(
16
))
tags
(
t1
int
,
t3
varchar
(
16
));
create
table
stream_db
.
ctb0
using
stream_db
.
stb
tags
(
0
,
"subtable0"
);
create
table
stream_db
.
ctb1
using
stream_db
.
stb
tags
(
1
,
"subtable1"
);
create
table
stream_db
.
ctb2
using
stream_db
.
stb
tags
(
2
,
"subtable2"
);
create
table
stream_db
.
ctb3
using
stream_db
.
stb
tags
(
3
,
"subtable3"
);
```
### 创建流
case1. 创建流实现数据过滤
```
sql
create
stream
stream1
into
stream_db
.
stream1_output_stb
as
select
*
from
stream_db
.
stb
where
c1
>
0
and
c2
!=
10
and
c3
is
not
Null
;
```
case2. 创建流实现标量函数的数据转换
```
sql
create
stream
stream2
into
stream_db
.
stream2_output_stb
as
select
ts
,
abs
(
c2
),
char_length
(
c3
),
cast
(
c1
as
binary
(
16
)),
timezone
()
from
stream_db
.
stb
partition
by
tbname
;
```
case3. 创建流实现数据降采样
```
sql
create
stream
stream3
into
stream_db
.
stream3_output_stb
as
select
_wstart
as
start
,
min
(
c1
),
max
(
c2
),
count
(
c3
)
from
stream_db
.
stb
interval
(
10
s
);
```
case4. 通过 trigger window_close 控制流的触发频率
```
sql
create
stream
stream4
trigger
window_close
into
stream_db
.
stream4_output_stb
as
select
_wstart
as
start
,
min
(
c1
),
max
(
c2
),
count
(
c3
)
from
stream_db
.
stb
interval
(
10
s
);
```
case4. 通过 trigger max_delay 控制流的触发频率
```
sql
create
stream
stream5
trigger
max_delay
3
s
into
stream_db
.
stream5_output_stb
as
select
_wstart
as
start
,
min
(
c1
),
max
(
c2
),
count
(
c3
)
from
stream_db
.
stb
interval
(
10
s
);
```
case6. 通过 watermark 实现乱序数据容忍
```
sql
create
stream
stream6
trigger
window_close
watermark
15
s
into
stream_db
.
stream6_output_stb
as
select
_wstart
as
start
,
min
(
c1
),
max
(
c2
),
count
(
c3
)
from
stream_db
.
stb
interval
(
10
s
);
```
case7. 通过 ignore expired 实现乱序数据丢弃
```
sql
create
stream
stream7
trigger
at_once
ignore
expired
into
stream_db
.
stream7_output_stb
as
select
_wstart
as
start
,
min
(
c1
),
max
(
c2
),
count
(
c3
)
from
stream_db
.
stb
interval
(
10
s
);
```
### 写入数据
```
sql
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:00"
,
0
,
0
,
'a0'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:01"
,
1
,
1
,
'a1'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:07"
,
7
,
7
,
'a7'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:10"
,
10
,
10
,
'a10'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:13"
,
13
,
13
,
'a13'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:20"
,
20
,
20
,
'a20'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:21"
,
21
,
21
,
'a21'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:25"
,
25
,
25
,
'a25'
);
insert
into
stream_db
.
ctb0
values
(
"2022-08-13 00:00:26"
,
26
,
26
,
'a26'
);
```
### 查询以观查结果
```
sql
select
*
from
stream_db
.
stream1_output_stb
;
select
*
from
stream_db
.
stream2_output_stb
;
select
*
from
stream_db
.
stream3_output_stb
;
select
*
from
stream_db
.
stream4_output_stb
;
select
*
from
stream_db
.
stream5_output_stb
;
select
*
from
stream_db
.
stream6_output_stb
;
select
*
from
stream_db
.
stream7_output_stb
;
```
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录