---
sidebar_label: Data Subscription
description: "Lightweight service for data subscription and publishing. Time series data inserted into TDengine continuously can be pushed automatically to subscribing clients."
title: Data Subscription
---

import Tabs from "@theme/Tabs";
import TabItem from "@theme/TabItem";
import Java from "./_sub_java.mdx";
import Python from "./_sub_python.mdx";
import Go from "./_sub_go.mdx";
import Rust from "./_sub_rust.mdx";
import Node from "./_sub_node.mdx";
import CSharp from "./_sub_cs.mdx";
import CDemo from "./_sub_c.mdx";

## Introduction

Due to the nature of time series data, data insertion into TDengine is similar to data publishing in message queues. Data is stored in ascending order of timestamp inside TDengine, and so each table in TDengine can essentially be considered as a message queue.

A lightweight service for data subscription and publishing is built into TDengine. With the API provided by TDengine, client programs can use `select` statements to subscribe to data from one or more tables. Subscription and state maintenance are performed on the client side. The client program polls the server to check whether there is new data, and if there is, the new data is returned to the client. If the client program is restarted, it is up to the client to decide where to resume retrieving data.

The TDengine client driver provides three main APIs for subscription:

```c
taos_subscribe
taos_consume
taos_unsubscribe
```

For more details about these APIs, please refer to [C/C++ Connector](/reference/connector/cpp). Their usage is introduced below through the smart meters use case, which reuses the schema of the STable and subtables from the previous section, [Continuous Query](/develop/continuous-query). Full sample code can be found [here](https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c).

Suppose we want to be notified and take some action whenever the current of some meters exceeds a threshold, such as 10A. There are two ways to do this:

The first way is to query each subtable and record the last timestamp matching the criteria. Then, after some time, query the data later than the recorded timestamp, and repeat this process. The SQL statements for this approach are shown below.

```sql
select * from D1001 where ts > {last_timestamp1} and current > 10;
select * from D1002 where ts > {last_timestamp2} and current > 10;
...
```
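
To make the cost of the per-table approach concrete, the following sketch builds one such statement for a single table. The helper name `build_poll_sql`, the buffer handling, and the choice to pass the recorded timestamp as epoch milliseconds are assumptions made for illustration; they are not part of the TDengine API.

```c
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper: writes the polling statement for one table into `out`.
 * Returns 0 on success, -1 if the buffer is too small. */
static int build_poll_sql(const char *table, int64_t last_ts_ms,
                          char *out, size_t out_len) {
  int written = snprintf(out, out_len,
                         "select * from %s where ts > %lld and current > 10;",
                         table, (long long)last_ts_ms);
  return (written < 0 || (size_t)written >= out_len) ? -1 : 0;
}
```

A client using this approach would have to call such a helper, run the resulting query, and store a new timestamp once per meter on every polling round.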

This approach works, but the number of `select` statements increases with the number of meters. In addition, the performance of both the client side and the server side becomes unacceptable once the number of meters is large enough.

A better way is to query the STable. A single `select` statement is enough regardless of the number of meters, as shown below:

```sql
select * from meters where ts > {last_timestamp} and current > 10;
```

However, this raises a new problem: how to choose `last_timestamp`. First, the timestamp at which the data is generated differs from the timestamp at which it is inserted into the database, and the difference may sometimes be very large. Second, data from different meters may arrive at the database at different times. If the timestamp of the "slowest" meter is used as `last_timestamp` in the query, data from other meters may be selected repeatedly; but if the timestamp of the "fastest" meter is used, some data from other meters may be missed.
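
The trade-off can be seen in a small simulation. The sketch here is a toy model, not TDengine code: the `Row` type, the sample rows, and both polling functions are hypothetical. Each row carries the polling round in which it reaches the database, so a "slow" meter can deliver old timestamps late.

```c
#include <stddef.h>
#include <stdint.h>

/* Illustrative model only: none of these names come from the TDengine API. */
typedef struct {
  int     table;    /* 0 = "fast" meter, 1 = "slow" meter */
  int64_t ts;       /* timestamp generated by the meter */
  int     arrival;  /* polling round in which the row reaches the database */
} Row;

static const Row DEMO_ROWS[] = {
  {0, 10, 1}, {0, 20, 1}, {0, 30, 1}, {0, 40, 2},  /* fast meter */
  {1, 10, 1}, {1, 20, 2}, {1, 30, 2}, {1, 40, 2},  /* slow meter */
};
#define DEMO_N (sizeof DEMO_ROWS / sizeof DEMO_ROWS[0])

/* Rows that "ts > last_ts" on the STable would return in a given round. */
static int poll_global(const Row *rows, size_t n, int round, int64_t last_ts) {
  int hits = 0;
  for (size_t i = 0; i < n; i++) {
    if (rows[i].arrival <= round && rows[i].ts > last_ts) hits++;
  }
  return hits;
}

/* Same query, but with one recorded timestamp per table. */
static int poll_per_table(const Row *rows, size_t n, int round,
                          const int64_t *last_ts) {
  int hits = 0;
  for (size_t i = 0; i < n; i++) {
    if (rows[i].arrival <= round && rows[i].ts > last_ts[rows[i].table]) hits++;
  }
  return hits;
}
```

In this toy data set, round 1 returns 4 of the 8 rows. Using the fast meter's timestamp (30) in round 2 returns only 2 rows, so 2 rows from the slow meter are missed. Using the slow meter's timestamp (10) returns 6 rows, 2 of which were already seen. Keeping one timestamp per table returns exactly the 4 remaining rows, which is one way to avoid both problems in this model; how TDengine tracks progress internally is not described here.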

All the problems mentioned above can be resolved easily using the subscription functionality provided by TDengine.

The first step is to create a subscription using `taos_subscribe`.

```c
TAOS_SUB* tsub = NULL;
if (async) {
  // create an asynchronous subscription, the callback function will be called every 1s
  tsub = taos_subscribe(taos, restart, topic, sql, subscribe_callback, &blockFetch, 1000);
} else {
  // create a synchronous subscription, need to call 'taos_consume' manually
  tsub = taos_subscribe(taos, restart, topic, sql, NULL, NULL, 0);
}
```

The subscription in TDengine can be either synchronous or asynchronous. In the above sample code, the value of the variable `async` is determined from the CLI input and is then used to create either an async or a sync subscription. A sync subscription means the client program needs to invoke `taos_consume` to retrieve data. An async subscription means another thread, created internally by `taos_subscribe`, invokes `taos_consume` to retrieve data and passes the data to `subscribe_callback` for processing. `subscribe_callback` is a callback function provided by the client program. You should not perform time-consuming operations in the callback function.

The parameter `taos` is an established connection. Nothing special needs to be done for thread safety with a synchronous subscription. For an asynchronous subscription, the `taos_subscribe` function should be called exclusively by the current thread, to avoid unpredictable errors.

The parameter `sql` is a `select` statement in which the `where` clause can be used to specify filter conditions. In our example, we can subscribe to the records in which the current exceeds 10A, with the following SQL statement:

```sql
select * from meters where current > 10;
```

Note that all the data will be processed because no start time is specified. If we only want to process data from the past day, a time-related condition can be added:

```sql
select * from meters where ts > now - 1d and current > 10;
```

The parameter `topic` is the name of the subscription. The client application must guarantee that the name is unique. However, it doesn't have to be globally unique because subscription is implemented in the APIs on the client side.

If the subscription named `topic` doesn't exist, the parameter `restart` is ignored. If the subscription named `topic` was created before by the client program, then when the client program is restarted with the same `topic`, the parameter `restart` determines whether to retrieve data from the beginning or from the point where the subscription previously stopped.

If the value of `restart` is **true** (i.e. a non-zero value), data will be retrieved from the beginning. If it is **false** (i.e. zero), the data already consumed before will not be processed again.
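
The effect of `restart` can be summarized as a small decision function. This is a sketch of the rules described above, not TDengine code: `StartPoint` and `start_point` are hypothetical names, and it assumes that a subscription that did not exist before starts from the beginning of the data selected by `sql`.

```c
#include <stdbool.h>

/* Hypothetical names, used only to illustrate the rules in the text. */
typedef enum { FROM_BEGINNING, FROM_SAVED_PROGRESS } StartPoint;

/* progress_exists: a subscription with this topic was created before.
 * restart: the value passed to taos_subscribe (non-zero means true). */
static StartPoint start_point(bool progress_exists, int restart) {
  if (!progress_exists) {
    return FROM_BEGINNING;  /* new subscription: `restart` is ignored */
  }
  return restart ? FROM_BEGINNING : FROM_SAVED_PROGRESS;
}
```

Only the combination of an existing subscription and `restart` equal to zero resumes from the saved progress; every other combination reads from the beginning.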

The last parameter of `taos_subscribe` is the polling interval in milliseconds. In sync mode, if the time between two consecutive invocations of `taos_consume` is shorter than the interval specified in `taos_subscribe`, `taos_consume` blocks until the interval has elapsed. In async mode, this interval is the minimum interval between two invocations of the callback function.
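
The blocking rule for sync mode amounts to simple arithmetic on timestamps. The sketch here shows that arithmetic only; `remaining_wait_ms` is a hypothetical helper, not a TDengine function, and it assumes all times come from the same monotonic clock, in milliseconds.

```c
#include <stdint.h>

/* Hypothetical helper: how long a synchronous consume call would still
 * have to wait before the polling interval has elapsed. */
static int64_t remaining_wait_ms(int64_t last_call_ms, int64_t now_ms,
                                 int64_t interval_ms) {
  int64_t elapsed = now_ms - last_call_ms;
  if (elapsed >= interval_ms) {
    return 0;  /* interval already reached: no blocking */
  }
  return interval_ms - elapsed;
}
```

With an interval of 1000 ms, a second call made 300 ms after the first would wait another 700 ms, while a call made 1500 ms later would not wait at all. With an interval of 0, as in the sync example above, the result is always 0.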

The second-to-last parameter of `taos_subscribe` is used to pass arguments to the callback function. `taos_subscribe` doesn't process this parameter and simply passes it to the callback function. This parameter is ignored in sync mode.
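
The pass-through behaviour of this parameter follows a common C callback pattern. The following sketch imitates it with stand-in types so that it compiles without the TDengine headers: `demo_callback`, `DemoSub`, `demo_subscribe`, `demo_deliver`, and `on_rows` are all hypothetical, and the real callback additionally receives `TAOS_SUB*` and `TAOS_RES*` arguments.

```c
#include <stddef.h>

/* Stand-in for the callback type; simplified for illustration. */
typedef void (*demo_callback)(int rows, void *param, int code);

typedef struct {
  demo_callback fp;     /* NULL models a synchronous subscription */
  void         *param;  /* stored as-is, never dereferenced by the library side */
} DemoSub;

static DemoSub demo_subscribe(demo_callback fp, void *param) {
  DemoSub sub = { fp, param };
  return sub;
}

/* Models the internal thread handing new rows to the callback. */
static void demo_deliver(const DemoSub *sub, int rows) {
  if (sub->fp != NULL) {
    sub->fp(rows, sub->param, 0);
  }
}

static int g_last_block_fetch = -1;
static int g_total_rows = 0;

/* The application decides what `param` means; here it points to an int flag. */
static void on_rows(int rows, void *param, int code) {
  (void)code;
  g_last_block_fetch = *(int *)param;
  g_total_rows += rows;
}
```

Because only the pointer is stored in this model, the variable it points to has to stay valid for as long as the callback may run.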

After a subscription is created, its data can be consumed and processed. Shown below is the sample code that consumes data in sync mode, in the `else` branch of `if (async)`.

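```c
if (async) {
  // async mode: the callback does the work, so just wait for Enter before exiting
  getchar();
} else while(1) {
  // sync mode: retrieve the data for this polling round
  TAOS_RES* res = taos_consume(tsub);
  if (res == NULL) {
    printf("failed to consume data.\n");
    break;
  } else {
    print_result(res, blockFetch);
    // wait for Enter before polling again
    getchar();
  }
}
```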

The `else` branch in the above sample code contains an infinite loop. Each time the Enter key is pressed, `taos_consume` is invoked. The return value of `taos_consume` is the selected result set, which can be handled in the same way as a result set returned by `taos_use_result`. In the above sample, `print_result` is used to simplify printing the result set. Below is the implementation of `print_result`.

```c
void print_result(TAOS_RES* res, int blockFetch) {
  TAOS_ROW row = NULL;
  int num_fields = taos_num_fields(res);
  TAOS_FIELD* fields = taos_fetch_fields(res);
  int nRows = 0;
  if (blockFetch) {
    nRows = taos_fetch_block(res, &row);
    for (int i = 0; i < nRows; i++) {
      char temp[256];
      taos_print_row(temp, row + i, fields, num_fields);
      puts(temp);
    }
  } else {
    while ((row = taos_fetch_row(res))) {
      char temp[256];
      taos_print_row(temp, row, fields, num_fields);
      puts(temp);
      nRows++;
    }
  }
  printf("%d rows consumed.\n", nRows);
}
```

In the above code `taos_print_row` is used to process the data consumed. All matching rows are printed.

In async mode, consuming data is simpler, as shown below.

```c
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
  print_result(res, *(int*)param);
}
```

`taos_unsubscribe` can be invoked to terminate a subscription.

```c
taos_unsubscribe(tsub, keep);
```

The second parameter `keep` specifies whether to keep the subscription progress on the client side. If it is **false** (i.e. **0**), the subscription restarts from the beginning the next time `taos_subscribe` is invoked, regardless of the value of the `restart` parameter. The subscription progress information is stored in _{DataDir}/subscribe/_, which contains one file per subscription, named after its `topic`. If the corresponding progress file is removed, the subscription also restarts from the beginning.

Note: The default value of `DataDir` in the `taos.cfg` file is **/var/lib/taos/**. This directory does not exist on Windows, so on Windows you need to change `DataDir` to an existing directory.
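
As a sketch of where the progress file for a given topic would be found, the helper here joins `DataDir`, the `subscribe` directory, and the topic name as described above. `progress_file_path` and the topic name `power_alert` are hypothetical; the layout is taken from the description in this section, not from the TDengine source code.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: builds "{DataDir}/subscribe/{topic}" into `out`.
 * Returns 0 on success, -1 if the buffer is too small. */
static int progress_file_path(const char *data_dir, const char *topic,
                              char *out, size_t out_len) {
  size_t n = strlen(data_dir);
  /* Add a separator only when DataDir does not already end with one. */
  const char *sep = (n > 0 && data_dir[n - 1] == '/') ? "" : "/";
  int written = snprintf(out, out_len, "%s%ssubscribe/%s", data_dir, sep, topic);
  return (written < 0 || (size_t)written >= out_len) ? -1 : 0;
}
```

With the default `DataDir` and a topic named `power_alert`, this yields `/var/lib/taos/subscribe/power_alert`. Removing that file would have the same effect as passing **0** for `keep`.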

Now let's see the effect of the above sample code, assuming the following prerequisites have been met.

- The sample code has been downloaded to the local system
- TDengine has been installed and launched properly on the same system
- The database, STable, and subtables required by the sample code are ready

Run the commands below in the directory where the sample code resides to compile and start the program.

```bash
make
./subscribe -sql='select * from meters where current > 10;'
```

After the program is started, open another terminal and launch the TDengine CLI `taos`, then use the SQL commands below to insert a row whose current is 12A into table **D1001**.

```sql
use test;
insert into D1001 values(now, 12, 220, 1);
```

This row is then displayed by the example program in the first terminal because its current exceeds 10A. You can insert more data to observe the output of the example program.

## Examples

The example program below demonstrates how to subscribe, using connectors, to data rows in which current exceeds 10A.

### Prepare Data

```bash
# create database "power"
taos> create database power;
# use "power" as the database in following operations
taos> use power;
# create super table "meters"
taos> create table meters(ts timestamp, current float, voltage int, phase int) tags(location binary(64), groupId int);
# create tables using the schema defined by super table "meters"
taos> create table d1001 using meters tags ("California.SanFrancisco", 2);
taos> create table d1002 using meters tags ("California.LoSangeles", 2);
# insert some rows
taos> insert into d1001 values("2020-08-15 12:00:00.000", 12, 220, 1),("2020-08-15 12:10:00.000", 12.3, 220, 2),("2020-08-15 12:20:00.000", 12.2, 220, 1);
taos> insert into d1002 values("2020-08-15 12:00:00.000", 9.9, 220, 1),("2020-08-15 12:10:00.000", 10.3, 220, 1),("2020-08-15 12:20:00.000", 11.2, 220, 1);
# select the rows in which current is greater than 10A
taos> select * from meters where current > 10;
           ts            |    current   |    voltage   |  phase |         location          |   groupid   |
===========================================================================================================
 2020-08-15 12:10:00.000 |    10.30000  |     220      |      1 |   California.LoSangeles   |           2 |
 2020-08-15 12:20:00.000 |    11.20000  |     220      |      1 |   California.LoSangeles   |           2 |
 2020-08-15 12:00:00.000 |    12.00000  |     220      |      1 |  California.SanFrancisco  |           2 |
 2020-08-15 12:10:00.000 |    12.30000  |     220      |      2 |  California.SanFrancisco  |           2 |
 2020-08-15 12:20:00.000 |    12.20000  |     220      |      1 |  California.SanFrancisco  |           2 |
Query OK, 5 row(s) in set (0.004896s)
```

### Example Programs

<Tabs defaultValue="java" groupId="lang">
  <TabItem label="Java" value="java">
    <Java />
  </TabItem>
  <TabItem label="Python" value="Python">
    <Python />
  </TabItem>
  {/* <TabItem label="Go" value="go">
      <Go/>
  </TabItem> */}
  <TabItem label="Rust" value="rust">
    <Rust />
  </TabItem>
  {/* <TabItem label="Node.js" value="nodejs">
      <Node/>
  </TabItem>
  <TabItem label="C#" value="csharp">
      <CSharp/>
  </TabItem> */}
  <TabItem label="C" value="c">
    <CDemo />
  </TabItem>
</Tabs>

### Run the Examples

The example programs first consume all historical data matching the criteria.

```bash
ts: 1597464000000	current: 12.0	voltage: 220	phase: 1	location: California.SanFrancisco	groupid : 2
ts: 1597464600000	current: 12.3	voltage: 220	phase: 2	location: California.SanFrancisco	groupid : 2
ts: 1597465200000	current: 12.2	voltage: 220	phase: 1	location: California.SanFrancisco	groupid : 2
ts: 1597464600000	current: 10.3	voltage: 220	phase: 1	location: California.LoSangeles	groupid : 2
ts: 1597465200000	current: 11.2	voltage: 220	phase: 1	location: California.LoSangeles	groupid : 2
```
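
The `ts` values in this output appear to be Unix epoch timestamps in milliseconds rather than formatted dates. As a sketch for reading them, the helper here formats such a value in UTC using only the C standard library. The name `format_epoch_ms_utc` is hypothetical, and the function assumes non-negative input.

```c
#include <stdint.h>
#include <stdio.h>
#include <time.h>

/* Hypothetical helper: formats epoch milliseconds as
 * "YYYY-MM-DD HH:MM:SS.mmm" in UTC. Returns 0 on success, -1 on failure.
 * Note: gmtime() uses a shared static buffer and is not thread-safe. */
static int format_epoch_ms_utc(int64_t epoch_ms, char *out, size_t out_len) {
  time_t secs = (time_t)(epoch_ms / 1000);
  int millis = (int)(epoch_ms % 1000);
  struct tm *utc = gmtime(&secs);
  if (utc == NULL) {
    return -1;
  }
  char date[32];
  if (strftime(date, sizeof date, "%Y-%m-%d %H:%M:%S", utc) == 0) {
    return -1;
  }
  int written = snprintf(out, out_len, "%s.%03d", date, millis);
  return (written < 0 || (size_t)written >= out_len) ? -1 : 0;
}
```

For example, 1597464000000 formats as `2020-08-15 04:00:00.000` in UTC. That would correspond to the inserted value `2020-08-15 12:00:00.000` if the data was written in a UTC+8 time zone; the time zone is an assumption, since it is not stated in this section.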

Next, use the TDengine CLI to insert a new row.

```
# taos
taos> use power;
taos> insert into d1001 values(now, 12.4, 220, 1);
```

Because the current in the inserted row exceeds 10A, it will be consumed by the example program.

```
ts: 1651146662805	current: 12.4	voltage: 220	phase: 1	location: California.SanFrancisco	groupid: 2
```