The `af` package of driver-go can also be used to establish a connection. This package wraps the advanced features of TDengine, such as parameter binding and subscription.
```go title="Establish native connection using af package"
For the Rust connector, the connection method depends on the features enabled. If the "rest" feature is enabled, only the REST implementation is compiled and packaged.
Any application program running on any platform can access TDengine through the REST API it provides. For details, please refer to [REST API](/reference/rest-api/). Application programs can also use connectors for multiple languages to access TDengine, including C/C++, Java, Python, Go, Node.js, C#, and Rust. This chapter describes how to establish a connection to TDengine and briefly introduces how to install and use connectors. For details about the connectors, please refer to [Connectors](https://docs.taosdata.com/reference/connector/).
## Establish Connection
There are two ways to establish connections to TDengine:
1. Connection to taosd can be established through the REST API provided by the taosAdapter component; this is called "REST connection" hereinafter.
2. Connection to taosd can be established through the TDengine client driver (taosc); this is called "native connection" hereinafter.
1. With a REST connection, it's not necessary to install the client-side driver taosc. It is more friendly for cross-platform use, at the cost of roughly a 30% performance downgrade.
2. With a native connection, the full functionality of TDengine can be utilized, like [Parameter Binding](/reference/connector/cpp#parameter-binding-api), [Subscription](/reference/connector/cpp#subscription), etc.
After the above installation and configuration are done, and after making sure the TDengine service is up and running, the shell command `taos` can be launched to access TDengine.
`driver-go` uses `cgo` to wrap the APIs provided by taosc, and `cgo` needs `gcc` to compile C source code, so please make sure you have a proper `gcc` on your system.
:::
</TabItem>
<TabItem label="Rust" value="rust">
Just add the `libtaos` dependency in `Cargo.toml`.
```toml title=Cargo.toml
[dependencies]
...
libtaos = { version = "0.4.2" }
```
Prior to establishing connection, please make sure TDengine is already running and accessible. The following sample code assumes TDengine is running on the same host as the client program, with FQDN configured to "localhost" and serverPort configured to "6030".
<Tabs groupId="lang" defaultValue="java">
<TabItem label="Java" value="java">
...
install.packages("RJDBC")
...
</Tabs>
:::tip
If the connection fails, in most cases it's caused by improper FQDN or firewall configuration. Please refer to the section "Unable to establish connection, what should I do?" in [FAQ](https://docs.taosdata.com/train-faq/faq).
The data model employed by TDengine is similar to that of a relational database; users need to create databases and tables. For a specific use case, the design of databases, STables (abbreviation for super table), and tables needs to be considered. This chapter explains the concepts without going into syntax details.
The characteristics of data from different data collection points may differ, such as collection frequency, days to keep, number of replicas, data block size, and whether updating data is allowed. For TDengine to operate with the best performance, it is strongly suggested to put data with different characteristics into different databases, because a different storage policy can be set for each database. When creating a database, many parameters can be configured: the days to keep data, the number of replicas, the number of memory blocks, time precision, the minimum and maximum number of rows in each data block, whether to compress, the time range of the data in a single data file, and so on. Below is an example of the SQL statement for creating a database.
```sql
CREATE DATABASE power KEEP 365 DAYS 10 BLOCKS 6 UPDATE 1;
```
The above SQL statement creates a database named "power". The data in it will be kept for 365 days (data older than 365 days is deleted automatically), a new data file will be created every 10 days, the number of memory blocks is 6, and updating data is allowed. For more details please refer to [Database](/taos-sql/database).
After creating a database, the current database in use can be switched with the SQL command `USE`. For example, the SQL statement below switches the current database to `power`. Without a current database specified, a table name must be preceded by the corresponding database name.
```sql
USE power;
```
:::note
- Any table or STable must belong to a database. Before creating a table or STable, the database it belongs to must exist.
- JOIN operations can't be performed on tables from two different databases.
- A timestamp must be specified when inserting rows or querying historical rows.

:::
In a typical IoT system, there may be many kinds of devices. For example, in an electrical power system there are meters, transformers, bus bars, switches, etc. For easy aggregation across multiple tables, one STable needs to be created for each kind of device. For example, for the meters in [table 1](/tdinternal/arch#model_table1), the SQL statement below can be used to create the super table.
```sql
CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int);
```
:::note
If you are using versions prior to 2.0.15, the `STABLE` keyword needs to be replaced with `TABLE`.
Similar to creating a regular table, when creating an STable, the name and schema need to be provided. In the STable schema, the first column must be a timestamp (like `ts` in the example), and the other columns (like `current`, `voltage`, and `phase` in the example) hold the collected data; a column's type can be integer, floating point number, string, etc. Besides, the schema for tags needs to be provided, like `location` and `groupId` in the example; a tag's type can be integer, floating point number, string, etc. The static properties of a data collection point can be defined as tags, like the location, device type, device group ID, manager ID, etc. Tags in the schema can be added, removed, or altered. Please refer to [STable](/taos-sql/stable) for more details.
Each kind of data collection point needs a corresponding STable, so there may be many STables in an IoT system. For an electrical power system, STables would be created for meters, transformers, bus bars, and switches respectively. There may also be multiple kinds of data collection points on a single device; for example, one collection point for electrical data like current and voltage, and another for environmental data like temperature, humidity, and wind direction. Multiple STables are required for such devices.
At most 4096 (or 1024 prior to version 2.1.7.0) columns are allowed in an STable. If there are more than 4096 physical variables to be collected for a single collection point, multiple STables are required for that kind of collection point. There can be multiple databases in a system, and one or more STables can exist in a database.
A specific table needs to be created for each data collection point. Similar to an RDBMS, a table name and schema are required to create a table. Besides, one or more tags can be set for each table. To create a table, an STable is used as the template and the values of its tags must be specified. For example, for the meters in [Table 1](/tdinternal/arch#model_table1), the table can be created using the SQL statement below.
```sql
CREATE TABLE d1001 USING meters TAGS ("Beijing.Chaoyang", 2);
```
In the above SQL statement, "d1001" is the table name and "meters" is the STable name, followed by the values of tags "location" and "groupId", which are "Beijing.Chaoyang" and "2" respectively in the example. The tag values can be altered after the table is created. Please refer to [Tables](/taos-sql/table) for details.
It's not recommended to create a table in a database while using a stable from another database as template.
:::
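To make the tag alteration mentioned above concrete, here is a minimal sketch (the new value is illustrative), using the `ALTER TABLE ... SET TAG` syntax described in [Tables](/taos-sql/table):

```sql
-- Change the groupId tag of sub table d1001 (new value is illustrative)
ALTER TABLE d1001 SET TAG groupId=3;
```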
:::tip
It's suggested to use the globally unique ID of a data collection point as the table name, for example the device serial number. If there isn't such a unique ID, multiple IDs that are not globally unique can be combined to form one. It's not recommended to use a unique ID as a tag value.

:::
### Create Table Automatically
In some circumstances, it's not known in advance whether the table for a data collection point exists when inserting rows. In that case, the table can be created automatically using the auto-creation syntax shown below. If the table already exists, no new table is created and the trailing `USING` clause is ignored.
```sql
INSERT INTO d1001 USING meters TAGS ("Beijing.Chaoyang", 2) VALUES (now, 10.2, 219, 0.32);
```
In the above SQL statement, a row with values `(now, 10.2, 219, 0.32)` will be inserted into table "d1001". If table "d1001" doesn't exist, it will be created automatically using STable "meters" as the template, with tag values `"Beijing.Chaoyang", 2`.
The multiple-column data model is supported in TDengine. As long as multiple physical variables are collected by the same data collection point at the same time, i.e. the timestamps are identical, these variables can be put in a single STable as columns. There is another kind of design, the single-column data model, in which a table is created for each physical variable, meaning an STable is required for each kind of physical variable. For example, 3 STables would be required for current, voltage, and phase.
It's recommended to use the multiple-column data model when possible because it delivers better insert and query performance. In some cases, however, the collected physical variables change frequently, so the STable schema would need to change frequently too; in such cases, the single-column model is more convenient.
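To make the contrast concrete, here is a sketch of the single-column design for the meters example (the STable names are hypothetical):

```sql
-- Hypothetical single-column design: one STable per physical variable
CREATE STABLE current_st (ts timestamp, value float) TAGS (location binary(64), groupId int);
CREATE STABLE voltage_st (ts timestamp, value int) TAGS (location binary(64), groupId int);
CREATE STABLE phase_st (ts timestamp, value float) TAGS (location binary(64), groupId int);
```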
- Inserting in batch can deliver better performance. Normally, the larger the batch size, the better the performance. Note that a single row can't exceed 16K bytes and a single SQL statement can't exceed 1M bytes.
- Inserting with multiple threads can also improve performance. However, depending on the system resources on the client side and the server side, once the number of inserting threads grows past a certain point, performance may drop instead of growing. The proper number of threads needs to be tested in the target environment.
- If the timestamp of a row to be inserted already exists in the table, the behavior depends on the parameter `UPDATE`: if it's 0 (the default), the row is discarded; if it's 1, the new values override the old values for that row.
- The timestamp to be inserted must be newer than the current time minus the parameter `KEEP`; if `KEEP` is 3650, data older than 3650 days can't be inserted. The timestamp also can't be newer than the current time plus the parameter `DAYS`; if `DAYS` is 2, data more than 2 days in the future can't be inserted.
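As a sketch of batch inserting, multiple rows and even multiple sub tables can go into one `INSERT` statement (the values are illustrative, and sub table d1002 is assumed to exist):

```sql
-- Batch insert: two rows into d1001 and one row into d1002 in a single statement
INSERT INTO d1001 VALUES (now, 10.2, 219, 0.32) (now + 1s, 10.3, 218, 0.25)
            d1002 VALUES (now, 10.3, 220, 0.29);
```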
2. Note that `use db` can't be used with a REST connection because REST connections are stateless; hence `dbName.tbName` is used in the samples to specify table names.
:::
### Insert with Parameter Binding
TDengine also provides Prepare APIs that support parameter binding. Like MySQL, these APIs currently only support `?` as the placeholder for parameters to bind. Starting with versions 2.1.1.0 and 2.1.2.0, parameter binding support for inserting data has been improved significantly: writing data through the parameter binding interface avoids the cost of parsing SQL statements and thus significantly improves insert performance in most cases.
Note that parameter binding is available only with native connections.
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
...
- Each value in `field_set` must be self-descriptive about its data type. For example, 1.2f32 means a float value of 1.2; without the "f32" type suffix it would be treated as a double.
- Multiple time precisions can be used for the `timestamp` field, from nanoseconds (ns) to hours (h).
:::
For more details please refer to [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and [TDengine Schemaless](/reference/schemaless/#Schemaless-Line-Protocol).
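For illustration, an InfluxDB-style line consists of a measurement, a tag set, a field set, and a timestamp. A hypothetical line for the meters example (values and suffixes are illustrative, following the type-suffix note above) might look like:

```
meters,location=Beijing.Chaoyang,groupid=2 current=10.3f32,voltage=219i32,phase=0.31f32 1648432611249
```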
A single line of text is used in the OpenTSDB line protocol to represent one row of data. OpenTSDB employs a single-column data model, so one line can only contain a single data column. There can be multiple tags. Each line contains 4 parts, as below:
- `timestamp` is the timestamp of the current row of data. The time precision is determined automatically based on the length of the timestamp; second and millisecond precisions are supported.
- `value` is a physical variable, which must be a numeric value; the corresponding column name is "value".
- The last part is the tag set, with tags separated by spaces; all tags are converted to nchar type automatically.
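Putting the 4 parts together, a hypothetical OpenTSDB-style line for the meters example (values are illustrative) might look like:

```
meters.current 1648432611 10.3 location=Beijing.Chaoyang groupid=2
```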
Similar to the OpenTSDB line protocol, `metric` will be used as the STable name, `timestamp` is the timestamp to be used, `value` represents the collected physical variable, and `tags` is the tag set.
Multiple-row binding performs better than single-row binding, but it can only be used with `INSERT` statements, while single-row binding can also be used with other SQL statements besides `INSERT`.
SQL is the query language of TDengine. Application programs can send SQL statements to TDengine through the REST API or connectors. The TDengine CLI `taos` can also be used to execute ad-hoc SQL queries. Here are the major query functionalities supported by TDengine:
- Query on single column or multiple columns
- Filter on tags or data columns: >, <, =, <>, like
For example, below SQL statement can be executed in TDengine CLI `taos` to select the rows whose voltage column is bigger than 215 and limit the output to only 2 rows.
```sql
select * from d1001 where voltage > 215 order by ts desc limit 2;
```

```title=Output
taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
...
```
To meet the requirements of IoT use cases, some special functions have been added in TDengine, for example `twa` (time weighted average), `spread` (the difference between the maximum and the minimum), and `last_row` (the last row); more functions will be added to better serve IoT use cases. Furthermore, continuous query is also supported in TDengine.
For detailed query syntax please refer to [Select](/taos-sql/select).
In IoT use cases, there are always multiple data collection points of the same kind. A new concept, called STable (abbreviation for super table), is used in TDengine to represent one kind of data collection point, and a table is used to represent one specific data collection point. Tags are used by TDengine to represent the static properties of data collection points; a specific data collection point has its own values for those properties. By specifying filter conditions on tags, aggregate queries can be performed efficiently across all the tables belonging to the same STable, i.e. the same kind of data collection points. Aggregate functions applicable to tables can be used directly on STables; the syntax is exactly the same.
### Example 1
In TDengine CLI `taos`, use the SQL below to get the average voltage of all the meters in Beijing, grouped by location.
```
taos> SELECT AVG(voltage) FROM meters GROUP BY location;
...
```
Join queries are allowed only between tables of the same STable. In [Select](/taos-sql/select), all query operations are marked as to whether they support STables or not.
In IoT use cases, down sampling is widely used to aggregate data by time window. The `INTERVAL` keyword in TDengine simplifies querying by time window. For example, the SQL statement below gets the sum of current every 10 seconds from table d1001.
```
taos> SELECT sum(current) FROM d1001 INTERVAL(10s);
...
Query OK, 2 row(s) in set (0.000883s)
```
Down sampling can also be used on an STable. For example, the SQL statement below gets the per-second sum of current from all meters in Beijing.
```
taos> SELECT SUM(current) FROM meters where location like "Beijing%" INTERVAL(1s);
...
```
Down sampling also supports time offsets. For example, the SQL statement below gets the sum of current from all meters, with each time window starting at a 500 millisecond offset from the second boundary.
```
taos> SELECT SUM(current) FROM meters INTERVAL(1s, 500a);
...
```
In IoT use cases, it's hard to align the timestamps of data collected by different collection points. However, many algorithms like FFT require data aligned on the same time interval, and in many systems application programs have to handle this themselves. In TDengine, this alignment is easy to achieve using down sampling.
Interpolation can be performed in TDengine if there is no collected data in a time range.
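A minimal sketch of interpolation with the `FILL` clause (the table and time range are illustrative; `FILL` modes include `PREV`, `NULL`, `LINEAR`, and `VALUE`):

```sql
-- Fill empty 10-second windows with the previous window's value (illustrative range)
SELECT AVG(current) FROM d1001 WHERE ts >= now - 1h INTERVAL(10s) FILL(PREV);
```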
For more details please refer to [Aggregate by Window](/taos-sql/interval).
## Examples
### Query
In the section [Insert](/develop/insert-data/sql-writing), a database named `power` was created and some data was inserted into the STable `meters`. The sample code below demonstrates how to query the data in this STable.
<Tabs defaultValue="java" groupId="lang">
<TabItem label="Java" value="java">
...
:::note
1. With either a REST connection or a native connection, the above sample code works well.
2. Note that `use db` can't be used with a REST connection because it is stateless.
:::
### Asynchronous Query
Besides synchronous query APIs, TDengine also provides higher-performance asynchronous APIs for inserting and querying data. With the same hardware and software environment, the async API inserts data 2 to 4 times faster than the sync API. The async API works in non-blocking mode: it returns immediately, before the database operation actually completes, so the calling thread can switch to other work and improve the performance of the whole application. Async APIs are especially beneficial under severe network latency.
Note that asynchronous query can only be used with a native connection.
description: "Continuous query is a query that is executed automatically at a predefined frequency to provide aggregation over time windows; it is effectively a simplified, time-driven form of stream computing."
Continuous query is a query that is executed automatically at a predefined frequency to provide aggregation over time windows; it is effectively a simplified, time-driven form of stream computing. A continuous query can be performed on a table or an STable in TDengine. The result can be pushed to the client or written back to TDengine. Each query is executed on one time window, which slides forward with time. The size of the time window and the forward sliding step are specified with the `INTERVAL` and `SLIDING` parameters respectively.
Continuous query in TDengine is time driven and can be defined using TAOS SQL directly, without any extra operations. With continuous query, the result is generated per time window to achieve down sampling of the original data. Once a continuous query is defined using TAOS SQL, the query is automatically executed at the end of each time window and the result is pushed to the client or written to TDengine.
There are some differences between continuous query in TDengine and time-window computation in stream computing:
- In stream computing, the computation is performed and results are returned in real time; in continuous query, computation only starts when a time window closes. For example, if the time window is 1 day, the result is only generated at 23:59:59.
- If a historical data row is written into a time window for which the computation has already finished, the computation will not be performed again, and the result will not be pushed to the client again either. If the result was already written into TDengine, it will not be updated.
- In continuous query, if the result is pushed to the client, the client state is not cached on the server side, and exactly-once semantics is not guaranteed by the server. If the client program crashes, computation resumes from a new time window starting at the time the continuous query is restarted. If the result is written into TDengine, the data written is guaranteed to be valid and continuous.
In this section, the meters use case is used to introduce continuous query. Assume the STable and sub tables have been created using the SQL statements below.
```sql
create table meters (ts timestamp, current float, voltage int, phase float) tags (location binary(64), groupId int);
create table D1001 using meters tags ("Beijing.Chaoyang", 2);
create table D1002 using meters tags ("Beijing.Haidian", 2);
...
```
The average voltage over one-minute time windows, sliding forward every 30 seconds, can be retrieved using the SQL statement below.
```sql
select avg(voltage) from meters interval(1m) sliding(30s);
```
Whenever the above SQL statement is executed, all the existing data is computed again. If the computation needs to run automatically every 30 seconds over the data of the past one minute, the statement can be revised as below, where `{startTime}` stands for the beginning of the latest time window.
```sql
select avg(voltage) from meters where ts > {startTime} interval(1m) sliding(30s);
```
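To have TDengine run this computation automatically and store the results, the select can be wrapped in a `create table ... as` statement, a sketch consistent with the time-bounded example shown later in this section (the table name `avg_vol` matches the discussion that follows):

```sql
create table avg_vol as select avg(voltage) from meters interval(1m) sliding(30s);
```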
A table named `avg_vol` will be created automatically; every 30 seconds the `select` statement is executed on the data of the past 1 minute, i.e. the latest time window, and the result is written into `avg_vol`. The client program just needs to query this table. For example:
```sql
taos> select * from avg_vol;
...
2020-07-29 13:39:00.000 | 223.0800000 |
```
Note that the minimum allowed time window is 10 milliseconds; there is no upper limit on the time window size.
Besides, the start and end time of a continuous query can be specified. If the start time is not specified, the timestamp of the first original row is used as the start time; if the end time is not specified, the continuous query runs forever; otherwise it terminates once the end time is reached. For example, the continuous query in the SQL statement below starts now and terminates one hour later.
```sql
create table avg_vol as select avg(voltage) from meters where ts > now and ts <= now + 1h interval(1m) sliding(30s);
```
`now` in the above SQL statement stands for the time when the continuous query is created, not the time when the computation is actually performed. To minimize the impact of delayed arrival of original data, the computation in a continuous query starts with a small delay: once a time window closes, the computation does not start immediately, so the result normally becomes available a little later, typically within one minute after the window closes.
The `show streams` command can be used in TDengine CLI `taos` to show all the continuous queries in the system, and the `kill stream` command can be used to terminate one.
description: "A lightweight service for data subscription and publishing; time series data inserted into TDengine is continuously pushed to subscribing clients."
title: Data Subscription
---
import Tabs from "@theme/Tabs";
import Node from "./_sub_node.mdx";
...
Given the time series nature of the data, inserting data into TDengine is similar to publishing to a message queue: both can be considered as inserting a new record with a timestamp into the system. Data is stored in ascending timestamp order inside TDengine, so essentially each table in TDengine can be considered a message queue.
A lightweight service for data subscription and publishing is built into TDengine. With the API provided by TDengine, client programs can use `select` statements to subscribe to data from one or more tables. Subscription and state maintenance are performed on the client side: the client program polls the server to check for new data, and if there is any, the new data is pushed to the client. If the client program is restarted, where to resume retrieving new data is up to the client side.
There are 3 major APIs related to subscription provided in the TDengine client driver:
```c
taos_subscribe
taos_consume
taos_unsubscribe
```
For more details about these APIs please refer to [C/C++ Connector](/reference/connector/cpp). Their usage is introduced below using the meters use case; for the schema of the STable and sub tables please refer to the previous section, "Continuous Query". The full sample code can be found [here](https://github.com/taosdata/TDengine/blob/master/examples/c/subscribe.c).
If we want to be notified and take action when the current from some meters exceeds a threshold, like 10 A, there are two ways:
The first way is to query each sub table, record the last timestamp matching the criteria, then later query only the data newer than the recorded timestamp, repeating this process. The SQL statements for this approach are below.
```sql
select * from D1001 where ts > {last_timestamp1} and current > 10;
select * from D1002 where ts > {last_timestamp2} and current > 10;
...
```
The above way works, but the number of `select` statements grows with the number of meters. Eventually the performance of both client and server becomes unacceptable once the number of meters is large enough.
A better way is to query the STable: only one `select` is needed regardless of the number of meters, like below.
```sql
select * from meters where ts > {last_timestamp} and current > 10;
```
However, how to choose `last_timestamp` then becomes a new problem. First, the timestamp when data is generated differs from the timestamp when it is inserted into the database, and the difference may be large. Second, data from different meters may arrive at the database at different times. If the timestamp of the "slowest" meter is used as `last_timestamp`, data from other meters may be selected repeatedly; if the timestamp of the "fastest" meter is used, some data from other meters may be missed.
All the problems mentioned above can be resolved thoroughly using the subscription feature provided by TDengine.
The first step is to create a subscription using `taos_subscribe`:
The subscription in TDengine can be either synchronous or asynchronous. In the above sample code, the value of variable `async` is determined from the CLI input, and is then used to create either an async or a sync subscription. In sync mode, the client program invokes `taos_consume` to retrieve data; in async mode, a thread created internally by `taos_subscribe` invokes `taos_consume` and passes the data to the callback function `subscribe_callback` for processing. `subscribe_callback` is provided by the client program, and time-consuming operations should be avoided inside it.
The parameter `taos` is an established database connection. In sync mode there is no special requirement on it. In async mode, however, it must not be used by other threads, otherwise unpredictable errors may occur, because the callback function is invoked in an internal thread of the API while some TDengine APIs are not thread-safe.
The parameter `sql` is a `select` statement in which a `where` clause can be used to specify filter conditions. In our example, to subscribe only the data whose current exceeds 10A, the SQL statement is as below:
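A sketch of this filter, reconstructed from the description above (the actual sample code may phrase it slightly differently):

```sql
select * from meters where current > 10;
```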
Please note that all historical data will be processed because no start time is specified. If only the data from the last day needs to be processed, a time condition can be added:
```sql
select * from meters where ts > now - 1d and current > 10;
```
The parameter `topic` is the name of the subscription. It needs to be unique within a client machine, but not globally unique, because subscription is implemented in the client-side API.
If the subscription named `topic` doesn't exist, the parameter `restart` is ignored. If the subscription was created before by a client program which then exited, when the client program restarts and reuses this `topic`, `restart` determines whether to retrieve data from the beginning or from the point where the subscription was interrupted. If `restart` is **true** (i.e. a non-zero value), the data will be retrieved from the beginning; if it is **false** (i.e. zero), the data already consumed will not be processed again.
The last parameter of `taos_subscribe` is the polling interval in milliseconds. In sync mode, if the time between two consecutive invocations of `taos_consume` is smaller than this interval, `taos_consume` blocks until the interval has elapsed. In async mode, this interval is the minimum interval between two invocations of the callback function.
The second-to-last parameter of `taos_subscribe` is used to pass additional arguments to the callback function. The subscription API does not process it in any way and simply passes it through to the callback. This parameter is meaningless in sync mode.
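Putting these parameters together, a minimal sketch of creating the subscription might look like the following (variable names such as `taos` and `create_subscription` are assumptions, not part of the original sample; a running TDengine instance and the taosc client library are required):

```c
#include <stdio.h>
#include <taos.h>  // TDengine client library

// Callback used only in async mode; avoid time-consuming work here.
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES* res, void* param, int code) {
  // process the newly arrived rows in `res` ...
}

// `taos` is an established connection; `async` selects the mode.
TAOS_SUB* create_subscription(TAOS* taos, int async, int restart) {
  const char* topic = "meters-current-over-10";  // unique per client machine
  const char* sql   = "select * from meters where current > 10;";
  int interval = 1000;                           // polling interval in ms
  if (async) {
    // async: an internal thread calls taos_consume and invokes the callback
    return taos_subscribe(taos, restart, topic, sql, subscribe_callback, NULL, interval);
  } else {
    // sync: the program itself calls taos_consume later
    return taos_subscribe(taos, restart, topic, sql, NULL, NULL, interval);
  }
}
```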
After the subscription is created, its data can be consumed. In sync mode, the sample code that does so is the `else` branch of `if (async)`:
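A minimal sketch of that consume loop, under the same assumptions as above (the actual sample in `subscribe.c` is more complete):

```c
// Sync mode: press Enter to fetch the newly arrived data.
void consume_loop(TAOS_SUB* tsub, int blockFetch) {
  while (1) {
    TAOS_RES* res = taos_consume(tsub);  // blocks until the polling interval elapses
    print_result(res, blockFetch);       // handled like a regular query result set
    getchar();                           // wait for carriage return before next round
  }
}
```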
In the above sample code there is an infinite loop: each time a carriage return is entered, `taos_consume` is invoked. Its return value is a result set that can be handled exactly like the result of a regular query, e.g. with `taos_use_result`; in the sample, `print_result` is used instead to keep things simple. Below is the implementation of `print_result`:
```c
void print_result(TAOS_RES* res, int blockFetch) {
  ...
}
```
The second parameter `keep` is used to specify whether to keep the subscription progress on the client side. If it is **false**, i.e. **0**, the subscription will restart from the beginning regardless of the `restart` parameter's value when `taos_subscribe` is invoked again. The subscription progress is stored in _{DataDir}/subscribe/_, under which there is a file with the same name as `topic` for each subscription; the subscription restarts from the beginning if the corresponding progress file is removed.
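For reference, `keep` is the second parameter of `taos_unsubscribe`; a sketch assuming a subscription handle named `tsub`:

```c
// Stop the subscription; keep == 1 preserves the progress file under
// {DataDir}/subscribe/ so a later taos_subscribe with restart == 0 can resume.
taos_unsubscribe(tsub, keep);
```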
Now let's see the effect of the above sample code, assuming the following prerequisites are met:
- The sample code has been downloaded to the local system
- TDengine has been installed and launched properly on the same system
- The database, STable, and subtables required by the sample code have been created
Then the following commands can be run in the directory where the sample code resides to compile and start the program:
```bash
make
./subscribe -sql='select * from meters where current > 10;'
```
After the program is started, open another terminal, launch the TDengine CLI `taos`, and use SQL to insert a row whose current is 12A into table **D1001**.
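Assuming the meters schema used throughout this chapter (`ts`, `current`, `voltage`, `phase`; the exact values here are only an illustration), the insert might look like:

```sql
insert into D1001 values(now, 12.4, 220, 1);
```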
Then this row will be shown by the example program on the first terminal, because its current exceeds 10A. More rows can be inserted to observe the output of the example program.
## Examples
The example program below demonstrates how to subscribe, using connectors, to all the rows whose current exceeds 10A.
### Prepare Data
```bash
# create database "power"
taos> create database power;
# use "power" as the database in following operations
...
taos> select * from meters where current > 10;
```
The cache management policy in TDengine is First-In-First-Out (FIFO), also known as an insert-driven cache management policy, as opposed to the read-driven policy Least-Recently-Used (LRU). It simply keeps the latest data in cache and flushes the oldest cached data to disk when cache usage reaches a threshold. In IoT use cases the most valuable data is the latest data, i.e. the current state, and the cache policy in TDengine is based on this nature of IoT data.
Caching the latest data provides the capability of retrieving it in milliseconds. With this capability, TDengine can be configured properly to serve as a caching system, without deploying a separate caching product; this simplifies the system architecture and minimizes operation cost. Note that the cache is emptied after TDengine is restarted: TDengine doesn't reload data from disk into cache like a real key-value caching system does.
The memory space used by the TDengine cache is fixed in size, configured according to application requirements and system resources. An independent memory pool is allocated for and managed by each vnode (virtual node) in TDengine; there is no sharing of memory pools between vnodes. All the tables belonging to a vnode share all the cache memory of that vnode.
A memory pool is divided into blocks; data is stored in row format in memory, and each block follows the FIFO policy. The size of each block is determined by the configuration parameter `cache`, and the number of blocks for each vnode by `blocks`, so the total cache size per vnode is `cache * blocks`. It's better to size each block to hold at least dozens of rows.
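For example, with illustrative values in `taos.cfg` (the parameter names are real; the numbers are only an assumption to show the arithmetic):

```
# taos.cfg
cache  16    # size of each memory block, in MB
blocks 6     # number of blocks per vnode
# total cache per vnode = cache * blocks = 16 MB * 6 = 96 MB
```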
The `last_row` function can be used to retrieve the last row of a table or an STable, to quickly show the current state of devices on a monitoring screen. For example, the SQL statement below retrieves the latest voltage of all meters in the Chaoyang district of Beijing.
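A sketch of such a query, assuming the `location` tag used in the earlier meters examples:

```sql
select last_row(voltage) from meters where location = 'Beijing.Chaoyang';
```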