Commit 82d3550a authored by: J jiajingbin

Merge branch 'main' of https://github.com/taosdata/TDengine into test/td_25718

......@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.1.0.2.alpha")
SET(TD_VER_NUMBER "3.1.0.3.alpha")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
......
......@@ -40,7 +40,6 @@ meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0
- Multiple kinds of precision can be used for the `timestamp` field. Time precision can be from nanosecond (ns) to hour (h)
- The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be created automatically. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored
- It is assumed that the order of field_set in a supertable is consistent, meaning that the first record contains all fields and subsequent records store fields in the same order. If the order is not consistent, set smlDataFormat in taos.cfg to false. Otherwise, data will be written out of order and a database error will occur. (smlDataFormat defaults to false in taos.cfg in versions after 3.0.1.3, and the option is deprecated since 3.0.3.0.)
- Because SQL table names do not support the dot character (.), schemaless writing also replaces dots (.) in table names with underscores (_). A minimal C sketch of a line-protocol write follows these notes.
:::
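Purely as an illustration (not part of the original text), a schemaless line-protocol write through the C connector could look like the sketch below; the connection handle `taos`, the timestamp value, and the microsecond precision are assumptions for this example.
```c
// Illustrative sketch only: write one row of InfluxDB line protocol data.
static void sml_line_demo(TAOS *taos) {
  char *lines[] = {
      "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611249500"
  };
  TAOS_RES *res = taos_schemaless_insert(taos, lines, 1, TSDB_SML_LINE_PROTOCOL,
                                         TSDB_SML_TIMESTAMP_MICRO_SECONDS);
  if (taos_errno(res) != 0) {
    fprintf(stderr, "schemaless insert failed: %s\n", taos_errstr(res));
  }
  taos_free_result(res);
}
```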
......
......@@ -34,7 +34,6 @@ meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3
```
- The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored.
- Because SQL table names do not support the dot character (.), schemaless writing also replaces dots (.) in table names with underscores (_).
Please refer to [OpenTSDB Telnet API](http://opentsdb.net/docs/build/html/api_telnet/put.html) for more details.
......
......@@ -49,7 +49,6 @@ Please refer to [OpenTSDB HTTP API](http://opentsdb.net/docs/build/html/api_http
- In JSON protocol, strings will be converted to NCHAR type and numeric values will be converted to double type.
- The child table name is created automatically in a rule to guarantee its uniqueness. But you can configure `smlChildTableName` in taos.cfg to specify a tag value as the table names if the tag value is unique globally. For example, if a tag is called `tname` and you set `smlChildTableName=tname` in taos.cfg, when you insert `st,tname=cpu1,t1=4 c1=3 1626006833639000000`, the child table `cpu1` will be automatically created. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row is used to create the table and the others are ignored.
- Because SQL table names do not support the dot character (.), schemaless writing also replaces dots (.) in table names with underscores (_). A minimal C sketch of a JSON write follows these notes.
:::
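As an illustration only, a minimal C sketch of writing one OpenTSDB-JSON record through the schemaless API might look as follows; the metric, tag values, timestamp, and the connection handle `taos` are assumptions.
```c
// Illustrative sketch only: write one OpenTSDB-JSON record (millisecond timestamps).
static void sml_json_demo(TAOS *taos) {
  char *payload[] = {
      "[{\"metric\":\"meters.current\",\"timestamp\":1648432611249,\"value\":10.3,"
      "\"tags\":{\"location\":\"California.SanFrancisco\",\"groupid\":2,\"tname\":\"cpu1\"}}]"
  };
  TAOS_RES *res = taos_schemaless_insert(taos, payload, 1, TSDB_SML_JSON_PROTOCOL,
                                         TSDB_SML_TIMESTAMP_MILLI_SECONDS);
  if (taos_errno(res) != 0) {
    fprintf(stderr, "schemaless insert failed: %s\n", taos_errstr(res));
  }
  taos_free_result(res);
}
```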
......
......@@ -7,7 +7,7 @@ description: This document describes how to query data in TDengine.
## Syntax
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE() | CURRENT_USER() | USER() }
SELECT [DISTINCT] select_list
from_clause
......@@ -167,7 +167,7 @@ The following SQL statement returns the number of subtables within the meters su
SELECT COUNT(*) FROM (SELECT DISTINCT TBNAME FROM meters);
```
In the preceding two statements, only tags can be used as filtering conditions in the WHERE clause. For example:
In the preceding two statements, only tags can be used as filtering conditions in the WHERE clause.
**\_QSTART and \_QEND**
......@@ -209,8 +209,7 @@ You can perform INNER JOIN statements based on the primary key. The following co
3. For supertables, in addition to the equality condition on the timestamp primary key, the ON condition must also include equality conditions on tag columns that have a one-to-one correspondence. You cannot specify an OR condition.
4. The tables that are included in a JOIN clause must be of the same type (supertable, standard table, or subtable).
5. You can include subqueries before and after the JOIN keyword.
6. You cannot include more than ten tables in a JOIN clause.
7. You cannot include a FILL clause and a JOIN clause in the same statement.
6. You cannot include a FILL clause and a JOIN clause in the same statement.
## GROUP BY
......@@ -301,6 +300,12 @@ SELECT TODAY();
SELECT TIMEZONE();
```
### Obtain Current User
```sql
SELECT CURRENT_USER();
```
## Regular Expression
### Syntax
......@@ -355,7 +360,7 @@ SELECT AVG(CASE WHEN voltage < 200 or voltage > 250 THEN 220 ELSE voltage END) F
## JOIN
TDengine supports `INNER JOIN` based on the timestamp primary key, that is, the `JOIN` condition must contain the timestamp primary key. As long as the requirement of a timestamp-based primary key is met, `INNER JOIN` can be performed freely between normal tables, subtables, supertables and subqueries, and there is no limit on the number of tables.
TDengine supports `INNER JOIN` based on the timestamp primary key, that is, the `JOIN` condition must contain the timestamp primary key. As long as the requirement of a timestamp-based primary key is met, `INNER JOIN` can be performed freely between normal tables, subtables, supertables and subqueries, and there is no limit on the number of tables; the primary key condition and any other conditions must be combined with the `AND` operator.
For standard tables:
......
......@@ -1275,6 +1275,14 @@ SELECT SERVER_STATUS();
**Description**: The server status.
### CURRENT_USER
```sql
SELECT CURRENT_USER();
```
**Description**: Returns the current user.
## Geometry Functions
......
......@@ -22,6 +22,14 @@ SHOW CLUSTER;
Shows information about the current cluster.
## SHOW CLUSTER ALIVE
```sql
SHOW CLUSTER ALIVE;
```
Checks whether the cluster is available. Return values: 0: unavailable; 1: fully available; 2: partially available (some dnodes are offline but the other dnodes are still available).
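As a rough illustration (not part of the original text), the status code can also be read from a client program; the sketch below assumes a C connection handle `taos` and that the single result column is a 32-bit integer.
```c
// Illustrative sketch only: read the status code returned by SHOW CLUSTER ALIVE.
static void check_cluster_alive(TAOS *taos) {
  TAOS_RES *res = taos_query(taos, "SHOW CLUSTER ALIVE;");
  if (taos_errno(res) == 0) {
    TAOS_ROW row = taos_fetch_row(res);
    if (row != NULL && row[0] != NULL) {
      // 0: unavailable, 1: available, 2: partially available
      printf("cluster alive status: %d\n", *(int32_t *)row[0]);
    }
  }
  taos_free_result(res);
}
```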
## SHOW CONNECTIONS
```sql
......
......@@ -135,7 +135,7 @@ The following describes the basic API, synchronous API, asynchronous API, subscr
The base API is used to do things like create database connections and provide a runtime environment for the execution of other APIs.
- `void taos_init()`
- `int taos_init()`
Initializes the runtime environment. If the API is not actively called, the driver will automatically call the API when `taos_connect()` is called, so the program generally does not need to call it manually.
......@@ -168,6 +168,12 @@ The base API is used to do things like create database connections and provide a
:::
- `TAOS *taos_connect_auth(const char *host, const char *user, const char *auth, const char *db, uint16_t port)`
The function is the same as taos_connect, except that the pass parameter is replaced by auth; the other parameters are the same as in taos_connect.
- auth: the 32-character lowercase hex MD5 digest of the raw password
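A minimal sketch of connecting with a pre-computed digest might look like the following; the host, port, and digest string are placeholders for illustration, not real credentials.
```c
// Illustrative sketch only: connect with the MD5 digest of the password.
static TAOS *connect_with_auth(void) {
  // Placeholder digest; replace with the 32-character lowercase hex MD5 of the real password.
  const char *auth = "d41d8cd98f00b204e9800998ecf8427e";
  TAOS *taos = taos_connect_auth("localhost", "root", auth, NULL, 6030);
  if (taos == NULL) {
    fprintf(stderr, "failed to connect: %s\n", taos_errstr(NULL));
  }
  return taos;
}
```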
- `char *taos_get_server_info(TAOS *taos)`
Get server-side version information.
......@@ -184,6 +190,14 @@ The base API is used to do things like create database connections and provide a
- If len is less than the space required to store the db name (including the terminating '\0'), an error is returned and the truncated, '\0'-terminated name is written into database.
- If len is greater than or equal to the required space (including the terminating '\0'), 0 is returned and the '\0'-terminated db name is written into database.
- `int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)`
Set the event callback function.
- fp: event callback function pointer. Declaration: typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type); where param is a user-defined parameter, ext is an extended parameter (depends on the event type; for TAOS_NOTIFY_PASSVER it carries the user password version), and type is the event type
- param: user-defined parameter
- type: event type. Value range: 1) TAOS_NOTIFY_PASSVER: the user password has changed
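A minimal sketch of registering a password-change notification is shown below; the assumption that `ext` points to a 32-bit integer version number follows the description above and is illustrative only.
```c
// Illustrative sketch only: get notified when the user's password version changes.
static void on_notify(void *param, void *ext, int type) {
  (void)param;
  if (type == TAOS_NOTIFY_PASSVER && ext != NULL) {
    printf("password version changed to %d\n", *(int32_t *)ext);  // payload assumed to be int32_t
  }
}

static void register_notify(TAOS *taos) {
  if (taos_set_notify_cb(taos, on_notify, NULL, TAOS_NOTIFY_PASSVER) != 0) {
    fprintf(stderr, "failed to register notify callback\n");
  }
}
```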
- `void taos_close(TAOS *taos)`
Closes the connection, where `taos` is the handle returned by `taos_connect()`.
......@@ -307,21 +321,20 @@ The specific functions related to the interface are as follows (see also the [pr
Parse a SQL command, and bind the parsed result and parameter information to `stmt`. If the parameter length is greater than 0, use this parameter as the length of the SQL command. If it is equal to 0, the length of the SQL command will be determined automatically.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)`
Not as efficient as `taos_stmt_bind_param_batch()`, but can support non-INSERT type SQL statements.
To bind parameters, bind points to an array (representing the row of data to be bound); make sure that the number and order of the elements in this array are the same as the parameters in the SQL statement. TAOS_MULTI_BIND is used similarly to MYSQL_BIND in MySQL, as defined below.
```c
typedef struct TAOS_BIND {
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length; // not in use
uintptr_t * length;
int * is_null;
int is_unsigned; // not in use
int * error; // not in use
} TAOS_BIND;
void *buffer;
uintptr_t buffer_length;
uint32_t *length;
char *is_null;
int num;
} TAOS_MULTI_BIND;
```
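Purely as an illustration (the table `m1`, its columns, and the INT value are assumptions), binding a single parameter of a non-INSERT statement with this structure could look like the following sketch.
```c
// Illustrative sketch only: bind one INT parameter of a SELECT statement.
static void query_with_bind(TAOS *taos) {
  TAOS_STMT *stmt = taos_stmt_init(taos);
  taos_stmt_prepare(stmt, "SELECT ts, c1 FROM m1 WHERE c1 = ?", 0);

  int32_t val     = 5;
  int32_t val_len = sizeof(val);
  char    is_null = 0;

  TAOS_MULTI_BIND p = {0};
  p.buffer_type   = TSDB_DATA_TYPE_INT;
  p.buffer        = &val;
  p.buffer_length = sizeof(val);
  p.length        = &val_len;      // pointer type as declared in taos.h
  p.is_null       = &is_null;
  p.num           = 1;             // one row of parameters

  taos_stmt_bind_param(stmt, &p);  // one array element per `?` in the statement
  taos_stmt_execute(stmt);

  TAOS_RES *res = taos_stmt_use_result(stmt);
  // ... consume `res` here ...
  taos_free_result(res);
  taos_stmt_close(stmt);
}
```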
- `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)`
......@@ -329,7 +342,7 @@ The specific functions related to the interface are as follows (see also the [pr
(Available in 2.1.1.0 and later versions, only supported for replacing parameter values in INSERT statements)
When the table name in the SQL command uses the `?` placeholder, you can use this function to bind a specific table name.
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags)`
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)`
(Available in 2.1.2.0 and later versions, only supported for replacing parameter values in INSERT statements)
When the table name and the TAGS in the SQL command both use the `?` placeholder, you can use this function to bind a specific table name and specific TAGS values. The most typical usage scenario is an INSERT statement that uses the auto-create-table feature (the current version does not support specifying individual TAGS columns). The number of columns in the TAGS parameter must be the same as the number of TAGS required by the SQL command.
......@@ -358,6 +371,14 @@ The specific functions related to the interface are as follows (see also the [pr
Execute the prepared statement. Currently, a statement can only be executed once.
- `int taos_stmt_affected_rows(TAOS_STMT *stmt)`
Gets the number of rows affected by executing bind statements multiple times.
- `int taos_stmt_affected_rows_once(TAOS_STMT *stmt)`
Gets the number of rows affected by executing a bind statement once.
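As a rough illustration of where these two calls fit (assuming `stmt` has been prepared and its parameters bound and batched as described above):
```c
// Illustrative sketch only: report affected rows after executing a bound INSERT.
static void report_affected(TAOS_STMT *stmt) {
  if (taos_stmt_execute(stmt) != 0) {
    fprintf(stderr, "execute failed: %s\n", taos_stmt_errstr(stmt));
    return;
  }
  printf("rows affected by this execution: %d\n", taos_stmt_affected_rows_once(stmt));
  printf("rows affected by all executions: %d\n", taos_stmt_affected_rows(stmt));
}
```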
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Gets the result set of a statement. Use the result set in the same way as in the non-parametric call. When finished, `taos_free_result()` should be called on this result set to free resources.
......@@ -514,4 +535,4 @@ In addition to writing data using the SQL method or the parameter binding API, w
- topics: the list of topics the consumer has subscribed to; it must be freed by tmq_list_destroy
**Return value**
- 0 on success, non-zero on failure; the error message can be obtained through `char *tmq_err2str(int32_t code)`
\ No newline at end of file
- 0 on success, non-zero on failure; the error message can be obtained through `char *tmq_err2str(int32_t code)`
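An illustrative sketch of calling it from C (the consumer handle `tmq` is assumed to exist already):
```c
// Illustrative sketch only: list the topics the consumer is currently subscribed to.
static void show_subscription(tmq_t *tmq) {
  tmq_list_t *topics = NULL;
  int32_t     code   = tmq_subscription(tmq, &topics);
  if (code != 0) {
    fprintf(stderr, "tmq_subscription failed: %s\n", tmq_err2str(code));
    return;
  }
  int32_t n   = tmq_list_get_size(topics);
  char  **arr = tmq_list_to_c_array(topics);
  for (int32_t i = 0; i < n; i++) {
    printf("subscribed topic: %s\n", arr[i]);
  }
  tmq_list_destroy(topics);  // the list must be freed by the caller
}
```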
......@@ -74,7 +74,7 @@ grafana-cli plugins install tdengine-datasource
sudo -u grafana grafana-cli plugins install tdengine-datasource
```
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/tag/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
You can also download zip files from [GitHub](https://github.com/taosdata/grafanaplugin/releases/latest) or [Grafana](https://grafana.com/grafana/plugins/tdengine-datasource/?tab=installation) and install manually. The commands are as follows:
```bash
GF_VERSION=3.3.1
......
......@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://t
import Release from "/components/ReleaseV3";
## 3.1.0.2
<Release type="tdengine" version="3.1.0.2" />
## 3.1.0.0
:::note IMPORTANT
......
......@@ -51,7 +51,7 @@ void insertData(TAOS *taos) {
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
// bind table name and tags
TAOS_BIND tags[2];
TAOS_MULTI_BIND tags[2];
char *location = "California.SanFrancisco";
int groupId = 2;
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
......@@ -144,4 +144,4 @@ int main() {
}
// output:
// successfully inserted 2 rows
\ No newline at end of file
// successfully inserted 2 rows
......@@ -58,7 +58,7 @@ void insertData(TAOS *taos) {
int code = taos_stmt_prepare(stmt, sql, 0);
checkErrorCode(stmt, code, "failed to execute taos_stmt_prepare");
// bind table name and tags
TAOS_BIND tags[2];
TAOS_MULTI_BIND tags[2];
char* location = "California.SanFrancisco";
int groupId = 2;
tags[0].buffer_type = TSDB_DATA_TYPE_BINARY;
......@@ -82,7 +82,7 @@ void insertData(TAOS *taos) {
{1648432611749, 12.6, 218, 0.33},
};
TAOS_BIND values[4];
TAOS_MULTI_BIND values[4];
values[0].buffer_type = TSDB_DATA_TYPE_TIMESTAMP;
values[0].buffer_length = sizeof(int64_t);
values[0].length = &values[0].buffer_length;
......@@ -138,4 +138,4 @@ int main() {
// output:
// successfully inserted 2 rows
\ No newline at end of file
// successfully inserted 2 rows
......@@ -39,8 +39,8 @@ meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0
- Multiple timestamp precisions are supported. The time precision must be specified with a parameter when writing data; six precisions from hour down to nanosecond are supported
- To improve write efficiency, it is assumed by default that the field_set order within the same supertable is consistent (the first record contains all fields and subsequent records follow the same order). If the order differs, set the smlDataFormat parameter to false; otherwise data is written assuming the same order and the data in the database will be corrupted. (smlDataFormat defaults to false in versions after 3.0.1.3, and the option is deprecated since 3.0.3.0.) [TDengine Schemaless Writing Reference Guide](/reference/schemaless/#无模式写入行协议)
- By default, the generated child table name is a unique ID produced by a rule. You can also configure the smlChildTableName parameter in taos.cfg on the client side to use a tag value as the child table name; the tag value should be globally unique. For example, if a tag is named tname and smlChildTableName=tname is configured, inserting st,tname=cpu1,t1=4 c1=3 1626006833639000000 creates the child table cpu1. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row that auto-creates the table is used and those of the other rows are ignored. [TDengine Schemaless Writing Reference Guide](/reference/schemaless/#无模式写入行协议)
- Because SQL table names do not support the dot character (.), schemaless writing also replaces dots (.) with underscores (_)
::
:::
For more information, see the [InfluxDB Line Protocol official documentation](https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/) and the [TDengine Schemaless Writing Reference Guide](/reference/schemaless/#无模式写入行协议)
......
......@@ -33,7 +33,6 @@ meters.current 1648432611250 11.3 location=California.LosAngeles groupid=3
```
- By default, the generated child table name is a unique ID produced by a rule. You can also configure the smlChildTableName parameter in taos.cfg on the client side to use a tag value as the child table name; the tag value should be globally unique. For example, if a tag is named tname and smlChildTableName=tname is configured, inserting meters.current 1648432611250 11.3 tname=cpu1 location=California.LosAngeles groupid=3 creates the table cpu1. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row that auto-creates the table is used and those of the other rows are ignored.
- Because SQL table names do not support the dot character (.), schemaless writing also replaces dots (.) with underscores (_).
See the [OpenTSDB Telnet API documentation](http://opentsdb.net/docs/build/html/api_telnet/put.html) for details.
## Sample Code
......
......@@ -48,7 +48,7 @@ OpenTSDB JSON 格式协议采用一个 JSON 字符串表示一行或多行数据
- For the JSON protocol, TDengine does not automatically convert all tags to NCHAR; strings are converted to NCHAR and numeric values are converted to DOUBLE.
- By default, the generated child table name is a unique ID produced by a rule. You can also configure the smlChildTableName parameter in taos.cfg on the client side to use a tag value as the child table name; the tag value should be globally unique. For example, if a tag is named tname and smlChildTableName=tname is configured, inserting `"tags": { "host": "web02","dc": "lga","tname":"cpu1"}` creates the child table cpu1. Note that if multiple rows have the same tname but different tag_set values, the tag_set of the first row that auto-creates the table is used and those of the other rows are ignored.
- Because SQL table names do not support the dot character (.), schemaless writing also replaces dots (.) with underscores (_).
:::
## Sample Code
......
......@@ -223,7 +223,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
The base API is used to perform tasks such as creating database connections and provides the runtime environment for the execution of the other APIs.
- `void taos_init()`
- `int taos_init()`
Initializes the runtime environment. If this API is not called explicitly, the driver calls it automatically when `taos_connect()` is invoked, so programs generally do not need to call it manually.
......@@ -256,6 +256,12 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
:::
- `TAOS *taos_connect_auth(const char *host, const char *user, const char *auth, const char *db, uint16_t port)`
Same as taos_connect, except that the pass parameter is replaced by auth; the other parameters are the same as in taos_connect.
- auth: the 32-character lowercase hex MD5 digest of the raw password
- `char *taos_get_server_info(TAOS *taos)`
Gets server-side version information.
......@@ -272,6 +278,14 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
- If len is less than the space required to store the db name (including the terminating '\0'), an error is returned and the truncated, '\0'-terminated name is written into database.
- If len is greater than or equal to the required space (including the terminating '\0'), 0 is returned and the '\0'-terminated db name is written into database.
- `int taos_set_notify_cb(TAOS *taos, __taos_notify_fn_t fp, void *param, int type)`
Sets the event callback function.
- fp: event callback function pointer. Declaration: typedef void (*__taos_notify_fn_t)(void *param, void *ext, int type); where param is a user-defined parameter, ext is an extended parameter (depends on the event type; for TAOS_NOTIFY_PASSVER it carries the user password version), and type is the event type
- param: user-defined parameter
- type: event type. Value range: 1) TAOS_NOTIFY_PASSVER: the user password has changed
- `void taos_close(TAOS *taos)`
Closes the connection, where `taos` is the handle returned by `taos_connect()`.
......@@ -396,21 +410,20 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
Parses an SQL statement and binds the parsing result and parameter information to stmt. If the length parameter is greater than 0, it is used as the length of the SQL statement; if it is 0, the length of the SQL statement is determined automatically.
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind)`
- `int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind)`
Not as efficient as `taos_stmt_bind_param_batch()`, but it supports non-INSERT SQL statements.
Binds parameters. bind points to an array (representing one row of data to bind); the number and order of the elements in this array must exactly match the parameters in the SQL statement. TAOS_BIND is used similarly to MYSQL_BIND in MySQL and is defined as follows:
Binds parameters. bind points to an array (representing one row of data to bind); the number and order of the elements in this array must exactly match the parameters in the SQL statement. TAOS_MULTI_BIND is used similarly to MYSQL_BIND in MySQL and is defined as follows:
```c
typedef struct TAOS_BIND {
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length; // not in use
uintptr_t * length;
int * is_null;
int is_unsigned; // not in use
int * error; // not in use
} TAOS_BIND;
void *buffer;
uintptr_t buffer_length;
uint32_t *length;
char *is_null;
int num; // the number of columns
} TAOS_MULTI_BIND;
```
- `int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name)`
......@@ -418,7 +431,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
(Added in version 2.1.1.0; only supported for replacing parameter values in INSERT statements)
When the table name in the SQL statement uses the `?` placeholder, this function can be used to bind a specific table name.
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags)`
- `int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_MULTI_BIND* tags)`
(Added in version 2.1.2.0; only supported for replacing parameter values in INSERT statements)
When both the table name and the TAGS in the SQL statement use `?` placeholders, this function can be used to bind a specific table name and specific TAGS values. The most typical use case is an INSERT statement that uses the auto-create-table feature (the current version does not support specifying individual TAGS columns). The number of columns in the TAGS parameter must exactly match the number of TAGS required by the SQL statement.
......@@ -428,17 +441,6 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
(Added in version 2.1.1.0; only supported for replacing parameter values in INSERT statements)
Passes the data to bind in column-wise form. The order and number of the data columns passed here must exactly match the VALUES parameters in the SQL statement. TAOS_MULTI_BIND is defined as follows:
```c
typedef struct TAOS_MULTI_BIND {
int buffer_type;
void * buffer;
uintptr_t buffer_length;
uintptr_t * length;
char * is_null;
int num; // the number of columns
} TAOS_MULTI_BIND;
```
- `int taos_stmt_add_batch(TAOS_STMT *stmt)`
Adds the currently bound parameters to the batch. After calling this function, `taos_stmt_bind_param()` or `taos_stmt_bind_param_batch()` can be called again to bind new parameters. Note that this function only supports INSERT/IMPORT statements; for other SQL statements such as SELECT, an error is returned.
......@@ -447,6 +449,14 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
Executes the prepared statement. Currently, a statement can only be executed once.
- `int taos_stmt_affected_rows(TAOS_STMT *stmt)`
Gets the number of rows affected by executing the bound statement multiple times.
- `int taos_stmt_affected_rows_once(TAOS_STMT *stmt)`
Gets the number of rows affected by executing the bound statement once.
- `TAOS_RES* taos_stmt_use_result(TAOS_STMT *stmt)`
Gets the result set of the statement. The result set is used in the same way as in a non-parameterized call; when finished, `taos_free_result()` should be called on it to release resources.
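An illustrative sketch of consuming such a result set (assuming `stmt` has been executed successfully):
```c
// Illustrative sketch only: iterate over the result set of a bound SELECT statement.
static void print_stmt_result(TAOS_STMT *stmt) {
  TAOS_RES   *res        = taos_stmt_use_result(stmt);
  int         num_fields = taos_field_count(res);
  TAOS_FIELD *fields     = taos_fetch_fields(res);
  TAOS_ROW    row;
  char        line[1024];
  while ((row = taos_fetch_row(res)) != NULL) {
    taos_print_row(line, row, fields, num_fields);
    printf("%s\n", line);
  }
  taos_free_result(res);  // release the result set when done
}
```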
......@@ -602,4 +612,4 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
- topics: the retrieved topic list is stored in this structure; the memory is allocated inside the API and must be released by calling tmq_list_destroy
**Return value**
- Error code: 0 on success, non-zero on failure; the error message can be obtained through `char *tmq_err2str(int32_t code)`
\ No newline at end of file
- Error code: 0 on success, non-zero on failure; the error message can be obtained through `char *tmq_err2str(int32_t code)`
......@@ -7,7 +7,7 @@ description: 查询数据的详细语法
## Query Syntax
```sql
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE()}
SELECT {DATABASE() | CLIENT_VERSION() | SERVER_VERSION() | SERVER_STATUS() | NOW() | TODAY() | TIMEZONE() | CURRENT_USER() | USER() }
SELECT [DISTINCT] select_list
from_clause
......@@ -167,7 +167,7 @@ SELECT table_name, tag_name, tag_type, tag_value FROM information_schema.ins_tag
SELECT COUNT(*) FROM (SELECT DISTINCT TBNAME FROM meters);
```
Both of the preceding queries only support adding filter conditions on tags (TAGS) in the WHERE clause. For example:
Both of the preceding queries only support adding filter conditions on tags (TAGS) in the WHERE clause.
**\_QSTART/\_QEND**
......@@ -209,8 +209,7 @@ TDengine 支持基于时间戳主键的 INNER JOIN,规则如下:
3. For supertables, in addition to the equality condition on the timestamp primary key, the ON condition must also include equality conditions on tag columns that have a one-to-one correspondence; OR conditions are not supported.
4. The tables participating in a JOIN must all be of the same type, i.e. all supertables, all subtables, or all standard tables.
5. Subqueries are supported on both sides of a JOIN.
6. At most 10 tables can participate in a JOIN.
7. A JOIN cannot be combined with a FILL clause.
6. A JOIN cannot be combined with a FILL clause.
## GROUP BY
......@@ -301,6 +300,12 @@ SELECT TODAY();
SELECT TIMEZONE();
```
### Obtain Current User
```sql
SELECT CURRENT_USER();
```
## Regular Expression Filtering
### Syntax
......@@ -354,7 +359,7 @@ SELECT AVG(CASE WHEN voltage < 200 or voltage > 250 THEN 220 ELSE voltage END) F
## JOIN Clause
TDengine supports INNER JOIN based on the timestamp primary key, that is, the JOIN condition must contain the timestamp primary key. As long as this requirement is met, INNER JOIN can be performed freely among standard tables, subtables, supertables, and subqueries, with no limit on the number of tables.
TDengine supports INNER JOIN based on the timestamp primary key, that is, the JOIN condition must contain the timestamp primary key. As long as this requirement is met, INNER JOIN can be performed freely among standard tables, subtables, supertables, and subqueries, with no limit on the number of tables; any other join conditions must be combined with the primary key condition using the AND operator.
JOIN between standard tables:
......
......@@ -1266,6 +1266,14 @@ SELECT SERVER_STATUS();
**Description**: Checks whether all dnodes on the server side are online. If so, success is returned; otherwise an error indicating that a connection cannot be established is returned.
### CURRENT_USER
```sql
SELECT CURRENT_USER();
```
**Description**: Returns the current user.
## Geometry Functions
......
......@@ -31,7 +31,7 @@ select max(current) from meters partition by location interval(10m)
## Windowed Queries
TDengine supports aggregate queries over time windows. For example, a temperature sensor may collect data every second while the average temperature over every 10 minutes is needed; in such scenarios a window clause can be used to obtain the desired result. The window clause partitions the queried data set into subsets by window and aggregates each subset. There are four kinds of windows: time window, status window, session window, and event window; time windows can be further divided into sliding time windows and tumbling time windows.
TDengine supports aggregate queries over time windows. For example, a temperature sensor may collect data every second while the average temperature over every 10 minutes is needed; in such scenarios a window clause can be used to obtain the desired result. The window clause partitions the queried data set into subsets by window and aggregates each subset. There are four kinds of windows: time window, status window, session window, and event window; time windows can be further divided into sliding time windows and tumbling time windows.
The window clause syntax is as follows:
......
......@@ -22,6 +22,14 @@ SHOW CLUSTER;
Shows information about the current cluster.
## SHOW CLUSTER ALIVE
```sql
SHOW CLUSTER ALIVE;
```
Checks whether the current cluster is available. Return values: 0: unavailable; 1: fully available; 2: partially available (some nodes in the cluster are offline, but the other nodes still work normally).
## SHOW CONNECTIONS
```sql
......
......@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.1.0.2
<Release type="tdengine" version="3.1.0.2" />
## 3.1.0.0
<Release type="tdengine" version="3.1.0.0" />
......
......@@ -32,12 +32,12 @@ static void queryDB(TAOS *taos, char *command) {
taos_free_result(pSql);
pSql = NULL;
}
pSql = taos_query(taos, command);
code = taos_errno(pSql);
if (0 == code) {
break;
}
}
}
if (code != 0) {
......@@ -63,7 +63,7 @@ int main(int argc, char *argv[]) {
TAOS *taos = taos_connect(argv[1], "root", "taosdata", NULL, 0);
if (taos == NULL) {
printf("failed to connect to server, reason:%s\n", "null taos"/*taos_errstr(taos)*/);
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
exit(1);
}
for (int i = 0; i < 100; i++) {
......@@ -86,14 +86,14 @@ void Test(TAOS *taos, char *qstr, int index) {
for (i = 0; i < 10; ++i) {
sprintf(qstr, "insert into m1 values (%" PRId64 ", %d, %d, %d, %d, %f, %lf, '%s')", (uint64_t)(1546300800000 + i * 1000), i, i, i, i*10000000, i*1.0, i*2.0, "hello");
printf("qstr: %s\n", qstr);
// note: how do you wanna do if taos_query returns non-NULL
// if (taos_query(taos, qstr)) {
// printf("insert row: %i, reason:%s\n", i, taos_errstr(taos));
// }
TAOS_RES *result1 = taos_query(taos, qstr);
if (result1 == NULL || taos_errno(result1) != 0) {
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
printf("failed to insert row, reason:%s\n", taos_errstr(result1));
taos_free_result(result1);
exit(1);
} else {
......@@ -107,7 +107,7 @@ void Test(TAOS *taos, char *qstr, int index) {
sprintf(qstr, "SELECT * FROM m1");
result = taos_query(taos, qstr);
if (result == NULL || taos_errno(result) != 0) {
printf("failed to select, reason:%s\n", taos_errstr(result));
printf("failed to select, reason:%s\n", taos_errstr(result));
taos_free_result(result);
exit(1);
}
......
......@@ -59,7 +59,7 @@ typedef struct SDataSinkMgtCfg {
uint32_t maxDataBlockNumPerQuery;
} SDataSinkMgtCfg;
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI);
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager);
typedef struct SInputData {
const struct SSDataBlock* pData;
......@@ -83,7 +83,7 @@ typedef struct SOutputData {
* @param pHandle output
* @return error code
*/
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id);
int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat);
......
......@@ -189,7 +189,7 @@ int32_t streamInit();
void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* queue);
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
......
......@@ -935,7 +935,7 @@ function updateProduct() {
fi
echo
echo -e "\033[44;32;1m${productName2} is updated successfully!${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation and explorer features, you need to install ${clientName2}Explorer ${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation or explorer features, please install ${clientName2}Explorer ${NC}"
else
install_bin
install_config
......@@ -1028,7 +1028,7 @@ function installProduct() {
fi
echo -e "\033[44;32;1m${productName2} is installed successfully!${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation and explorer features, you need to install ${clientName2}Explorer ${NC}"
echo -e "\033[44;32;1mTo manage ${productName2} instance, view documentation or explorer features, please install ${clientName2}Explorer ${NC}"
echo
else # Only install client
install_bin
......
......@@ -687,14 +687,14 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
pReq.pColumns = taosArrayInit(req.schemaRow.nCols, sizeof(SField));
for (int32_t i = 0; i < req.schemaRow.nCols; i++) {
SSchema* pSchema = req.schemaRow.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
SField field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pColumns, &field);
}
pReq.pTags = taosArrayInit(req.schemaTag.nCols, sizeof(SField));
for (int32_t i = 0; i < req.schemaTag.nCols; i++) {
SSchema* pSchema = req.schemaTag.pSchema + i;
SField field = {.type = pSchema->type, .bytes = pSchema->bytes};
SField field = {.type = pSchema->type, .flags = pSchema->flags, .bytes = pSchema->bytes};
strcpy(field.name, pSchema->name);
taosArrayPush(pReq.pTags, &field);
}
......
......@@ -985,6 +985,10 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) {
int32_t tmq_unsubscribe(tmq_t* tmq) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if (tmq->status != TMQ_CONSUMER_STATUS__READY) {
tscInfo("consumer:0x%" PRIx64 " not in ready state, unsubscribe it directly", tmq->consumerId);
return 0;
}
if (tmq->autoCommit) {
int32_t rsp = tmq_commit_sync(tmq, NULL);
if (rsp != 0) {
......
......@@ -114,9 +114,10 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
const STraceId *trace = &pMsg->info.traceId;
dGTrace("vgId:%d, msg:%p get from vnode-fetch queue", pVnode->vgId, pMsg);
terrno = 0;
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo);
if (code != 0) {
if (terrno != 0) {
if (code == -1 && terrno != 0) {
code = terrno;
}
......
......@@ -3159,137 +3159,161 @@ static int32_t buildDbColsInfoBlock(const SSDataBlock *p, const SSysTableMeta *p
return numOfRows;
}
#define BUILD_COL_FOR_INFO_DB 1
#define BUILD_COL_FOR_PERF_DB 1 << 1
#define BUILD_COL_FOR_USER_DB 1 << 2
#define BUILD_COL_FOR_ALL_DB (BUILD_COL_FOR_INFO_DB | BUILD_COL_FOR_PERF_DB | BUILD_COL_FOR_USER_DB)
static int32_t buildSysDbColsInfo(SSDataBlock *p, char *db, char *tb) {
static int32_t buildSysDbColsInfo(SSDataBlock *p, int8_t buildWhichDBs, char *tb) {
size_t size = 0;
const SSysTableMeta *pSysDbTableMeta = NULL;
if (db[0] && strncmp(db, TSDB_INFORMATION_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0 &&
strncmp(db, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_DB_FNAME_LEN) != 0) {
return p->info.rows;
if (buildWhichDBs & BUILD_COL_FOR_INFO_DB) {
getInfosDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
}
getInfosDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_INFORMATION_SCHEMA_DB, tb);
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
if (buildWhichDBs & BUILD_COL_FOR_PERF_DB) {
getPerfDbMeta(&pSysDbTableMeta, &size);
p->info.rows = buildDbColsInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB, tb);
}
return p->info.rows;
}
static int8_t determineBuildColForWhichDBs(const char* db) {
int8_t buildWhichDBs;
if (!db[0])
buildWhichDBs = BUILD_COL_FOR_ALL_DB;
else {
char *p = strchr(db, '.');
if (p && strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB) == 0) {
buildWhichDBs = BUILD_COL_FOR_INFO_DB;
} else if (p && strcmp(p + 1, TSDB_PERFORMANCE_SCHEMA_DB) == 0) {
buildWhichDBs = BUILD_COL_FOR_PERF_DB;
} else {
buildWhichDBs = BUILD_COL_FOR_USER_DB;
}
}
return buildWhichDBs;
}
static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
uint8_t buildWhichDBs;
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL;
int32_t numOfRows = 0;
buildWhichDBs = determineBuildColForWhichDBs(pShow->db);
int32_t numOfRows = 0;
if (!pShow->sysDbRsp) {
numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
numOfRows = buildSysDbColsInfo(pBlock, buildWhichDBs, pShow->filterTb);
mDebug("mndRetrieveStbCol get system table cols, rows:%d, db:%s", numOfRows, pShow->db);
pShow->sysDbRsp = true;
}
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL) return terrno;
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) {
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
if (buildWhichDBs & BUILD_COL_FOR_USER_DB) {
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
pDb = mndAcquireDb(pMnode, pShow->db);
if (pDb == NULL && TSDB_CODE_MND_DB_NOT_EXIST != terrno && pBlock->info.rows == 0) return terrno;
}
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
continue;
}
char typeName[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(typeName, "SUPER_TABLE");
bool fetch = pShow->restore ? false : true;
pShow->restore = false;
while (numOfRows < rows) {
if (fetch) {
pShow->pIter = sdbFetch(pSdb, SDB_STB, pShow->pIter, (void **)&pStb);
if (pShow->pIter == NULL) break;
} else {
fetch = true;
void *pKey = taosHashGetKey(pShow->pIter, NULL);
pStb = sdbAcquire(pSdb, SDB_STB, pKey);
if (!pStb) continue;
}
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
if (pDb != NULL && pStb->dbUid != pDb->uid) {
sdbRelease(pSdb, pStb);
continue;
}
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
SName name = {0};
char stbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
mndExtractTbNameFromStbFullName(pStb->name, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN);
if (pShow->filterTb[0] && strncmp(pShow->filterTb, &stbName[VARSTR_HEADER_SIZE], TSDB_TABLE_NAME_LEN) != 0) {
sdbRelease(pSdb, pStb);
continue;
}
sdbRelease(pSdb, pStb);
break;
}
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
if ((numOfRows + pStb->numOfColumns) > rows) {
pShow->restore = true;
if (numOfRows == 0) {
mError("mndRetrieveStbCol failed to get stable cols since buf:%d less than result:%d, stable name:%s, db:%s",
rows, pStb->numOfColumns, pStb->name, pStb->db);
}
sdbRelease(pSdb, pStb);
break;
}
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
varDataSetLen(stbName, strlen(&stbName[VARSTR_HEADER_SIZE]));
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
mDebug("mndRetrieveStbCol get stable cols, stable name:%s, db:%s", pStb->name, pStb->db);
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
char db[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
tNameFromString(&name, pStb->db, T_NAME_ACCT | T_NAME_DB);
tNameGetDbName(&name, varDataVal(db));
varDataSetLen(db, strlen(varDataVal(db)));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
for (int i = 0; i < pStb->numOfColumns; i++) {
int32_t cols = 0;
SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)stbName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, typeName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)db, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, colName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, typeName, false);
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
// col name
char colName[TSDB_COL_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_TO_VARSTR(colName, pStb->pColumns[i].name);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, colName, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
while (cols < pShow->numOfColumns) {
// col type
int8_t colType = pStb->pColumns[i].type;
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
char colTypeStr[VARSTR_HEADER_SIZE + 32];
int colTypeLen = sprintf(varDataVal(colTypeStr), "%s", tDataTypes[colType].name);
if (colType == TSDB_DATA_TYPE_VARCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)(pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE));
} else if (colType == TSDB_DATA_TYPE_NCHAR) {
colTypeLen += sprintf(varDataVal(colTypeStr) + colTypeLen, "(%d)",
(int32_t)((pStb->pColumns[i].bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE));
}
varDataSetLen(colTypeStr, colTypeLen);
colDataSetVal(pColInfo, numOfRows, (char *)colTypeStr, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char *)&pStb->pColumns[i].bytes, false);
while (cols < pShow->numOfColumns) {
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetNULL(pColInfo, numOfRows);
}
numOfRows++;
}
numOfRows++;
}
sdbRelease(pSdb, pStb);
}
sdbRelease(pSdb, pStb);
}
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
}
}
pShow->numOfRows += numOfRows;
......
......@@ -235,36 +235,33 @@ static int32_t tsdbCommitOpenReader(SCommitter2 *committer) {
return 0;
}
ASSERT(TARRAY2_SIZE(committer->ctx->fset->lvlArr) == 1);
SSttLvl *lvl = TARRAY2_FIRST(committer->ctx->fset->lvlArr);
ASSERT(lvl->level == 0);
STFileObj *fobj = NULL;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
SSttFileReader *sttReader;
SSttFileReaderConfig config = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
SSttLvl *lvl;
TARRAY2_FOREACH(committer->ctx->fset->lvlArr, lvl) {
STFileObj *fobj = NULL;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
SSttFileReader *sttReader;
SSttFileReaderConfig config = {
.tsdb = committer->tsdb,
.szPage = committer->szPage,
.file = fobj->f[0],
};
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->sttReaderArray, sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->sttReaderArray, sttReader);
TSDB_CHECK_CODE(code, lino, _exit);
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
STFileOp op = {
.optype = TSDB_FOP_REMOVE,
.fid = fobj->f->fid,
.of = fobj->f[0],
};
code = TARRAY2_APPEND(committer->fopArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND(committer->fopArray, op);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
......
......@@ -86,7 +86,7 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l
// create a file obj
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_APPEND(lvl2->fobjArr, fobj2);
code = TARRAY2_INSERT_PTR(lvl2->fobjArr, i2, &fobj2);
if (code) return code;
i1++;
i2++;
......@@ -112,7 +112,7 @@ static int32_t tsdbSttLvlApplyEdit(STsdb *pTsdb, const SSttLvl *lvl1, SSttLvl *l
// create a file obj
code = tsdbTFileObjInit(pTsdb, fobj1->f, &fobj2);
if (code) return code;
code = TARRAY2_APPEND(lvl2->fobjArr, fobj2);
code = TARRAY2_INSERT_PTR(lvl2->fobjArr, i2, &fobj2);
if (code) return code;
i1++;
i2++;
......
......@@ -224,6 +224,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDeleter->pDataBlocks);
taosThreadMutexDestroy(&pDeleter->mutex);
taosMemoryFree(pDeleter->pManager);
return TSDB_CODE_SUCCESS;
}
......@@ -279,6 +281,8 @@ _end:
if (deleter != NULL) {
destroyDataSinker((SDataSinkHandle*)deleter);
taosMemoryFree(deleter);
} else {
taosMemoryFree(pManager);
}
return code;
}
......@@ -226,6 +226,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
}
taosCloseQueue(pDispatcher->pDataBlocks);
taosThreadMutexDestroy(&pDispatcher->mutex);
taosMemoryFree(pDispatcher->pManager);
return TSDB_CODE_SUCCESS;
}
......@@ -240,7 +241,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
SDataDispatchHandle* dispatcher = taosMemoryCalloc(1, sizeof(SDataDispatchHandle));
if (NULL == dispatcher) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut;
......@@ -257,8 +258,13 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
if (NULL == dispatcher->pDataBlocks) {
taosMemoryFree(dispatcher);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
*pHandle = dispatcher;
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFree(pManager);
return terrno;
}
......@@ -395,6 +395,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
taosMemoryFree(pInserter->pParam);
taosHashCleanup(pInserter->pCols);
taosThreadMutexDestroy(&pInserter->mutex);
taosMemoryFree(pInserter->pManager);
return TSDB_CODE_SUCCESS;
}
......@@ -411,7 +413,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
if (NULL == inserter) {
taosMemoryFree(pParam);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
goto _return;
}
SQueryInserterNode* pInserterNode = (SQueryInserterNode*)pDataSink;
......@@ -431,23 +433,18 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
int64_t suid = 0;
int32_t code = pManager->pAPI->metaFn.getTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
return code;
terrno = code;
goto _return;
}
if (pInserterNode->stableId != suid) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
return terrno;
goto _return;
}
inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES);
taosThreadMutexInit(&inserter->mutex, NULL);
if (NULL == inserter->pDataBlocks) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -471,4 +468,15 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
*pHandle = inserter;
return TSDB_CODE_SUCCESS;
_return:
if (inserter) {
destroyDataSinker((SDataSinkHandle*)inserter);
taosMemoryFree(inserter);
} else {
taosMemoryFree(pManager);
}
return terrno;
}
......@@ -18,12 +18,17 @@
#include "planner.h"
#include "tarray.h"
static SDataSinkManager gDataSinkManager = {0};
SDataSinkStat gDataSinkStat = {0};
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) {
gDataSinkManager.cfg = *cfg;
gDataSinkManager.pAPI = pAPI;
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager));
if (NULL == pSinkManager) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSinkManager->cfg = *cfg;
pSinkManager->pAPI = pAPI;
*ppSinkManager = pSinkManager;
return 0; // to avoid compiler error
}
......@@ -33,18 +38,22 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
return 0;
}
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
SDataSinkManager* pManager = pSinkManager;
switch ((int)nodeType(pDataSink)) {
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
return createDataDispatcher(pManager, pDataSink, pHandle);
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
return createDataDeleter(pManager, pDataSink, pHandle, pParam);
}
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
return createDataInserter(pManager, pDataSink, pHandle, pParam);
}
default:
break;
}
taosMemoryFree(pSinkManager);
qError("invalid input node type:%d, %s", nodeType(pDataSink), id);
return TSDB_CODE_QRY_INVALID_INPUT;
}
......
......@@ -511,23 +511,25 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
goto _error;
}
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
goto _error;
}
if (handle) {
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
void* pSinkManager = NULL;
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
goto _error;
}
void* pSinkParam = NULL;
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, (*pTask), readHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("failed to createDataSinkParam, vgId:%d, code:%s, %s", vgId, tstrerror(code), (*pTask)->id.str);
taosMemoryFree(pSinkManager);
goto _error;
}
// pSinkParam has been freed during create sinker.
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
}
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
......
......@@ -64,6 +64,7 @@ typedef struct SFillOperatorInfo {
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order);
static void destroyFillOperatorInfo(void* param);
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo);
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
SResultInfo* pResultInfo, int32_t order) {
......@@ -84,6 +85,9 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
fillResetPrevForNewGroup(pInfo->pFillInfo);
}
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
......@@ -122,6 +126,15 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
}
static void fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
if (!pFillInfo->pFillCol[colIdx].notFillCol) {
SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
key->isNull = true;
}
}
}
// todo refactor: decide the start key according to the query time range.
static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) {
if (order == TSDB_ORDER_ASC) {
......
......@@ -766,6 +766,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
}
streamFreeQitem(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
return TSDB_CODE_SUCCESS;
}
......
......@@ -35,18 +35,17 @@ FAIL:
return NULL;
}
void streamQueueClose(SStreamQueue* queue) {
while (1) {
void* qItem = streamQueueNextItem(queue);
if (qItem) {
streamFreeQitem(qItem);
} else {
break;
}
void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) {
qDebug("s-task:0x%x free the queue:%p, items in queue:%d", taskId, pQueue->queue, taosQueueItemSize(pQueue->queue));
void* qItem = NULL;
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
streamFreeQitem(qItem);
}
taosFreeQall(queue->qall);
taosCloseQueue(queue->queue);
taosMemoryFree(queue);
taosFreeQall(pQueue->qall);
taosCloseQueue(pQueue->queue);
taosMemoryFree(pQueue);
}
#if 0
......
......@@ -220,11 +220,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
if (pTask->inputQueue) {
streamQueueClose(pTask->inputQueue);
streamQueueClose(pTask->inputQueue, pTask->id.taskId);
}
if (pTask->outputInfo.queue) {
streamQueueClose(pTask->outputInfo.queue);
streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
}
if (pTask->exec.qmsg) {
......@@ -255,6 +255,11 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
}
if (pTask->msgInfo.pData != NULL) {
destroyStreamDataBlock(pTask->msgInfo.pData);
pTask->msgInfo.pData = NULL;
}
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);
}
......
......@@ -760,6 +760,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/projectionDesc.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/odbc.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/fill_with_group.py
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-21561.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
,,n,system-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/insertMix.py -N 3
......
......@@ -215,7 +215,13 @@ class TDTestCase:
for t in range (2):
tdSql.query(f'select * from information_schema.ins_columns where db_name="db2" and table_type=="NORMAL_TABLE"')
tdSql.checkEqual(20470,len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'")
tdSql.checkEqual(195, len(tdSql.queryResult))
tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'")
tdSql.checkEqual(54, len(tdSql.queryResult))
def ins_dnodes_check(self):
tdSql.execute('drop database if exists db2')
tdSql.execute('create database if not exists db2 vgroups 1 replica 1')
......
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 4
self.ctbNum = 10
self.rowsPerTbl = 10000
self.duraion = '1h'
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
tdLog.debug("complete to create database %s"%(dbName))
return
def create_stable(self,tsql, paraDict):
colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
tdLog.debug("%s"%(sqlString))
tsql.execute(sqlString)
return
def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
for i in range(ctbNum):
sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
tsql.execute(sqlString)
tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
return
def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
tdLog.debug("start to insert data ............")
tsql.execute("use %s" %dbName)
pre_insert = "insert into "
sql = pre_insert
for i in range(ctbNum):
rowsBatched = 0
sql += " %s%d values "%(ctbPrefix,i)
for j in range(rowsPerTbl):
if (i < ctbNum/2):
sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
else:
sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
rowsBatched += 1
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
rowsBatched = 0
if j < rowsPerTbl - 1:
sql = "insert into %s%d values " %(ctbPrefix,i)
else:
sql = "insert into "
if sql != pre_insert:
tsql.execute(sql)
tdLog.debug("insert data ............ [OK]")
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'test',
'dropFlag': 1,
'vgroups': 2,
'stbName': 'meters',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
'ctbPrefix': 't',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'batchNum': 3000,
'startTs': 1537146000000,
'tsStep': 600000}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create database")
self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)
tdLog.info("create stb")
self.create_stable(tsql=tdSql, paraDict=paraDict)
tdLog.info("create child tables")
self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
return
def test_partition_by_with_interval_fill_prev_new_group_fill_error(self):
## every table has 1500 rows after fill, 10 tables, total 15000 rows.
## there is no data from 9-17 08:00:00 ~ 9-17 09:00:00, so first 60 rows of every group will be NULL, cause no prev value.
sql = "select _wstart, count(*),tbname from meters where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-18 09:00:00.000' partition by tbname interval(1m) fill(PREV) order by tbname, _wstart"
tdSql.query(sql)
for i in range(0,10):
for j in range(0,60):
tdSql.checkData(i*1500+j, 1, None)
sql = "select _wstart, count(*),tbname from meters where ts > '2018-09-17 08:00:00.000' and ts < '2018-09-18 09:00:00.000' partition by tbname interval(1m) fill(LINEAR) order by tbname, _wstart"
tdSql.query(sql)
for i in range(0,10):
for j in range(0,60):
tdSql.checkData(i*1500+j, 1, None)
def run(self):
self.prepareTestEnv()
self.test_partition_by_with_interval_fill_prev_new_group_fill_error()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())