06-stream.md 5.9 KB
Newer Older
J
jiajingbin 已提交
1 2 3
---
sidebar_label: 流式计算
description: "TDengine 流式计算将数据的写入、预处理、复杂分析、实时计算、报警触发等功能融为一体,是一个能够降低用户部署成本、存储成本和运维成本的计算引擎。"
4
title: 流式计算
J
jiajingbin 已提交
5
---
6

J
jiajingbin 已提交
7 8 9
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。用户通常需要在时序数据库之外再搭建 Kafka、Flink、Spark 等流计算处理引擎,增加了用户的开发成本和维护成本。
使用 TDengine 3.0 的流式计算引擎能够最大限度的减少对这些额外中间件的依赖,真正将数据的写入、预处理、长期存储、复杂分析、实时计算、实时报警触发等功能融为一体,并且,所有这些任务只需要使用 SQL 完成,极大降低了用户的学习成本、使用成本。

10
## 流式计算的创建
J
jiajingbin 已提交
11 12 13 14 15 16

```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
 TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
 WATERMARK   time
17
 IGNORE EXPIRED
J
jiajingbin 已提交
18
}
19 20
```

21
详细的语法规则参考 [流式计算](/taos-sql/stream/)
J
jiajingbin 已提交
22

23
## 示例一
J
jiajingbin 已提交
24

25
查找过去 12 小时电表电压大于 220V 的记录条数和电流的最大值,并对采集的数据按时间窗口聚合。
J
jiajingbin 已提交
26 27 28

### 创建 DB 和原始数据表

29
首先准备数据,完成建库、建一张超级表和多张子表操作
J
jiajingbin 已提交
30 31 32

```sql
drop database if exists stream_db;
33
create database stream_db;
J
jiajingbin 已提交
34

35 36 37 38 39
create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int);

create table stream_db.d1001 using stream_db.meters tags("beijing", 1);
create table stream_db.d1002 using stream_db.meters tags("guangzhou", 2);
create table stream_db.d1003 using stream_db.meters tags("shanghai", 3);
J
jiajingbin 已提交
40 41 42
```

### 创建流
43

J
jiajingbin 已提交
44
```sql
45
create stream stream1 into stream_db.stream1_output_stb as select _wstart as start, count(voltage), max(current) from stream_db.meters where voltage > 220 and ts > now - 12h interval (1h);
J
jiajingbin 已提交
46 47
```

48
### 写入数据
J
jiajingbin 已提交
49
```sql
50 51 52 53 54 55 56 57 58
insert into stream_db.d1001 values(now-14h, 10.3, 210);
insert into stream_db.d1001 values(now-13h, 13.5, 226);
insert into stream_db.d1001 values(now-12h, 12.5, 221);
insert into stream_db.d1002 values(now-11h, 14.7, 221);
insert into stream_db.d1002 values(now-10h, 10.5, 219);
insert into stream_db.d1002 values(now-9h, 11.2, 217);
insert into stream_db.d1003 values(now-8h, 11.5, 222);
insert into stream_db.d1003 values(now-7h, 12.3, 227);
insert into stream_db.d1003 values(now-6h, 12.3, 215);
J
jiajingbin 已提交
59 60
```

61
### 查询以观查结果
J
jiajingbin 已提交
62
```sql
63 64 65 66 67 68 69
taos> select * from stream_db.stream1_output_stb;
          start          |    count(voltage)     |     max(current)     |       group_id        |
=================================================================================================
 2022-08-08 08:00:00.000 |                     1 |             14.70000 |                     0 |
 2022-08-08 11:00:00.000 |                     1 |             11.50000 |                     0 |
 2022-08-08 12:00:00.000 |                     1 |             12.30000 |                     0 |
Query OK, 3 rows in database (0.008239s)
J
jiajingbin 已提交
70 71
```

72 73
## 示例二
查询所有电表中电压等于 220V 的数据,对过滤出的电表电流数据进行四舍五入运算,同时将主键时间戳列转换为 bigint 类型,并对采集的数据按表名分组。
J
jiajingbin 已提交
74

75 76
### 创建 DB 和原始数据表
首先准备数据,完成建库、建一张超级表和多张子表操作
J
jiajingbin 已提交
77 78

```sql
79 80
drop database if exists stream_db;
create database stream_db;
J
jiajingbin 已提交
81

82
create stable stream_db.meters (ts timestamp, current float, voltage int) TAGS (location varchar(64), groupId int);
J
jiajingbin 已提交
83

84 85 86 87 88
create table stream_db.d1001 using stream_db.meters tags("beijing", 1);
create table stream_db.d1002 using stream_db.meters tags("shanghai", 2);
create table stream_db.d1003 using stream_db.meters tags("beijing", 2);
create table stream_db.d1004 using stream_db.meters tags("tianjin", 3);
create table stream_db.d1005 using stream_db.meters tags("shanghai", 1);
J
jiajingbin 已提交
89 90
```

91
### 创建流
J
jiajingbin 已提交
92 93

```sql
94
create stream stream2 into stream_db.stream2_output_stb as select ts,cast(ts as bigint),round(current),location from stream_db.meters where voltage=220 partition by tbname;
J
jiajingbin 已提交
95 96 97 98
```

### 写入数据
```sql
99 100 101 102 103 104 105 106 107 108
insert into stream_db.d1001 values(now-14h, 10.3, 210);
insert into stream_db.d1001 values(now-13h, 13.5, 220);
insert into stream_db.d1002 values(now-12h, 14.7, 218);
insert into stream_db.d1002 values(now-11h, 10.5, 220);
insert into stream_db.d1003 values(now-10h, 11.5, 220);
insert into stream_db.d1003 values(now-9h, 12.3, 215);
insert into stream_db.d1004 values(now-8h, 11.5, 220);
insert into stream_db.d1004 values(now-7h, 15.3, 217);
insert into stream_db.d1005 values(now-6h, 16.5, 216);
insert into stream_db.d1005 values(now-5h, 12.3, 220);
J
jiajingbin 已提交
109 110 111
```
### 查询以观查结果
```sql
112 113 114 115 116 117 118 119 120
taos> select * from stream_db.stream2_output_stb;
           ts            |  cast(ts as bigint)   |    round(current)    |            location            |       group_id        |
==================================================================================================================================
 2022-08-08 09:29:55.557 |         1659922195557 |             12.00000 | beijing                        |   7226905450883977166 |
 2022-08-08 11:29:55.570 |         1659929395570 |             12.00000 | tianjin                        |   7226905450884501455 |
 2022-08-08 08:29:55.549 |         1659918595549 |             11.00000 | shanghai                       |   7226905450883452877 |
 2022-08-08 06:29:55.534 |         1659911395534 |             14.00000 | beijing                        |   7226905450882928588 |
 2022-08-08 14:29:56.175 |         1659940196175 |             12.00000 | shanghai                       |   7226905450895708112 |
Query OK, 5 rows in database (0.015235s)
121
```
122 123 124 125

## 示例三
...待续