提交 10ea8e14 编写于 作者: E Elias Soong

[TD-3624] <docs>: fix minor typo.

上级 ffb8c3d6
# 高级功能
## <a class="anchor" id="continuous-query"></a>连续查询(Continuous Query)
## <a class="anchor" id="continuous-query"></a>连续查询(Continuous Query)
连续查询是TDengine定期自动执行的查询,采用滑动窗口的方式进行计算,是一种简化的时间驱动的流式计算。
针对库中的表或超级表,TDengine可提供定期自动执行的连续查询,
用户可让TDengine推送查询的结果,也可以将结果再写回到TDengine中。
每次执行的查询是一个时间窗口,时间窗口随着时间流动向前滑动。
在定义连续查询的时候需要指定时间窗口(time window, 参数interval)大小和每次前向增量时间(forward sliding times, 参数sliding)。
连续查询是TDengine定期自动执行的查询,采用滑动窗口的方式进行计算,是一种简化的时间驱动的流式计算。针对库中的表或超级表,TDengine可提供定期自动执行的连续查询,用户可让TDengine推送查询的结果,也可以将结果再写回到TDengine中。每次执行的查询是一个时间窗口,时间窗口随着时间流动向前滑动。在定义连续查询的时候需要指定时间窗口(time window, 参数interval)大小和每次前向增量时间(forward sliding times, 参数sliding)。
TDengine的连续查询采用时间驱动模式,可以直接使用TAOS SQL进行定义,不需要额外的操作。
使用连续查询,可以方便快捷地按照时间窗口生成结果,从而对原始采集数据进行降采样(down sampling)。
用户通过TAOS SQL定义连续查询以后,TDengine自动在最后的一个完整的时间周期末端拉起查询,
并将计算获得的结果推送给用户或者写回TDengine。
TDengine的连续查询采用时间驱动模式,可以直接使用TAOS SQL进行定义,不需要额外的操作。使用连续查询,可以方便快捷地按照时间窗口生成结果,从而对原始采集数据进行降采样(down sampling)。用户通过TAOS SQL定义连续查询以后,TDengine自动在最后的一个完整的时间周期末端拉起查询,并将计算获得的结果推送给用户或者写回TDengine。
TDengine提供的连续查询与普通流计算中的时间窗口计算具有以下区别:
- 不同于流计算的实时反馈计算结果,连续查询只在时间窗口关闭以后才开始计算。
例如时间周期是1天,那么当天的结果只会在23:59:59以后才会生成。
- 如果有历史记录写入到已经计算完成的时间区间,连续查询并不会重新进行计算,
也不会重新将结果推送给用户。对于写回TDengine的模式,也不会更新已经存在的计算结果。
- 使用连续查询推送结果的模式,服务端并不缓存客户端计算状态,也不提供Exactly-Once的语意保证。
如果用户的应用端崩溃,再次拉起的连续查询将只会从再次拉起的时间开始重新计算最近的一个完整的时间窗口。
如果使用写回模式,TDengine可确保数据写回的有效性和连续性。
- 不同于流计算的实时反馈计算结果,连续查询只在时间窗口关闭以后才开始计算。例如时间周期是1天,那么当天的结果只会在23:59:59以后才会生成。
- 如果有历史记录写入到已经计算完成的时间区间,连续查询并不会重新进行计算,也不会重新将结果推送给用户。对于写回TDengine的模式,也不会更新已经存在的计算结果。
- 使用连续查询推送结果的模式,服务端并不缓存客户端计算状态,也不提供Exactly-Once的语意保证。如果用户的应用端崩溃,再次拉起的连续查询将只会从再次拉起的时间开始重新计算最近的一个完整的时间窗口。如果使用写回模式,TDengine可确保数据写回的有效性和连续性。
### 使用连续查询
......@@ -40,23 +29,19 @@ create table D1002 using meters tags ("Beijing.Haidian", 2);
select avg(voltage) from meters interval(1m) sliding(30s);
```
每次执行这条语句,都会重新计算所有数据。
如果需要每隔30秒执行一次来增量计算最近一分钟的数据,
可以把上面的语句改进成下面的样子,每次使用不同的 `startTime` 并定期执行:
每次执行这条语句,都会重新计算所有数据。 如果需要每隔30秒执行一次来增量计算最近一分钟的数据,可以把上面的语句改进成下面的样子,每次使用不同的 `startTime` 并定期执行:
```sql
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s);
```
这样做没有问题,但TDengine提供了更简单的方法,
只要在最初的查询语句前面加上 `create table {tableName} as ` 就可以了, 例如:
这样做没有问题,但TDengine提供了更简单的方法,只要在最初的查询语句前面加上 `create table {tableName} as ` 就可以了, 例如:
```sql
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s);
```
会自动创建一个名为 `avg_vol` 的新表,然后每隔30秒,TDengine会增量执行 `as` 后面的 SQL 语句,
并将查询结果写入这个表中,用户程序后续只要从 `avg_vol` 中查询数据即可。 例如:
会自动创建一个名为 `avg_vol` 的新表,然后每隔30秒,TDengine会增量执行 `as` 后面的 SQL 语句,并将查询结果写入这个表中,用户程序后续只要从 `avg_vol` 中查询数据即可。 例如:
```mysql
taos> select * from avg_vol;
......@@ -70,43 +55,27 @@ taos> select * from avg_vol;
需要注意,查询时间窗口的最小值是10毫秒,没有时间窗口范围的上限。
此外,TDengine还支持用户指定连续查询的起止时间。
如果不输入开始时间,连续查询将从第一条原始数据所在的时间窗口开始;
如果没有输入结束时间,连续查询将永久运行;
如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。
比如使用下面的SQL创建的连续查询将运行一小时,之后会自动停止。
此外,TDengine还支持用户指定连续查询的起止时间。如果不输入开始时间,连续查询将从第一条原始数据所在的时间窗口开始;如果没有输入结束时间,连续查询将永久运行;如果用户指定了结束时间,连续查询在系统时间达到指定的时间以后停止运行。比如使用下面的SQL创建的连续查询将运行一小时,之后会自动停止。
```mysql
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
```
需要说明的是,上面例子中的 `now` 是指创建连续查询的时间,而不是查询执行的时间,否则,查询就无法自动停止了。
另外,为了尽量避免原始数据延迟写入导致的问题,TDengine中连续查询的计算有一定的延迟。
也就是说,一个时间窗口过去后,TDengine并不会立即计算这个窗口的数据,
所以要稍等一会(一般不会超过1分钟)才能查到计算结果。
需要说明的是,上面例子中的 `now` 是指创建连续查询的时间,而不是查询执行的时间,否则,查询就无法自动停止了。另外,为了尽量避免原始数据延迟写入导致的问题,TDengine中连续查询的计算有一定的延迟。也就是说,一个时间窗口过去后,TDengine并不会立即计算这个窗口的数据,所以要稍等一会(一般不会超过1分钟)才能查到计算结果。
### 管理连续查询
用户可在控制台中通过 `show streams` 命令来查看系统中全部运行的连续查询,
并可以通过 `kill stream` 命令杀掉对应的连续查询。
后续版本会提供更细粒度和便捷的连续查询管理命令。
用户可在控制台中通过 `show streams` 命令来查看系统中全部运行的连续查询,并可以通过 `kill stream` 命令杀掉对应的连续查询。后续版本会提供更细粒度和便捷的连续查询管理命令。
## <a class="anchor" id="subscribe"></a>数据订阅(Publisher/Subscriber)
## <a class="anchor" id="subscribe"></a>数据订阅(Publisher/Subscriber)
基于数据天然的时间序列特性,TDengine的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,
均可视为系统中插入一条带时间戳的新记录。
同时,TDengine在内部严格按照数据时间序列单调递增的方式保存数据。
本质上来说,TDengine中里每一张表均可视为一个标准的消息队列。
基于数据天然的时间序列特性,TDengine的数据写入(insert)与消息系统的数据发布(pub)逻辑上一致,均可视为系统中插入一条带时间戳的新记录。同时,TDengine在内部严格按照数据时间序列单调递增的方式保存数据。本质上来说,TDengine中里每一张表均可视为一个标准的消息队列。
TDengine内嵌支持轻量级的消息订阅与推送服务。
使用系统提供的API,用户可使用普通查询语句订阅数据库中的一张或多张表。
订阅的逻辑和操作状态的维护均是由客户端完成,客户端定时轮询服务器是否有新的记录到达,
有新的记录到达就会将结果反馈到客户。
TDengine内嵌支持轻量级的消息订阅与推送服务。使用系统提供的API,用户可使用普通查询语句订阅数据库中的一张或多张表。订阅的逻辑和操作状态的维护均是由客户端完成,客户端定时轮询服务器是否有新的记录到达,有新的记录到达就会将结果反馈到客户。
TDengine的订阅与推送服务的状态是客户端维持,TDengine服务器并不维持。
因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。
TDengine的订阅与推送服务的状态是客户端维持,TDengine服务器并不维持。因此如果应用重启,从哪个时间点开始获取最新数据,由应用决定。
TDengine的API中,与订阅相关的主要有以下三个:
......@@ -116,12 +85,9 @@ taos_consume
taos_unsubscribe
```
这些API的文档请见 [C/C++ Connector](https://www.taosdata.com/cn/documentation/connector/)
下面仍以智能电表场景为例介绍一下它们的具体用法(超级表和子表结构请参考上一节“连续查询”),
完整的示例代码可以在 [这里](https://github.com/taosdata/TDengine/blob/master/tests/examples/c/subscribe.c) 找到。
这些API的文档请见 [C/C++ Connector](https://www.taosdata.com/cn/documentation/connector#c-cpp),下面仍以智能电表场景为例介绍一下它们的具体用法(超级表和子表结构请参考上一节“连续查询”),完整的示例代码可以在 [这里](https://github.com/taosdata/TDengine/blob/master/tests/examples/c/subscribe.c) 找到。
如果我们希望当某个电表的电流超过一定限制(比如10A)后能得到通知并进行一些处理, 有两种方法:
一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:
如果我们希望当某个电表的电流超过一定限制(比如10A)后能得到通知并进行一些处理, 有两种方法:一是分别对每张子表进行查询,每次查询后记录最后一条数据的时间戳,后续只查询这个时间戳之后的数据:
```sql
select * from D1001 where ts > {last_timestamp1} and current > 10;
......@@ -129,8 +95,7 @@ select * from D1002 where ts > {last_timestamp2} and current > 10;
...
```
这确实可行,但随着电表数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,
当电表数增长到一定的程度,系统就无法承受了。
这确实可行,但随着电表数量的增加,查询数量也会增加,客户端和服务端的性能都会受到影响,当电表数增长到一定的程度,系统就无法承受了。
另一种方法是对超级表进行查询。这样,无论有多少电表,都只需一次查询:
......@@ -138,12 +103,7 @@ select * from D1002 where ts > {last_timestamp2} and current > 10;
select * from meters where ts > {last_timestamp} and current > 10;
```
但是,如何选择 `last_timestamp` 就成了一个新的问题。
因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;
另一方面,不同电表的数据到达TDengine的时间也会有差异。
所以,如果我们在查询中使用最慢的那台电表的数据的时间戳作为 `last_timestamp`
就可能重复读入其它电表的数据;
如果使用最快的电表的时间戳,其它电表的数据就可能被漏掉。
但是,如何选择 `last_timestamp` 就成了一个新的问题。因为,一方面数据的产生时间(也就是数据时间戳)和数据入库的时间一般并不相同,有时偏差还很大;另一方面,不同电表的数据到达TDengine的时间也会有差异。所以,如果我们在查询中使用最慢的那台电表的数据的时间戳作为 `last_timestamp`,就可能重复读入其它电表的数据;如果使用最快的电表的时间戳,其它电表的数据就可能被漏掉。
TDengine的订阅功能为上面这个问题提供了一个彻底的解决方案。
......@@ -160,47 +120,29 @@ if (async) {
}
```
TDengine中的订阅既可以是同步的,也可以是异步的,
上面的代码会根据从命令行获取的参数`async`的值来决定使用哪种方式。
这里,同步的意思是用户程序要直接调用`taos_consume`来拉取数据,
而异步则由API在内部的另一个线程中调用`taos_consume`
然后把拉取到的数据交给回调函数`subscribe_callback`去处理。
TDengine中的订阅既可以是同步的,也可以是异步的,上面的代码会根据从命令行获取的参数`async`的值来决定使用哪种方式。这里,同步的意思是用户程序要直接调用`taos_consume`来拉取数据,而异步则由API在内部的另一个线程中调用`taos_consume`,然后把拉取到的数据交给回调函数`subscribe_callback`去处理。
参数`taos`是一个已经建立好的数据库连接,在同步模式下无特殊要求。
但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,
因为回调函数在API的内部线程中被调用,而TDengine的部分API不是线程安全的。
参数`taos`是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在API的内部线程中被调用,而TDengine的部分API不是线程安全的。
参数`sql`是查询语句,可以在其中使用where子句指定过滤条件。
在我们的例子中,如果只想订阅电流超过10A时的数据,可以这样写:
参数`sql`是查询语句,可以在其中使用where子句指定过滤条件。在我们的例子中,如果只想订阅电流超过10A时的数据,可以这样写:
```sql
select * from meters where current > 10;
```
注意,这里没有指定起始时间,所以会读到所有时间的数据。
如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:
注意,这里没有指定起始时间,所以会读到所有时间的数据。如果只想从一天前的数据开始订阅,而不需要更早的历史数据,可以再加上一个时间条件:
```sql
select * from meters where ts > now - 1d and current > 10;
```
订阅的`topic`实际上是它的名字,因为订阅功能是在客户端API中实现的,
所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。
订阅的`topic`实际上是它的名字,因为订阅功能是在客户端API中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。
如果名`topic`的订阅不存在,参数`restart`没有意义;
但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个`topic`时,
`restart`就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。
本例中,如果`restart`**true**(非零值),用户程序肯定会读到所有数据。
但如果这个订阅之前就存在了,并且已经读取了一部分数据,
`restart`**false****0**),用户程序就不会读到之前已经读取的数据了。
如果名`topic`的订阅不存在,参数`restart`没有意义;但如果用户程序创建这个订阅后退出,当它再次启动并重新使用这个`topic`时,`restart`就会被用于决定是从头开始读取数据,还是接续上次的位置进行读取。本例中,如果`restart`**true**(非零值),用户程序肯定会读到所有数据。但如果这个订阅之前就存在了,并且已经读取了一部分数据,且`restart`**false****0**),用户程序就不会读到之前已经读取的数据了。
`taos_subscribe`的最后一个参数是以毫秒为单位的轮询周期。
在同步模式下,如果前后两次调用`taos_consume`的时间间隔小于此时间,
`taos_consume`会阻塞,直到间隔超过此时间。
异步模式下,这个时间是两次调用回调函数的最小时间间隔。
`taos_subscribe`的最后一个参数是以毫秒为单位的轮询周期。在同步模式下,如果前后两次调用`taos_consume`的时间间隔小于此时间,`taos_consume`会阻塞,直到间隔超过此时间。异步模式下,这个时间是两次调用回调函数的最小时间间隔。
`taos_subscribe`的倒数第二个参数用于用户程序向回调函数传递附加参数,
订阅API不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。
`taos_subscribe`的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅API不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。
订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:
......@@ -219,9 +161,7 @@ if (async) {
}
```
这里是一个 **while** 循环,用户每按一次回车键就调用一次`taos_consume`
`taos_consume`的返回值是查询到的结果集,与`taos_use_result`完全相同,
例子中使用这个结果集的代码是函数`print_result`
这里是一个 **while** 循环,用户每按一次回车键就调用一次`taos_consume`,而`taos_consume`的返回值是查询到的结果集,与`taos_use_result`完全相同,例子中使用这个结果集的代码是函数`print_result`
```c
void print_result(TAOS_RES* res, int blockFetch) {
......@@ -247,8 +187,7 @@ void print_result(TAOS_RES* res, int blockFetch) {
}
```
其中的 `taos_print_row` 用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。
而异步模式下,消费订阅到的数据则显得更为简单:
其中的 `taos_print_row` 用于处理订阅到数据,在我们的例子中,它会打印出所有符合条件的记录。而异步模式下,消费订阅到的数据则显得更为简单:
```c
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
......@@ -262,11 +201,7 @@ void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
taos_unsubscribe(tsub, keep);
```
其第二个参数,用于决定是否在客户端保留订阅的进度信息。
如果这个参数是**false****0**),那无论下次调用`taos_subscribe`的时的`restart`参数是什么,
订阅都只能重新开始。
另外,进度信息的保存位置是 *{DataDir}/subscribe/* 这个目录下,
每个订阅有一个与其`topic`同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。
其第二个参数,用于决定是否在客户端保留订阅的进度信息。如果这个参数是**false****0**),那无论下次调用`taos_subscribe`时的`restart`参数是什么,订阅都只能重新开始。另外,进度信息的保存位置是 *{DataDir}/subscribe/* 这个目录下,每个订阅有一个与其`topic`同名的文件,删掉某个文件,同样会导致下次创建其对应的订阅时只能重新开始。
代码介绍完毕,我们来看一下实际的运行效果。假设:
......@@ -289,12 +224,11 @@ $ taos
> insert into D1001 values(now, 12, 220, 1);
```
这时,因为电流超过了10A,您应该可以看到示例程序将它输出到了屏幕上。
您可以继续插入一些数据观察示例程序的输出。
这时,因为电流超过了10A,您应该可以看到示例程序将它输出到了屏幕上。您可以继续插入一些数据观察示例程序的输出。
### Java 使用数据订阅功能
订阅功能也提供了 Java 开发接口,相关说明请见 [Java Connector](https://www.taosdata.com/cn/documentation/connector/)。需要注意的是,目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。
订阅功能也提供了 Java 开发接口,相关说明请见 [Java Connector](https://www.taosdata.com/cn/documentation/connector/java#subscribe)。需要注意的是,目前 Java 接口没有提供异步订阅模式,但用户程序可以通过创建 `TimerTask` 等方式达到同样的效果。
下面以一个示例程序介绍其具体使用方法。它所完成的功能与前面介绍的 C 语言示例基本相同,也是订阅数据库中所有电流超过 10A 的记录。
......@@ -404,7 +338,7 @@ ts: 1597466400000 current: 12.4 voltage: 220 phase: 1 location: Beijing.Chaoyang
```
## <a class="anchor" id="cache"></a>缓存(Cache)
## <a class="anchor" id="cache"></a>缓存(Cache)
TDengine采用时间驱动缓存管理策略(First-In-First-Out,FIFO),又称为写驱动的缓存管理机制。这种策略有别于读驱动的数据缓存模式(Least-Recent-Use,LRU),直接将最近写入的数据保存在系统的缓存中。当缓存达到临界值的时候,将最早的数据批量写入磁盘。一般意义上来说,对于物联网数据的使用,用户最为关心最近产生的数据,即当前状态。TDengine充分利用了这一特性,将最近到达的(当前状态)数据保存在缓存中。
......@@ -423,7 +357,7 @@ select last_row(voltage) from meters where location='Beijing.Chaoyang';
该SQL语句将获取所有位于北京朝阳区的电表最后记录的电压值。
## <a class="anchor" id="alert"></a>报警监测(Alert)
## <a class="anchor" id="alert"></a>报警监测(Alert)
在 TDengine 的应用场景中,报警监测是一个常见需求,从概念上说,它要求程序从最近一段时间的数据中筛选出符合一定条件的数据,并基于这些数据根据定义好的公式计算出一个结果,当这个结果符合某个条件且持续一定时间后,以某种形式通知用户。
......
......@@ -285,7 +285,7 @@ JDBC连接器可能报错的错误码包括3种:JDBC driver本身的报错(
* https://github.com/taosdata/TDengine/blob/develop/src/connector/jdbc/src/main/java/com/taosdata/jdbc/TSDBErrorNumbers.java
* https://github.com/taosdata/TDengine/blob/develop/src/inc/taoserror.h
### 订阅
### <a class="anchor" id="subscribe"></a>订阅
#### 创建
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册