@@ -133,7 +133,7 @@ For more details please refer to [Aggregate by Window](/taos-sql/interval).
### Query
In [Insert](/develop/insert-data/sql-writing), a database named `power` is created and some data are inserted into stable `meters`. Below sample code demonstrates how to query the data in this stable.
In the section describing [Insert](/develop/insert-data/sql-writing), a database named `power` is created and some data are inserted into stable `meters`. Below sample code demonstrates how to query the data in this stable.
description: "Continuous query is a query that's executed automatically according to predefined frequency to provide aggregate query capability by time window, it's actually a simplified time driven stream computing."
Continuous query is a query that's executed automatically according to predefined frequency to provide aggregate query capability by time window, it's actually a simplified time driven stream computing. Continuous query can be performed on a table or stable in TDengine. The result of continuous query can be pushed to client or written back to TDengine. Each query is executed on a time window, which moves forward with time. The size of time window and the forward sliding time need to be specified with parameter `INTERVAL` and `SLIDING` respectively.
Continuous query in TDengine is time driven, and can be defined using TAOS SQL directly without any extra operations. With continuous query, the result can be generated according to time window to achieve down sampling of original data. Once a continuous query is defined using TAOS SQL, the query is automatically executed at the end of each time window and the result is pushed back to client or written to TDengine.
TDengine 提供的连续查询与普通流计算中的时间窗口计算具有以下区别:
There are some differences between continuous query in TDengine and time window computation in stream computing:
- The computation is performed and the result is returned in real time in stream computing, but the computation in continuous query is only started when a time window closes. For example, if the time window is 1 day, then the result will only be generated at 23:59:59.
- If a historical data row is written in to a time widow for which the computation has been finished, the computation will not be performed again and the result will not be pushed to client again either. If the result has been written into TDengine, there will be no update for the result.
- In continuous query, if the result is pushed to client, the client status is not cached on the server side and Exactly-once is not guaranteed by the server either. If the client program crashes, a new time window will be generated from the time where the continuous query is restarted. If the result is written into TDengine, the data written into TDengine can be guaranteed as valid and continuous.
In this section the use case of meters will be used to introduce how to use continuous query. Assume the stable and sub tables have been created using below SQL statement.
```sql
create table meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);
create table D1001 using meters tags ("Beijing.Chaoyang", 2);
create table D1002 using meters tags ("Beijing.Haidian", 2);
...
```
可以通过下面这条 SQL 语句以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压。
The average voltage for each time window of one minute with 30 seconds as the length of moving forward can be retrieved using below SQL statement.
```sql
select avg(voltage) from meters interval(1m) sliding(30s);
Whenever the above SQL statement is executed, all the existing data will be computed again. If the computation needs to be performed every 30 seconds automatically to compute on the data in the past one minute, the above SQL statement needs to be revised as below, in which `{startTime}` stands for the beginning timestamp in the latest time window.
```sql
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s);
A table named as `avg_vol` will be created automatically, then every 30 seconds the `select` statement will be executed automatically on the data in the past 1 minutes, i.e. the latest time window, and the result is written into table `avg_vol`. The client program just needs to query from table `avg_vol`. For example:
```sql
taos> select * from avg_vol;
...
...
@@ -69,16 +68,16 @@ taos> select * from avg_vol;
2020-07-29 13:39:00.000 | 223.0800000 |
```
需要注意,查询时间窗口的最小值是 10 毫秒,没有时间窗口范围的上限。
Please be noted that the minimum allowed time window is 10 milliseconds, and no upper limit.
Besides, it's allowed to specify the start and end time of continuous query. If the start time is not specified, the timestamp of the first original row will be considered as the start time; if the end time is not specified, the continuous will be performed infinitely, otherwise it will be terminated once the end time is reached. For example, the continuous query in below SQL statement will be started from now and terminated one hour later.
```sql
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
`now` in above SQL statement stands for the time when the continuous query is created, not the time when the computation is actually performed. Besides, to avoid the trouble caused by the delay of original data as much as possible, the actual computation in continuous query is also started with a little delay. That means, once a time window closes, the computation is not started immediately. Normally, the result can only be available a little time later, normally within one minute, after the time window closes.
`show streams` command can be used in TDengine CLI `taos` to show all the continuous queries in the system, and `kill stream` can be used to terminate a continuous query.
description: "Lightweight service for data subscription and pushing, the time series data inserted into TDengine continuously can be pushed automatically to the subscribing clients."
title: Data Subscription
---
import Tabs from "@theme/Tabs";
...
...
@@ -14,13 +14,13 @@ import Node from "./_sub_node.mdx";
According to the time series nature of the data, data inserting in TDengine is similar to data publishing in message queues, they both can be considered as a new data record with timestamp is inserted into the system. Data is stored in ascending order of timestamp inside TDengine, so essentially each table in TDengine can be considered as a message queue.
Lightweight service for data subscription and pushing is built in TDengine. With the API provided by TDengine, client programs can used `select` statement to subscribe the data from one or more tables. The subscription and and state maintenance is performed on the client side, the client programs polls the server to check whether there is new data, and if so the new data will be pushed back to the client side. If the client program is restarted, where to start for retrieving new data is up to the client side.
TDengine 的 API 中,与订阅相关的主要有以下三个:
There are 3 major APIs related to subscription provided in the TDengine client driver.
```c
taos_subscribe
...
...
@@ -28,9 +28,11 @@ taos_consume
taos_unsubscribe
```
这些 API 的文档请见 [C/C++ Connector](/reference/connector/cpp),下面仍以智能电表场景为例介绍一下它们的具体用法(超级表和子表结构请参考上一节“连续查询”),完整的示例代码可以在 [这里](https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c) 找到。
For more details about these API please refer to [C/C++ Connector](/reference/connector/cpp). Their usage will be introduced below using the use case of meters, in which the schema of stable and sub tables please refer to the previous section "continuous query". Full sample code can be found [here](https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c).
If we want to get notification and take some actions if the current exceeds a threshold, like 10A, from some meters, there are two ways:
The first way is to query on each sub table and record the last timestamp matching the criteria, then after some time query on the data later than recorded timestamp and repeat this process. The SQL statements for this way are as below.
```sql
select * from D1001 where ts > {last_timestamp1} and current > 10;
...
...
@@ -38,19 +40,19 @@ select * from D1002 where ts > {last_timestamp2} and current > 10;
The above way works, but the problem is that the number of `select` statements increases with the number of meters grows. Finally the performance of both client side and server side will be unacceptable once the number of meters grows to a big enough number.
另一种方法是对超级表进行查询。这样,无论有多少电表,都只需一次查询:
A better way is to query on the stable, only one `select` is enough regardless of the number of meters, like below:
```sql
select * from meters where ts > {last_timestamp} and current > 10;
However, how to choose `last_timestamp` becomes a new problem if using this way. Firstly, the timestamp when the data is generated is different from the timestamp when the data is inserted into the database, sometimes the difference between them may be very big. Secondly, the time when the data from different meters may arrives at the database may be different too. If the timestamp of the "slowest" meter is used as `last_timestamp` in the query, the data from other meters may be selected repeatedly; but if the timestamp of the "fasted" meters is used as `last_timestamp`, some data from other meters may be missed.
TDengine 的订阅功能为上面这个问题提供了一个彻底的解决方案。
All the problems mentioned above can be resolved thoroughly using subscription provided by TDengine.
首先是使用 `taos_subscribe` 创建订阅:
The first step is to create subscription using `taos_subscribe`.
The subscription in TDengine can be either synchronous or asynchronous. In the above sample code, the value of variable `async` is determined from the CLI input, then it's used to create either an async or sync subscription. Sync subscription means the client program needs to invoke `taos_consume` to retrieve data, and async subscription means another thread created by `taos_subscribe` internally invokes `taos_consume` to retrieve data and pass the data to `subscribe_callback` for processing, `subscribe_callback` is a call back function provided by the client program and it's suggested not to do time consuming operation in the call back function.
参数 `taos` 是一个已经建立好的数据库连接,在同步模式下无特殊要求。但在异步模式下,需要注意它不会被其它线程使用,否则可能导致不可预计的错误,因为回调函数在 API 的内部线程中被调用,而 TDengine 的部分 API 不是线程安全的。
The parameter `taos` is an established connection. There is nothing special in sync subscription mode. In async subscription, it should be exclusively by current thread, otherwise unpredictable error may occur.
参数 `sql` 是查询语句,可以在其中使用 where 子句指定过滤条件。在我们的例子中,如果只想订阅电流超过 10A 时的数据,可以这样写:
The parameter `sql` is a `select` statement in which `where` clause can be used to specify filter conditions. In our example, the data whose current exceeds 10A needs to be subscribed like below SQL statement:
Please be noted that, all the data will be processed because no start time is specified. If only the data from one day ago needs to be processed, a time related condition can be added:
```sql
select * from meters where ts > now - 1d and current > 10;
```
订阅的 `topic` 实际上是它的名字,因为订阅功能是在客户端 API 中实现的,所以没必要保证它全局唯一,但需要它在一台客户端机器上唯一。
The parameter `topic` is the name of the subscription, it needs to be guaranteed unique in the client program, but it's not necessary to be globally unique because subscription is implemented in the APIs on client side.
If the subscription named as `topic` doesn't exist, parameter `restart` would be ignored. If the subscription named as `topic` has been created before by the client program which then exited, when the client program is restarted to use this `topic`, parameter `restart` is used to determine retrieving data from beginning or from the last point where the subscription was broken. If the value of `restart` is **true** (i.e. a non-zero value), the data will be retrieved from beginning, or if it is **false** (i.e. zero), the data already consumed before will not be processed again.
The last parameter of `taos_subscribe` is the polling interval in unit of millisecond. In sync mode, if the time difference between two continuous invocations to `taos_consume` is smaller than the interval specified by `taos_subscribe`, `taos_consume` would be blocked until the interval is reached. In async mode, this interval is the minimum interval between two invocations to the call back function.
`taos_subscribe` 的倒数第二个参数用于用户程序向回调函数传递附加参数,订阅 API 不对其做任何处理,只原样传递给回调函数。此参数在同步模式下无意义。
The last second parameter of `taos_subscribe` is used to pass arguments to the call back function. `taos_subscribe` doesn't process this parameter and simply passes it to the call back function. This parameter is simply ignored in sync mode.
订阅创建以后,就可以消费其数据了,同步模式下,示例代码是下面的 else 部分:
After a subscription is created, its data can be consumed and processed, below is the sample code of how to consume data in sync mode, in the else part if `if (async)`.
In the above sample code, there is an infinite loop, each time carriage return is entered `taos_consume` is invoked, the return value of `taos_consume` is the selected result set, exactly as the input of `taos_use_result`, in the above sample `print_result` is used instead to simplify the sample. Below is the implementation of `print_result`.
```c
void print_result(TAOS_RES* res, int blockFetch) {
...
...
@@ -131,7 +133,9 @@ void print_result(TAOS_RES* res, int blockFetch) {
The second parameter `keep` is used to specify whether to keep the subscription progress on the client sde. If it is **false**, i.e. **0**, then subscription will be restarted from beginning regardless of the `restart` parameter's value in when `taos_subscribe` is invoked again. The subscription progress information is stored in _{DataDir}/subscribe/_ , under which there is a file with same name as `topic` for each subscription, the subscription will be restarted from beginning if the corresponding progress file is removed.
代码介绍完毕,我们来看一下实际的运行效果。假设:
Now let's see the effect of the above sample code, assuming below prerequisites have been done.
- 示例代码已经下载到本地
- TDengine 也已经在同一台机器上安装好
- 示例所需的数据库、超级表、子表已经全部创建好
- The sample code has been downloaded to local system 示
- TDengine has been installed and launched properly on same system
- The database, stable, sub tables required in the sample code have been ready
则可以在示例代码所在目录执行以下命令来编译并启动示例程序:
It's ready to launch below command in the directory where the sample code resides to compile and start the program.
```bash
make
./subscribe -sql='select * from meters where current > 10;'
After the program is started, open another terminal and launch TDengine CLI `taos`, then use below SQL commands to insert a row whose current is 12A into table **D1001**.
Then, this row of data will be shown by the example program on the first terminal because its current exceeds 10A. More data can be inserted for you to observe the output of the example program.
## 示例程序
## Examples
下面的示例程序展示是如何使用连接器订阅所有电流超过 10A 的记录。
Below example program demonstrates how to subscribe the data rows whose current exceeds 10A using connectors.
### 准备数据
### Prepare Data
```
```bash
# create database "power"
taos> create database power;
# use "power" as the database in following operations
...
...
@@ -200,20 +203,21 @@ taos> select * from meters where current > 10;
The cache management policy in TDengine is First-In-First-Out (FIFO), which is also known as insert driven cache management policy and different from read driven cache management, i.e. Least-Recent-Used (LRU). It simply stores the latest data in cache and flushes the oldest data in cache to disk when the cache usage reaches a threshold. In IoT use cases, the most cared about data is the latest data, i.e. current state. The cache policy in TDengine is based the nature of IoT data.
Caching the latest data provides the capability of retrieving data in milliseconds. With this capability, TDengine can be configured properly to be used as caching system without deploying another separate caching system to simplify the system architecture and minimize the operation cost. The cache will be emptied after TDengine is restarted, TDengine doesn't reload data from disk into cache like a real key-value caching system.
The memory space used by TDengine cache is fixed in size, according to the configuration based on application requirement and system resources. Independent memory pool is allocated for and managed by each vnode (virtual node) in TDengine, there is no sharing of memory pools between vnodes. All the tables belonging to a vnode share all the cache memory of the vnode.
Memory pool is divided into blocks and data is stored in row format in memory and each block follows FIFO policy. The size of each block is determined by configuration parameter `cache`, the number of blocks for each vnode is determined by `blocks`. For each vnode, the total cache size is `cache * blocks`. It's better to set the size of each block to hold at least tends of rows.
`last_row` function can be used to retrieve the last row of a table or a stable to quickly show the current state of devices on monitoring screen. For example below SQL statement retrieves the latest voltage of all meters in Chaoyang district of Beijing.