06-stream.md 6.5 KB
Newer Older
J
jiajingbin 已提交
1 2 3
---
sidebar_label: 流式计算
description: "TDengine 流式计算将数据的写入、预处理、复杂分析、实时计算、报警触发等功能融为一体,是一个能够降低用户部署成本、存储成本和运维成本的计算引擎。"
4
title: 流式计算
J
jiajingbin 已提交
5
---
6

L
Liu Jicong 已提交
7
在时序数据的处理中,经常要对原始数据进行清洗、预处理,再使用时序数据库进行长久的储存。在传统的时序数据解决方案中,常常需要部署 Kafka、Flink 等流处理系统。而流处理系统的复杂性,带来了高昂的开发与运维成本。
L
Liu Jicong 已提交
8

L
Liu Jicong 已提交
9
TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流的能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。
L
Liu Jicong 已提交
10

L
Liu Jicong 已提交
11
流式计算可以包含数据过滤,标量函数计算(含UDF),以及窗口聚合(支持滑动窗口、会话窗口与状态窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。
L
Liu Jicong 已提交
12

L
Liu Jicong 已提交
13
TDengine 的流式计算能够支持分布在多个 vnode 中的超级表聚合;还能够处理乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略——丢弃或者重新计算。
L
Liu Jicong 已提交
14 15 16

详见 [流式计算](../../taos-sql/stream)

J
jiajingbin 已提交
17

18
## 流式计算的创建
J
jiajingbin 已提交
19 20 21 22 23 24

```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
stream_options: {
 TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
 WATERMARK   time
L
Liu Jicong 已提交
25
 IGNORE EXPIRED [0 | 1]
J
jiajingbin 已提交
26
}
27 28
```

29
详细的语法规则参考 [流式计算](../../taos-sql/stream)
J
jiajingbin 已提交
30

31
## 示例一
J
jiajingbin 已提交
32

33
企业电表的数据经常都是成百上千亿条的,那么想要将这些分散、凌乱的数据清洗或转换都需要比较长的时间,很难做到高效性和实时性,以下例子中,通过流计算可以将电表电压大于 220V 的数据清洗掉,然后以 5 秒为窗口整合并计算出每个窗口中电流的最大值,最后将结果输出到指定的数据表中。
J
jiajingbin 已提交
34 35 36

### 创建 DB 和原始数据表

37
首先准备数据,完成建库、建一张超级表和多张子表操作
J
jiajingbin 已提交
38 39

```sql
40 41 42
DROP DATABASE IF EXISTS power;
CREATE DATABASE power;
USE power;
J
jiajingbin 已提交
43

44
CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
45

46 47 48 49
CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2);
CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3);
CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2);
CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3);
J
jiajingbin 已提交
50 51 52
```

### 创建流
53

J
jiajingbin 已提交
54
```sql
L
Liu Jicong 已提交
55
create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as wend, max(current) as max_current from meters where voltage <= 220 interval (5s);
J
jiajingbin 已提交
56 57
```

58
### 写入数据
J
jiajingbin 已提交
59
```sql
60 61 62 63 64 65 66 67
insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000);
insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000);
insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000);
insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000);
insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000);
insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000);
insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000);
J
jiajingbin 已提交
68 69
```

L
Liu Jicong 已提交
70
### 查询以观察结果
71

J
jiajingbin 已提交
72
```sql
S
songshuqi 已提交
73
taos> select start, wend, max_current from current_stream_output_stb;
L
Liu Jicong 已提交
74
          start          |          wend           |     max_current      |
75
===========================================================================
76 77 78
 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 |             10.30000 |
 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 |             12.60000 |
Query OK, 2 rows in database (0.018762s)
J
jiajingbin 已提交
79 80
```

81
## 示例二
J
jiajingbin 已提交
82

83
依然以示例一中的数据为基础,我们已经采集到了每个智能电表的电流和电压数据,现在需要求出有功功率和无功功率,并将地域和电表名以符号 "." 拼接,然后以电表名称分组输出到新的数据表中。
J
jiajingbin 已提交
84

85
### 创建 DB 和原始数据表
J
jiajingbin 已提交
86

87
参考示例一 [创建 DB 和原始数据表](#创建-db-和原始数据表)
J
jiajingbin 已提交
88

89
### 创建流
J
jiajingbin 已提交
90 91

```sql
92
create stream power_stream into power_stream_output_stb as select ts, concat_ws(".", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname;
J
jiajingbin 已提交
93 94 95
```

### 写入数据
96 97 98

参考示例一 [写入数据](#写入数据)

L
Liu Jicong 已提交
99
### 查询以观察结果
J
jiajingbin 已提交
100
```sql
101 102 103
taos> select ts, meter_location, active_power, reactive_power from power_stream_output_stb;
           ts            |         meter_location         |       active_power        |      reactive_power       |
===================================================================================================================
104 105 106 107 108 109 110 111 112
 2018-10-03 14:38:05.000 | California.LosAngeles.d1004    |            2307.834596289 |             688.687331847 |
 2018-10-03 14:38:06.500 | California.LosAngeles.d1004    |            2387.415754896 |             871.474763418 |
 2018-10-03 14:38:05.500 | California.LosAngeles.d1003    |            2506.240411679 |             720.680274962 |
 2018-10-03 14:38:16.600 | California.LosAngeles.d1003    |            2863.424274422 |             854.482390839 |
 2018-10-03 14:38:05.000 | California.SanFrancisco.d1001  |            2148.178871730 |             688.120784090 |
 2018-10-03 14:38:15.000 | California.SanFrancisco.d1001  |            2598.589176205 |             890.081451418 |
 2018-10-03 14:38:16.800 | California.SanFrancisco.d1001  |            2588.728381186 |             829.240910475 |
 2018-10-03 14:38:16.650 | California.SanFrancisco.d1002  |            2175.595991997 |             555.520860397 |
Query OK, 8 rows in database (0.014753s)
L
Liu Jicong 已提交
113
```