@@ -16,9 +16,9 @@ import CDemo from "./_sub_c.mdx";
## Introduction
According to the time series nature of the data, data inserting in TDengine is similar to data publishing in message queues, they both can be considered as a new data record with timestamp is inserted into the system. Data is stored in ascending order of timestamp inside TDengine, so essentially each table in TDengine can be considered as a message queue.
Due to the nature of time series data, data inserting in TDengine is similar to data publishing in message queues. Data is stored in ascending order of timestamp inside TDengine, so each table in TDengine can essentially be considered as a message queue.
Lightweight service for data subscription and pushing is built in TDengine. With the API provided by TDengine, client programs can used `select` statement to subscribe the data from one or more tables. The subscription and and state maintenance is performed on the client side, the client programs polls the server to check whether there is new data, and if so the new data will be pushed back to the client side. If the client program is restarted, where to start for retrieving new data is up to the client side.
A lightweight service for data subscription and pushing is built in TDengine. With the API provided by TDengine, client programs can use `select` statements to subscribe to data from one or more tables. The subscription and state maintenance is performed on the client side, the client programs poll the server to check whether there is new data, and if so the new data will be pushed back to the client side. If the client program is restarted, where to start for retrieving new data is up to the client side.
There are 3 major APIs related to subscription provided in the TDengine client driver.
...
...
@@ -28,9 +28,9 @@ taos_consume
taos_unsubscribe
```
For more details about these API please refer to [C/C++ Connector](/reference/connector/cpp). Their usage will be introduced below using the use case of meters, in which the schema of STable and sub tables please refer to the previous section "continuous query". Full sample code can be found [here](https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c).
For more details about these APIs please refer to [C/C++ Connector](/reference/connector/cpp). Their usage will be introduced below using the use case of meters, in which the schema of STable and subtables from the previous section [Continuous Query](/develop/continuous-query) are used. Full sample code can be found [here](https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c).
If we want to get notification and take some actions if the current exceeds a threshold, like 10A, from some meters, there are two ways:
If we want to get a notification and take some actions if the current exceeds a threshold, like 10A, from some meters, there are two ways:
The first way is to query on each sub table and record the last timestamp matching the criteria, then after some time query on the data later than recorded timestamp and repeat this process. The SQL statements for this way are as below.
...
...
@@ -40,7 +40,7 @@ select * from D1002 where ts > {last_timestamp2} and current > 10;
...
```
The above way works, but the problem is that the number of `select` statements increases with the number of meters grows. Finally the performance of both client side and server side will be unacceptable once the number of meters grows to a big enough number.
The above way works, but the problem is that the number of `select` statements increases with the number of meters. Additionally, the performance of both client side and server side will be unacceptable once the number of meters grows to a big enough number.
A better way is to query on the STable, only one `select` is enough regardless of the number of meters, like below:
...
...
@@ -48,7 +48,7 @@ A better way is to query on the STable, only one `select` is enough regardless o
select * from meters where ts > {last_timestamp} and current > 10;
```
However, how to choose `last_timestamp` becomes a new problem if using this way. Firstly, the timestamp when the data is generated is different from the timestamp when the data is inserted into the database, sometimes the difference between them may be very big. Secondly, the time when the data from different meters may arrives at the database may be different too. If the timestamp of the "slowest" meter is used as `last_timestamp` in the query, the data from other meters may be selected repeatedly; but if the timestamp of the "fasted" meters is used as `last_timestamp`, some data from other meters may be missed.
However, this presents a new problem in how to choose `last_timestamp`. First, the timestamp when the data is generated is different from the timestamp when the data is inserted into the database, sometimes the difference between them may be very big. Second, the time when the data from different meters arrives at the database may be different too. If the timestamp of the "slowest" meter is used as `last_timestamp` in the query, the data from other meters may be selected repeatedly; but if the timestamp of the "fastest" meter is used as `last_timestamp`, some data from other meters may be missed.
All the problems mentioned above can be resolved thoroughly using subscription provided by TDengine.
...
...
@@ -75,19 +75,19 @@ The parameter `sql` is a `select` statement in which `where` clause can be used
select * from meters where current > 10;
```
Please be noted that, all the data will be processed because no start time is specified. If only the data from one day ago needs to be processed, a time related condition can be added:
Please note that, all the data will be processed because no start time is specified. If only the data from one day ago needs to be processed, a time related condition can be added:
```sql
select * from meters where ts > now - 1d and current > 10;
```
The parameter `topic` is the name of the subscription, it needs to be guaranteed unique in the client program, but it's not necessary to be globally unique because subscription is implemented in the APIs on client side.
The parameter `topic` is the name of the subscription, it needs to be guaranteed unique in the client program, but it's not necessary to be globally unique because subscription is implemented in the APIs on the client side.
If the subscription named as `topic` doesn't exist, parameter `restart` would be ignored. If the subscription named as `topic` has been created before by the client program which then exited, when the client program is restarted to use this `topic`, parameter `restart` is used to determine retrieving data from beginning or from the last point where the subscription was broken. If the value of `restart` is **true** (i.e. a non-zero value), the data will be retrieved from beginning, or if it is **false** (i.e. zero), the data already consumed before will not be processed again.
If the subscription named as `topic` doesn't exist, the parameter `restart` will be ignored. If the subscription named as `topic` has been created before by the client program, when the client program is restarted with the subscription named `topic`, parameter `restart` is used to determine whether to retrieve data from the beginning or from the last point where the subscription was broken. If the value of `restart` is **true** (i.e. a non-zero value), the data will be retrieved from beginning, or if it is **false** (i.e. zero), the data already consumed before will not be processed again.
The last parameter of `taos_subscribe` is the polling interval in unit of millisecond. In sync mode, if the time difference between two continuous invocations to `taos_consume` is smaller than the interval specified by `taos_subscribe`, `taos_consume` would be blocked until the interval is reached. In async mode, this interval is the minimum interval between two invocations to the call back function.
The last parameter of `taos_subscribe` is the polling interval in unit of millisecond. In sync mode, if the time difference between two continuous invocations to `taos_consume` is smaller than the interval specified by `taos_subscribe`, `taos_consume` will be blocked until the interval is reached. In async mode, this interval is the minimum interval between two invocations to the call back function.
The last second parameter of `taos_subscribe` is used to pass arguments to the call back function. `taos_subscribe` doesn't process this parameter and simply passes it to the call back function. This parameter is simply ignored in sync mode.
The second to last parameter of `taos_subscribe` is used to pass arguments to the call back function. `taos_subscribe` doesn't process this parameter and simply passes it to the call back function. This parameter is simply ignored in sync mode.
After a subscription is created, its data can be consumed and processed, below is the sample code of how to consume data in sync mode, in the else part if `if (async)`.
The second parameter `keep` is used to specify whether to keep the subscription progress on the client sde. If it is **false**, i.e. **0**, then subscription will be restarted from beginning regardless of the `restart` parameter's value in when `taos_subscribe` is invoked again. The subscription progress information is stored in _{DataDir}/subscribe/_ , under which there is a file with same name as `topic` for each subscription, the subscription will be restarted from beginning if the corresponding progress file is removed.
The second parameter `keep` is used to specify whether to keep the subscription progress on the client sde. If it is **false**, i.e. **0**, then subscription will be restarted from beginning regardless of the `restart` parameter's value when `taos_subscribe` is invoked again. The subscription progress information is stored in _{DataDir}/subscribe/_ , under which there is a file with the same name as `topic` for each subscription, the subscription will be restarted from the beginning if the corresponding progress file is removed.
Now let's see the effect of the above sample code, assuming below prerequisites have been done.
- The sample code has been downloaded to local system
- TDengine has been installed and launched properly on same system
- The database, STable, sub tables required in the sample code have been ready
- The database, STable, and subtables required in the sample code are ready
It's ready to launch below command in the directory where the sample code resides to compile and start the program.
Launch the command below in the directory where the sample code resides to compile and start the program.
```bash
make
./subscribe -sql='select * from meters where current > 10;'
```
After the program is started, open another terminal and launch TDengine CLI `taos`, then use below SQL commands to insert a row whose current is 12A into table **D1001**.
After the program is started, open another terminal and launch TDengine CLI `taos`, then use the below SQL commands to insert a row whose current is 12A into table **D1001**.
```sql
use test;
...
...
@@ -232,7 +232,7 @@ Query OK, 5 row(s) in set (0.004896s)
### Run the Examples
The example programs firstly consume all historical data matching the criteria.
The example programs first consume all historical data matching the criteria.