Commit f0dff1f0 authored by Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

@@ -73,7 +73,7 @@ If `maven` is used to manage the projects, what needs to be done is only adding ...
<dependency>
  <groupId>com.taosdata.jdbc</groupId>
  <artifactId>taos-jdbcdriver</artifactId>
-  <version>2.0.38</version>
+  <version>3.0.0</version>
</dependency>
```
@@ -102,7 +102,7 @@ module goexample
go 1.17
-require github.com/taosdata/driver-go/v2 develop
+require github.com/taosdata/driver-go/v3 latest
```
:::note
@@ -137,7 +137,7 @@ Node.js connector provides different ways of establishing connections by providing different packages.
1. Install Node.js Native Connector
```
-npm i td2.0-connector
+npm install @tdengine/client
```
:::note
@@ -147,7 +147,7 @@ It's recommended to use a Node version between `node-v12.8.0` and `node-v13...
2. Install Node.js REST Connector
```
-npm i td2.0-rest-connector
+npm install @tdengine/rest
```
</TabItem>
@@ -167,7 +167,7 @@ Just need to add the reference to [TDengine.Connector](https://www.nuget.org/pac...
</PropertyGroup>
<ItemGroup>
-  <PackageReference Include="TDengine.Connector" Version="1.0.6" />
+  <PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
@@ -187,7 +187,7 @@ The sample code below is based on dotnet6.0; it may need to be adjusted if your ...
</TabItem>
<TabItem label="R" value="r">
-1. Download [taos-jdbcdriver-version-dist.jar](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/2.0.38/).
+1. Download [taos-jdbcdriver-version-dist.jar](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.0.0/).
2. Install the dependency package `RJDBC`
```R
......
@@ -2,13 +2,18 @@ Execute TDengine CLI program `taos` directly from the Linux shell to connect to ...
```text
$ taos
-Welcome to the TDengine shell from Linux, Client Version:2.0.5.0
-Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
+Welcome to the TDengine shell from Linux, Client Version:3.0.0.0
+Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
+Server is Community Edition.
taos> show databases;
-name | created_time | ntables | vgroups | replica | quorum | days | keep1,keep2,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
-===================================================================================================================================================================================
-test | 2020-10-14 10:35:48.617 | 10 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
-log | 2020-10-12 09:08:21.651 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
-Query OK, 2 row(s) in set (0.001198s)
+name | create_time | vgroups | ntables | replica | strict | duration | keep | buffer | pagesize | pages | minrows | maxrows | comp | precision | status | retention | single_stable | cachemodel | cachesize | wal_level | wal_fsync_period | wal_retention_period | wal_retention_size | wal_roll_period | wal_seg_size |
+===================================================================================================================================================================================
+information_schema | NULL | NULL | 14 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+performance_schema | NULL | NULL | 3 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+db | 2022-08-04 14:14:49.385 | 2 | 4 | 1 | off | 14400m | 5254560m,5254560m,5254560m | 96 | 4 | 256 | 100 | 4096 | 2 | ms | ready | NULL | false | none | 1 | 1 | 3000 | 0 | 0 | 0 | 0 |
+Query OK, 3 rows in database (0.019154s)
taos>
```
Go to the `C:\TDengine` directory from `cmd` and execute TDengine CLI program `taos.exe` directly to connect to the TDengine service and enter the TDengine CLI interface, for example, as follows:
```text
-C:\TDengine>taos
-Welcome to the TDengine shell from Linux, Client Version:2.0.5.0
-Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
-taos> show databases;
-name | created_time | ntables | vgroups | replica | quorum | days | keep1,keep2,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
-===================================================================================================================================================================================
-test | 2020-10-14 10:35:48.617 | 10 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
-log | 2020-10-12 09:08:21.651 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
-Query OK, 2 row(s) in set (0.045000s)
-taos>
+Welcome to the TDengine shell from Windows, Client Version:3.0.0.0
+Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
+Server is Community Edition.
+taos> show databases;
+name | create_time | vgroups | ntables | replica | strict | duration | keep | buffer | pagesize | pages | minrows | maxrows | comp | precision | status | retention | single_stable | cachemodel | cachesize | wal_level | wal_fsync_period | wal_retention_period | wal_retention_size | wal_roll_period | wal_seg_size |
+===================================================================================================================================================================================
+information_schema | NULL | NULL | 14 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+performance_schema | NULL | NULL | 3 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+test | 2022-08-04 16:46:40.506 | 2 | 0 | 1 | off | 14400m | 5256000m,5256000m,5256000m | 96 | 4 | 256 |
+100 | 4096 | 2 | ms | ready | NULL | false | none | 1 | 1 | 3000 | 0 | 0 | 0 | 0 |
+Query OK, 3 rows in database (0.123000s)
+taos>
```
@@ -8,9 +8,9 @@ library("rJava")
library("RJDBC")
args<- commandArgs(trailingOnly = TRUE)
-driver_path = args[1] # path to jdbc-driver for example: "/root/taos-jdbcdriver-2.0.37-dist.jar"
+driver_path = args[1] # path to jdbc-driver for example: "/root/taos-jdbcdriver-3.0.0-dist.jar"
driver = JDBC("com.taosdata.jdbc.TSDBDriver", driver_path)
conn = dbConnect(driver, "jdbc:TAOS://127.0.0.1:6030/?user=root&password=taosdata")
dbGetQuery(conn, "SELECT server_version()")
dbDisconnect(conn)
# ANCHOR_END: demo
\ No newline at end of file
@@ -74,7 +74,7 @@ TDengine provides a rich set of application development interfaces; to help users get started quickly ...
<dependency>
  <groupId>com.taosdata.jdbc</groupId>
  <artifactId>taos-jdbcdriver</artifactId>
-  <version>2.0.38</version>
+  <version>3.0.0</version>
</dependency>
```
@@ -103,7 +103,7 @@ module goexample
go 1.17
-require github.com/taosdata/driver-go/v2 develop
+require github.com/taosdata/driver-go/v3 latest
```
:::note
@@ -138,7 +138,7 @@ The Node.js connector provides different connection methods through different packages.
1. Install the Node.js native connector
```
-npm i td2.0-connector
+npm install @tdengine/client
```
:::note
@@ -148,7 +148,7 @@ The Node.js connector provides different connection methods through different packages.
2. Install the Node.js REST connector
```
-npm i td2.0-rest-connector
+npm install @tdengine/rest
```
</TabItem>
@@ -168,7 +168,7 @@ The Node.js connector provides different connection methods through different packages.
</PropertyGroup>
<ItemGroup>
-  <PackageReference Include="TDengine.Connector" Version="1.0.6" />
+  <PackageReference Include="TDengine.Connector" Version="3.0.0" />
</ItemGroup>
</Project>
@@ -188,7 +188,7 @@ dotnet add package TDengine.Connector
</TabItem>
<TabItem label="R" value="r">
-1. Download [taos-jdbcdriver-version-dist.jar](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/2.0.38/)
+1. Download [taos-jdbcdriver-version-dist.jar](https://repo1.maven.org/maven2/com/taosdata/jdbc/taos-jdbcdriver/3.0.0/)
2. Install the R dependency package `RJDBC`
```R
......
@@ -16,72 +16,96 @@ description: "Supports user-coded aggregate and scalar functions embedded in queries ..."
Users can define their own scalar functions according to the following function template:
-`void udfNormalFunc(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf)`
+`int32_t udf(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
-Here udfNormalFunc is a placeholder for the function name. A function implemented from this template performs scalar computation on a block of rows; its parameters are fixed and are used to exchange data with the engine according to the defined contract.
+Here udf is a placeholder for the function name. A function implemented from this template performs scalar computation on a block of rows.
-- The parameters of udfNormalFunc have the following meanings:
-  - data: the input data.
-  - itype: the type of the input data, encoded as a short integer; the values corresponding to each data type are listed in [the column types of column_meta](/reference/rest-api/). For example, 4 represents INT.
-  - iBytes: the number of bytes occupied by each value in the input data.
-  - numOfRows: the total number of input rows.
-  - ts: the column of primary-key timestamps in the input (read only).
-  - dataOutput: the buffer for the output data; its size is the user-specified output type size \* numOfRows.
-  - interBuf: the buffer for intermediate results, whose size is the BUFSIZE specified when the UDF is created. It is typically used when the intermediate result differs from the final result, and it is allocated and freed by the engine.
-  - tsOutput: the column of primary-key timestamps in the output; if not NULL, it can carry the timestamps corresponding to the output results.
-  - numOfOutput: the number of output results (rows).
-  - oType: the type of the output data, with the same encoding as itype.
-  - oBytes: the number of bytes occupied by each value in the output data.
-  - buf: a block used to pass state-control information between the UDF and the engine.
-[add_one.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c) is the structurally simplest UDF implementation, i.e. a concrete implementation of the udfNormalFunc template above. For every item of the passed-in data column (possibly filtered by a WHERE clause) it outputs the value plus 1, and it requires the input column type to be INT.
+- The parameters of scalarFunction have the following meanings:
+  - inputDataBlock: the input data block
+  - resultColumn: the output column
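To make the 3.0 scalar template concrete, here is a minimal sketch (not part of this commit) of a scalar function that adds 1 to every value of a single INT column. The entry-point name `plus_one` is illustrative only; the helpers it uses (`udfColDataIsNull`, `udfColDataGetData`, `udfColDataSet`, `udfColDataSetNull`) are the taosudf.h convenience functions also used by the `bitwise_and.c` example later in this commit.
```c
#include "taosudf.h"

// Sketch: scalar UDF that returns value + 1 for one INT input column.
DLL_EXPORT int32_t plus_one(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) {
  if (inputDataBlock->numOfCols != 1 ||
      inputDataBlock->udfCols[0]->colMeta.type != TSDB_DATA_TYPE_INT) {
    return TSDB_CODE_UDF_INVALID_INPUT;
  }
  resultColumn->colMeta.type = TSDB_DATA_TYPE_INT;
  resultColumn->colMeta.bytes = 4;
  resultColumn->colData.numOfRows = inputDataBlock->numOfRows;
  for (int32_t i = 0; i < inputDataBlock->numOfRows; ++i) {
    if (udfColDataIsNull(inputDataBlock->udfCols[0], i)) {
      udfColDataSetNull(resultColumn, i);  // NULL in, NULL out
      continue;
    }
    int32_t v = *(int32_t*)udfColDataGetData(inputDataBlock->udfCols[0], i) + 1;
    udfColDataSet(resultColumn, i, (char*)&v, false);
  }
  return TSDB_CODE_SUCCESS;
}
```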
### Aggregate Functions
Users can define their own aggregate functions according to the following function templates.
-`void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf)`
-Here udfMergeFunc is a placeholder for the function name. A function implemented from this template aggregates intermediate results; it is only called for aggregate queries over supertables. Its parameters have the following meanings:
-  - data: the array of output data produced by udfNormalFunc; if interBuf is used, data is the array of interBuf.
-  - numOfRows: the number of rows in data.
-  - dataOutput: the buffer for the output data, whose size equals the size of one final result. If the output at this point is not yet the final result, it can instead be written into interBuf, i.e. into data.
-  - numOfOutput: the number of output results (rows).
-  - buf: a block used to pass state-control information between the UDF and the engine.
-[abs_max.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c) implements an aggregate function that returns the maximum absolute value of a group of values.
-Its computation proceeds as follows: the data relevant to the query is split into multiple row blocks; udfNormalFunc (in this example the actual function name is `abs_max`) is called on each block to produce an intermediate result for each child table; the child-table intermediate results are then aggregated by udfMergeFunc (here `abs_max_merge`) into the final or intermediate result of the supertable; finally the aggregate query calls udfFinalizeFunc (here `abs_max_finalize`) to turn the supertable's intermediate result into the final result, which can only contain 0 or 1 rows of data.
-Other typical scenarios, such as computing covariance, can also be implemented as aggregate UDFs.
-### Final Computation
-Users can implement a function following the template below to perform the final computation on the results; it is typically used when interBuf is involved.
-`void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf)`
-Here udfFinalizeFunc is a placeholder for the function name. Its parameters have the following meanings:
-  - dataOutput: the buffer for the output data.
-  - interBuf: the intermediate-result buffer, which can be used as input.
-  - numOfOutput: the number of output values; for aggregate functions it can only be 0 or 1.
-  - buf: a block used to pass state-control information between the UDF and the engine.
+`int32_t udf_start(SUdfInterBuf *interBuf)`
+`int32_t udf(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)`
+`int32_t udf_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)`
+Here udf is a placeholder for the function name. The parameters have the following meanings:
+  - interBuf: the intermediate-result buffer.
+  - inputBlock: the input data block.
+  - newInterBuf: the new intermediate-result buffer.
+  - result: the final result.
+The computation proceeds as follows: udf_start is called first to produce the result buffer; the relevant data is then split into multiple row blocks, and udf is called on each block to update the intermediate result; finally udf_finish is called to produce the final result from the intermediate result. The final result can only contain 0 or 1 rows of data.
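As a hedged illustration of how the three aggregate entry points cooperate, the sketch below simply counts rows across blocks; the name `row_count` is illustrative only, and a complete aggregate example (`squares_sum`) appears later in this commit.
```c
#include "taosudf.h"

// Sketch: trivial aggregate that counts input rows.
DLL_EXPORT int32_t row_count_start(SUdfInterBuf* interBuf) {
  *(int64_t*)interBuf->buf = 0;  // running count lives in the inter buffer
  interBuf->bufLen = sizeof(int64_t);
  interBuf->numOfResult = 0;
  return 0;
}

DLL_EXPORT int32_t row_count(SUdfDataBlock* inputBlock, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) {
  *(int64_t*)newInterBuf->buf = *(int64_t*)interBuf->buf + inputBlock->numOfRows;
  newInterBuf->bufLen = sizeof(int64_t);
  newInterBuf->numOfResult = 1;
  return 0;
}

DLL_EXPORT int32_t row_count_finish(SUdfInterBuf* interBuf, SUdfInterBuf* result) {
  *(int64_t*)result->buf = *(int64_t*)interBuf->buf;
  result->bufLen = sizeof(int64_t);
  result->numOfResult = 1;  // an aggregate produces 0 or 1 result rows
  return 0;
}
```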
-## Summary of UDF Implementation Rules
-The three kinds of UDF functions, udfNormalFunc, udfMergeFunc and udfFinalizeFunc, must by convention share the same name prefix, which is the actual function name of udfNormalFunc; that is, udfNormalFunc adds no suffix to the actual name, while udfMergeFunc adds the suffix `_merge` and udfFinalizeFunc adds the suffix `_finalize`. This is part of the UDF implementation rules, and the system invokes the corresponding functions based on these name suffixes.
-Depending on the type of UDF, the functions that must be implemented differ:
-- Scalar function: the UDF must implement udfNormalFunc.
-- Aggregate function: the UDF must implement udfNormalFunc, udfMergeFunc (for supertable queries), and udfFinalizeFunc.
+### UDF Initialization and Destruction
+`int32_t udf_init()`
+`int32_t udf_destroy()`
+Here udf is a placeholder for the function name. udf_init performs the initialization work and udf_destroy performs the cleanup work.
:::note
Even if a function does not need to provide any actual functionality, an empty implementation must still be supplied.
:::
### UDF Data Structures
```c
typedef struct SUdfColumnMeta {
int16_t type;
int32_t bytes;
uint8_t precision;
uint8_t scale;
} SUdfColumnMeta;
typedef struct SUdfColumnData {
int32_t numOfRows;
int32_t rowsAlloc;
union {
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
int32_t *varOffsets;
int32_t payloadLen;
char *payload;
int32_t payloadAllocLen;
} varLenCol;
};
} SUdfColumnData;
typedef struct SUdfColumn {
SUdfColumnMeta colMeta;
bool hasNull;
SUdfColumnData colData;
} SUdfColumn;
typedef struct SUdfDataBlock {
int32_t numOfRows;
int32_t numOfCols;
SUdfColumn **udfCols;
} SUdfDataBlock;
typedef struct SUdfInterBuf {
int32_t bufLen;
char* buf;
int8_t numOfResult; //zero or one
} SUdfInterBuf;
```
To make it easier to work with the data structures above, a set of convenience functions is provided; they are defined in taosudf.h.
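For example, instead of touching the `fixLenCol`/`varLenCol` members of `SUdfColumnData` directly, a UDF would normally read values through these helpers (the exact helper set is assumed from taosudf.h, as used by the example files later in this commit):
```c
// Sketch: sum one INT column through the taosudf.h helpers.
static int64_t sum_int_column(SUdfDataBlock* block) {
  int64_t     sum = 0;
  SUdfColumn* col = block->udfCols[0];
  for (int32_t i = 0; i < block->numOfRows; ++i) {
    if (udfColDataIsNull(col, i)) continue;       // respects the null bitmap
    sum += *(int32_t*)udfColDataGetData(col, i);  // fixed-length INT payload
  }
  return sum;
}
```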
## Compiling the UDF
The C source code of a user-defined function cannot be used by TDengine directly; it must first be compiled into a dynamic link library, which can then be loaded into TDengine.
......
This diff is collapsed.
@@ -2,13 +2,18 @@
```text
$ taos
-Welcome to the TDengine shell from Linux, Client Version:2.0.5.0
-Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
+Welcome to the TDengine shell from Linux, Client Version:3.0.0.0
+Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
+Server is Community Edition.
taos> show databases;
-name | created_time | ntables | vgroups | replica | quorum | days | keep1,keep2,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
-===================================================================================================================================================================================
-test | 2020-10-14 10:35:48.617 | 10 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
-log | 2020-10-12 09:08:21.651 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
-Query OK, 2 row(s) in set (0.001198s)
+name | create_time | vgroups | ntables | replica | strict | duration | keep | buffer | pagesize | pages | minrows | maxrows | comp | precision | status | retention | single_stable | cachemodel | cachesize | wal_level | wal_fsync_period | wal_retention_period | wal_retention_size | wal_roll_period | wal_seg_size |
+===================================================================================================================================================================================
+information_schema | NULL | NULL | 14 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+performance_schema | NULL | NULL | 3 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+db | 2022-08-04 14:14:49.385 | 2 | 4 | 1 | off | 14400m | 5254560m,5254560m,5254560m | 96 | 4 | 256 | 100 | 4096 | 2 | ms | ready | NULL | false | none | 1 | 1 | 3000 | 0 | 0 | 0 | 0 |
+Query OK, 3 rows in database (0.019154s)
taos>
```
From cmd, go to the C:\TDengine directory and run `taos.exe` directly to connect to the TDengine service and enter the TDengine CLI; for example:
```text
-C:\TDengine>taos
-Welcome to the TDengine shell from Linux, Client Version:2.0.5.0
-Copyright (c) 2017 by TAOS Data, Inc. All rights reserved.
-taos> show databases;
-name | created_time | ntables | vgroups | replica | quorum | days | keep1,keep2,keep(D) | cache(MB) | blocks | minrows | maxrows | wallevel | fsync | comp | precision | status |
-===================================================================================================================================================================================
-test | 2020-10-14 10:35:48.617 | 10 | 1 | 1 | 1 | 2 | 3650,3650,3650 | 16 | 6 | 100 | 4096 | 1 | 3000 | 2 | ms | ready |
-log | 2020-10-12 09:08:21.651 | 4 | 1 | 1 | 1 | 10 | 30,30,30 | 1 | 3 | 100 | 4096 | 1 | 3000 | 2 | us | ready |
-Query OK, 2 row(s) in set (0.045000s)
-taos>
+Welcome to the TDengine shell from Windows, Client Version:3.0.0.0
+Copyright (c) 2022 by TAOS Data, Inc. All rights reserved.
+Server is Community Edition.
+taos> show databases;
+name | create_time | vgroups | ntables | replica | strict | duration | keep | buffer | pagesize | pages | minrows | maxrows | comp | precision | status | retention | single_stable | cachemodel | cachesize | wal_level | wal_fsync_period | wal_retention_period | wal_retention_size | wal_roll_period | wal_seg_size |
+===================================================================================================================================================================================
+information_schema | NULL | NULL | 14 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+performance_schema | NULL | NULL | 3 | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | ready | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL | NULL |
+test | 2022-08-04 16:46:40.506 | 2 | 0 | 1 | off | 14400m | 5256000m,5256000m,5256000m | 96 | 4 | 256 |
+100 | 4096 | 2 | ms | ready | NULL | false | none | 1 | 1 | 3000 | 0 | 0 | 0 | 0 |
+Query OK, 3 rows in database (0.123000s)
+taos>
```
@@ -46,9 +46,10 @@ enum {
};
enum {
-  TASK_EXEC_STATUS__IDLE = 1,
-  TASK_EXEC_STATUS__EXECUTING,
-  TASK_EXEC_STATUS__CLOSING,
+  TASK_SCHED_STATUS__INACTIVE = 1,
+  TASK_SCHED_STATUS__WAITING,
+  TASK_SCHED_STATUS__ACTIVE,
+  TASK_SCHED_STATUS__FAILED,
};
enum {
@@ -204,13 +205,11 @@ typedef struct {
enum {
  TASK_SOURCE__SCAN = 1,
  TASK_SOURCE__PIPE,
-  TASK_SOURCE__MERGE,
};
enum {
  TASK_EXEC__NONE = 1,
  TASK_EXEC__PIPE,
-  TASK_EXEC__MERGE,
};
enum {
@@ -256,7 +255,7 @@ typedef struct SStreamTask {
  int16_t dispatchMsgType;
  int8_t  taskStatus;
-  int8_t  execStatus;
+  int8_t  schedStatus;
  // node info
  int32_t selfChildId;
@@ -475,7 +474,6 @@ typedef struct {
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
-int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId);
int32_t streamSetupTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask);
@@ -487,6 +485,9 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
+int32_t streamTryExec(SStreamTask* pTask);
+int32_t streamSchedExec(SStreamTask* pTask);
typedef struct SStreamMeta SStreamMeta;
SStreamMeta* streamMetaOpen();
......
@@ -189,20 +189,6 @@ typedef struct {
  tsem_t rspSem;
} SMqPollCbParam;
#if 0
typedef struct {
tmq_t* tmq;
int8_t async;
int8_t automatic;
int8_t freeOffsets;
tmq_commit_cb* userCb;
tsem_t rspSem;
int32_t rspErr;
SArray* offsets;
void* userParam;
} SMqCommitCbParam;
#endif
typedef struct {
  tmq_t* tmq;
  int8_t automatic;
@@ -385,29 +371,6 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}
#if 0
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
pParam->rspErr = code;
if (pParam->async) {
if (pParam->automatic && pParam->tmq->commitCb) {
pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam);
} else if (!pParam->automatic && pParam->userCb) {
pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam);
}
if (pParam->freeOffsets) {
taosArrayDestroy(pParam->offsets);
}
taosMemoryFree(pParam);
} else {
tsem_post(&pParam->rspSem);
}
return 0;
}
#endif
int32_t tmqCommitCb2(void* param, SDataBuf* pBuf, int32_t code) {
  SMqCommitCbParam2*   pParam = (SMqCommitCbParam2*)param;
  SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
@@ -660,123 +623,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_...
  return 0;
}
#if 0
int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async,
tmq_commit_cb* userCb, void* userParam) {
SMqCMCommitOffsetReq req;
SArray* pOffsets = NULL;
void* buf = NULL;
SMqCommitCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
int8_t freeOffsets;
int32_t code = -1;
if (msg == NULL) {
freeOffsets = 1;
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqOffset offset;
tstrncpy(offset.topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(offset.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
offset.vgId = pVg->vgId;
offset.offset = pVg->currentOffset;
taosArrayPush(pOffsets, &offset);
}
}
} else {
freeOffsets = 0;
pOffsets = (SArray*)&msg->container;
}
req.num = (int32_t)pOffsets->size;
req.offsets = pOffsets->pData;
SEncoder encoder;
tEncoderInit(&encoder, NULL, 0);
code = tEncodeSMqCMCommitOffsetReq(&encoder, &req);
if (code < 0) {
goto END;
}
int32_t tlen = encoder.pos;
buf = taosMemoryMalloc(tlen);
if (buf == NULL) {
tEncoderClear(&encoder);
goto END;
}
tEncoderClear(&encoder);
tEncoderInit(&encoder, buf, tlen);
tEncodeSMqCMCommitOffsetReq(&encoder, &req);
tEncoderClear(&encoder);
pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
goto END;
}
pParam->tmq = tmq;
pParam->automatic = automatic;
pParam->async = async;
pParam->offsets = pOffsets;
pParam->freeOffsets = freeOffsets;
pParam->userCb = userCb;
pParam->userParam = userParam;
if (!async) tsem_init(&pParam->rspSem, 0, 0);
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto END;
sendInfo->msgInfo = (SDataBuf){
.pData = buf,
.len = tlen,
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
sendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->rspErr;
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
} else {
code = 0;
}
// avoid double free if msg is sent
buf = NULL;
END:
if (buf) taosMemoryFree(buf);
/*if (pParam) taosMemoryFree(pParam);*/
/*if (sendInfo) taosMemoryFree(sendInfo);*/
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, tmq->commitCbUserParam);
} else {
userCb(tmq, code, (tmq_topic_vgroup_list_t*)pOffsets, userParam);
}
}
if (!async && freeOffsets) {
taosArrayDestroy(pOffsets);
}
return code;
}
#endif
void tmqAssignAskEpTask(void* param, void* tmrId) {
  tmq_t*  tmq = (tmq_t*)param;
  int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM);
@@ -1839,13 +1685,21 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
      return rsp;
    }
+    int32_t retryCnt = 0;
    tmq_list_t* lst = tmq_list_new();
-    rsp = tmq_subscribe(tmq, lst);
+    while (1) {
+      rsp = tmq_subscribe(tmq, lst);
+      if (rsp != TSDB_CODE_MND_CONSUMER_NOT_READY || retryCnt > 5) {
+        break;
+      } else {
+        retryCnt++;
+        taosMsleep(500);
+      }
+    }
    tmq_list_destroy(lst);
-    if (rsp != 0) {
-      return rsp;
-    }
+    return rsp;
  }
  // TODO: free resources
  return 0;
......
@@ -216,9 +216,11 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
  if (offset.val.type == TMQ_OFFSET__LOG) {
    STqHandle* pHandle = taosHashGet(pTq->handles, offset.subKey, strlen(offset.subKey));
-    if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
-      ASSERT(0);
-      return -1;
+    if (pHandle) {
+      if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
+        ASSERT(0);
+        return -1;
+      }
    }
  }
@@ -515,7 +517,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
  // todo lock
  STqHandle* pHandle = taosHashGet(pTq->handles, req.subKey, strlen(req.subKey));
  if (pHandle == NULL) {
-    ASSERT(req.oldConsumerId == -1);
+    if (req.oldConsumerId != -1) {
+      tqError("vgId:%d, build new consumer handle %s for consumer %d, but old consumerId is %ld", req.vgId, req.subKey,
+              req.newConsumerId, req.oldConsumerId);
+    }
    ASSERT(req.newConsumerId != -1);
    STqHandle tqHandle = {0};
    pHandle = &tqHandle;
@@ -604,7 +609,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
    ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0);
  }
-  pTask->execStatus = TASK_EXEC_STATUS__IDLE;
+  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputQueue = streamQueueOpen();
  pTask->outputQueue = streamQueueOpen();
@@ -720,7 +725,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
      continue;
    }
-    if (streamLaunchByWrite(pTask, TD_VID(pTq->pVnode)) < 0) {
+    if (streamSchedExec(pTask) < 0) {
      qError("stream task launch failed, task id %d", pTask->taskId);
      continue;
    }
@@ -751,12 +756,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
-  ASSERT(0);
  char*   msgStr = pMsg->pCont;
  char*   msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
  int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
  SStreamDispatchReq req;
  SDecoder           decoder;
-  tDecoderInit(&decoder, msgBody, msgLen);
+  tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
  tDecodeStreamDispatchReq(&decoder, &req);
  int32_t taskId = req.taskId;
  SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
......
@@ -136,31 +136,6 @@ int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
  return 0;
}
int32_t tqEnqueueAll(STQ* pTq, SSubmitReq* pReq) {
void* pIter = NULL;
SStreamDataSubmit* pSubmit = streamDataSubmitNew(pReq);
if (pSubmit == NULL) {
return -1;
}
while (1) {
pIter = taosHashIterate(pTq->handles, pIter);
if (pIter == NULL) break;
STqHandle* pHandle = (STqHandle*)pIter;
if (tqEnqueue(pHandle, pSubmit) < 0) {
continue;
}
int8_t execStatus = atomic_load_8(&pHandle->pushHandle.execStatus);
if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
tqSendExecReq(pTq, pHandle);
}
}
streamDataSubmitRefDec(pSubmit);
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
  if (msgType != TDMT_VND_SUBMIT) return 0;
  void* pIter = NULL;
......
@@ -88,9 +88,11 @@ bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultEntryInfo(const SResultRow* pRow, int32_t index, const int32_t* offset);
-static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
+static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos, bool forUpdate) {
  SFilePage* bufPage = (SFilePage*)getBufPage(pBuf, pos->pageId);
-  setBufPageDirty(bufPage, true);
+  if (forUpdate) {
+    setBufPageDirty(bufPage, true);
+  }
  SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
  return pRow;
}
......
@@ -953,7 +953,7 @@ STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowI...
    return w;
  }
-  w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
+  w = getResultRowByPos(pBuf, &pResultRowInfo->cur, false)->win;
  // in case of typical time window, we can calculate time window directly.
  if (w.skey > ts || w.ekey < ts) {
......
@@ -258,7 +258,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR...
  // in case of repeat scan/reverse scan, no new time window added.
  if (isIntervalQuery) {
    if (masterscan && p1 != NULL) {  // the *p1 may be NULL in case of sliding+offset exists.
-      pResult = getResultRowByPos(pResultBuf, p1);
+      pResult = getResultRowByPos(pResultBuf, p1, true);
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
    }
  } else {
@@ -266,7 +266,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR...
    // pResultRowInfo object.
    if (p1 != NULL) {
      // todo
-      pResult = getResultRowByPos(pResultBuf, p1);
+      pResult = getResultRowByPos(pResultBuf, p1, true);
      ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
    }
  }
@@ -4075,6 +4075,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo...
    SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
    ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
    if (ops[i] == NULL) {
+      taosMemoryFree(ops);
      return NULL;
    } else {
      ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
@@ -4516,7 +4517,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead...
  return code;
_complete:
-  taosMemoryFreeClear(*pTaskInfo);
+  doDestroyTask(*pTaskInfo);
  terrno = code;
  return code;
}
......
@@ -611,7 +611,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num...
      break;
    }
-    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1);
+    SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1, false);
    ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
    if (pr->closed) {
@@ -1345,7 +1345,7 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type...
}
void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) {
-  SResultRow*     pResult = getResultRowByPos(pResultBuf, p1);
+  SResultRow*     pResult = getResultRowByPos(pResultBuf, p1, false);
  SqlFunctionCtx* pCtx = pSup->pCtx;
  for (int32_t i = 0; i < numOfOutput; ++i) {
    pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset);
@@ -3481,7 +3481,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes...
    pWinInfo->pos.pageId = (*pResult)->pageId;
    pWinInfo->pos.offset = (*pResult)->offset;
  } else {
-    *pResult = getResultRowByPos(pAggSup->pResultBuf, &pWinInfo->pos);
+    *pResult = getResultRowByPos(pAggSup->pResultBuf, &pWinInfo->pos, true);
    if (!(*pResult)) {
      qError("getResultRowByPos return NULL, TID:%s", GET_TASKID(pTaskInfo));
      return TSDB_CODE_FAILED;
......
@@ -1094,6 +1094,7 @@ static SColumnInfoData* doVectorConvert(SScalarParam* pInput, int32_t* doConvert...
static void doReleaseVec(SColumnInfoData* pCol, int32_t type) {
  if (type == VECTOR_DO_CONVERT) {
    colDataDestroy(pCol);
+    taosMemoryFree(pCol);
  }
}
......
@@ -47,7 +47,7 @@ void streamCleanUp() {
  }
}
-void streamTriggerByTimer(void* param, void* tmrId) {
+void streamSchedByTimer(void* param, void* tmrId) {
  SStreamTask* pTask = (void*)param;
  if (atomic_load_8(&pTask->taskStatus) == TASK_STATUS__DROPPING) {
@@ -68,28 +68,30 @@ void streamTriggerByTimer(void* param, void* tmrId) {
    atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE);
    streamTaskInput(pTask, (SStreamQueueItem*)trigger);
-    streamLaunchByWrite(pTask, pTask->nodeId);
+    streamSchedExec(pTask);
  }
-  taosTmrReset(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
+  taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer);
}
int32_t streamSetupTrigger(SStreamTask* pTask) {
  if (pTask->triggerParam != 0) {
-    pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
+    pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
    pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
  }
  return 0;
}
-int32_t streamLaunchByWrite(SStreamTask* pTask, int32_t vgId) {
-  int8_t execStatus = atomic_load_8(&pTask->execStatus);
-  if (execStatus == TASK_EXEC_STATUS__IDLE || execStatus == TASK_EXEC_STATUS__CLOSING) {
+int32_t streamSchedExec(SStreamTask* pTask) {
+  int8_t schedStatus =
+      atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__WAITING);
+  if (schedStatus == TASK_SCHED_STATUS__INACTIVE) {
    SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
-    if (pRunReq == NULL) return -1;
-    // TODO: do we need htonl?
-    pRunReq->head.vgId = vgId;
+    if (pRunReq == NULL) {
+      atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
+      return -1;
+    }
+    pRunReq->head.vgId = pTask->nodeId;
    pRunReq->streamId = pTask->streamId;
    pRunReq->taskId = pTask->taskId;
    SRpcMsg msg = {
@@ -182,14 +184,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S...
  streamTaskEnqueue(pTask, pReq, pRsp);
  if (exec) {
-    streamExec(pTask);
+    streamTryExec(pTask);
    if (pTask->dispatchType != TASK_DISPATCH__NONE) {
-      ASSERT(pTask->sinkType == TASK_SINK__NONE);
      streamDispatch(pTask);
    }
  } else {
-    streamLaunchByWrite(pTask, pTask->nodeId);
+    streamSchedExec(pTask);
  }
  return 0;
@@ -219,7 +220,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) {
}
int32_t streamProcessRunReq(SStreamTask* pTask) {
-  streamExec(pTask);
+  streamTryExec(pTask);
  if (pTask->dispatchType != TASK_DISPATCH__NONE) {
    streamDispatch(pTask);
@@ -272,10 +273,12 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S...
  streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
  ASSERT(pTask->execType != TASK_EXEC__NONE);
-  streamExec(pTask);
-  ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
-  streamDispatch(pTask);
+  streamSchedExec(pTask);
+  /*streamTryExec(pTask);*/
+  /*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/
+  /*streamDispatch(pTask);*/
  return 0;
}
......
@@ -440,13 +440,13 @@ FAIL:
int32_t streamDispatch(SStreamTask* pTask) {
  ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
-#if 1
+  ASSERT(pTask->sinkType == TASK_SINK__NONE);
  int8_t old =
      atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
  if (old != TASK_OUTPUT_STATUS__NORMAL) {
    return 0;
  }
-#endif
  SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue);
  if (pBlock == NULL) {
@@ -466,22 +466,8 @@ int32_t streamDispatch(SStreamTask* pTask) {
    atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
    goto FREE;
  }
-  /*atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);*/
FREE:
  taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
  taosFreeQitem(pBlock);
#if 0
SRpcMsg dispatchMsg = {0};
SEpSet* pEpSet = NULL;
if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) {
ASSERT(0);
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
return -1;
}
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
taosFreeQitem(pBlock);
tmsgSendReq(pEpSet, &dispatchMsg);
#endif
  return code;
}
@@ -147,24 +147,23 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
  return 0;
}
-// TODO: handle version
-static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
+int32_t streamExecForAll(SStreamTask* pTask) {
  while (1) {
    int32_t cnt = 1;
    void*   data = NULL;
    while (1) {
      SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
      if (qItem == NULL) {
-        qDebug("stream exec over, queue empty");
+        qDebug("stream task exec over, queue empty, task: %d", pTask->taskId);
        break;
      }
      if (data == NULL) {
        data = qItem;
        streamQueueProcessSuccess(pTask->inputQueue);
-        if (pTask->execType == TASK_EXEC__NONE) break;
-        /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
-        /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
-        /*}*/
+        if (pTask->execType == TASK_EXEC__NONE) {
+          break;
+        }
      } else {
        void* newRet;
        if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) {
@@ -181,11 +180,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
    if (pTask->taskStatus == TASK_STATUS__DROPPING) {
      if (data) streamFreeQitem(data);
-      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
-      return NULL;
+      return 0;
    }
-    if (data == NULL) break;
+    if (data == NULL) {
+      break;
+    }
    if (pTask->execType == TASK_EXEC__NONE) {
      ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
@@ -193,6 +193,8 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
      continue;
    }
+    SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
    qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
    streamTaskExecImpl(pTask, data, pRes);
    qDebug("stream task %d exec end", pTask->taskId);
@@ -203,76 +205,44 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
      // TODO log failed ver
      streamQueueProcessFail(pTask->inputQueue);
      taosArrayDestroy(pRes);
-      return NULL;
+      return -1;
    }
    qRes->type = STREAM_INPUT__DATA_BLOCK;
    qRes->blocks = pRes;
-    if (streamTaskOutput(pTask, qRes) < 0) {
-      // TODO log failed ver
-      /*streamQueueProcessFail(pTask->inputQueue);*/
-      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
-      taosFreeQitem(qRes);
-      return NULL;
-    }
    if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
      SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
      qRes->childId = pTask->selfChildId;
      qRes->sourceVer = pSubmit->ver;
    }
+    if (streamTaskOutput(pTask, qRes) < 0) {
+      // TODO save failed ver
+      /*streamQueueProcessFail(pTask->inputQueue);*/
+      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
+      taosFreeQitem(qRes);
+      return -1;
+    }
    /*streamQueueProcessSuccess(pTask->inputQueue);*/
-    pRes = taosArrayInit(0, sizeof(SSDataBlock));
+    streamFreeQitem(data);
  }
-  return pRes;
+  return 0;
}
-// TODO: handle version
-int32_t streamExec(SStreamTask* pTask) {
-  SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
-  if (pRes == NULL) return -1;
-  while (1) {
-    int8_t execStatus =
-        atomic_val_compare_exchange_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE, TASK_EXEC_STATUS__EXECUTING);
-    if (execStatus == TASK_EXEC_STATUS__IDLE) {
-      // first run
-      qDebug("stream exec, enter exec status");
-      pRes = streamExecForQall(pTask, pRes);
-      if (pRes == NULL) goto FAIL;
-      // temporarily disable status closing, since it runs out of threads
-#if 0
-      // set status closing
-      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__CLOSING);
-      // second run, make sure inputQ and qall are cleared
-      qDebug("stream exec, enter closing status");
-      pRes = streamExecForQall(pTask, pRes);
-      if (pRes == NULL) goto FAIL;
-#endif
-      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
-      atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
-      qDebug("stream exec, return result");
-      return 0;
-    } else if (execStatus == TASK_EXEC_STATUS__CLOSING) {
-      continue;
-    } else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
-      ASSERT(taosArrayGetSize(pRes) == 0);
-      taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
-      return 0;
-    } else {
-      ASSERT(0);
-    }
-  }
-FAIL:
-  if (pRes) taosArrayDestroy(pRes);
-  if (pTask->taskStatus == TASK_STATUS__DROPPING) {
-    tFreeSStreamTask(pTask);
-    return 0;
-  } else {
-    atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
-    return -1;
-  }
-}
+int32_t streamTryExec(SStreamTask* pTask) {
+  int8_t schedStatus =
+      atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__ACTIVE);
+  if (schedStatus == TASK_SCHED_STATUS__WAITING) {
+    int32_t code = streamExecForAll(pTask);
+    if (code < 0) {
+      atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__FAILED);
+      return -1;
+    }
+    atomic_store_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE);
+    if (!taosQueueEmpty(pTask->inputQueue->queue)) {
+      streamSchedExec(pTask);
+    }
+  }
+  return 0;
+}
@@ -35,9 +35,10 @@ FAIL:
void streamQueueClose(SStreamQueue* queue) {
  while (1) {
    void* qItem = streamQueueNextItem(queue);
-    if (qItem)
+    if (qItem) {
      taosFreeQitem(qItem);
-    else
+    } else {
      return;
+    }
  }
}
@@ -23,7 +23,7 @@ SStreamTask* tNewSStreamTask(int64_t streamId) {
  }
  pTask->taskId = tGenIdPI32();
  pTask->streamId = streamId;
-  pTask->execStatus = TASK_EXEC_STATUS__IDLE;
+  pTask->schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
  pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
@@ -59,7 +59,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1;
-  if (tEncodeI8(pEncoder, pTask->execStatus) < 0) return -1;
+  if (tEncodeI8(pEncoder, pTask->schedStatus) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
@@ -114,7 +114,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1;
-  if (tDecodeI8(pDecoder, &pTask->execStatus) < 0) return -1;
+  if (tDecodeI8(pDecoder, &pTask->schedStatus) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
......
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include "taosudf.h"
DLL_EXPORT int32_t bitwise_and_init() {
return 0;
}
DLL_EXPORT int32_t bitwise_and_destroy() {
return 0;
}
DLL_EXPORT int32_t bitwise_and(SUdfDataBlock* block, SUdfColumn *resultCol) {
if (block->numOfCols < 2) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block->numOfRows;
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
int32_t result = *(int32_t*)udfColDataGetData(block->udfCols[0], i);
int j = 1;
for (; j < block->numOfCols; ++j) {
if (udfColDataIsNull(block->udfCols[j], i)) {
udfColDataSetNull(resultCol, i);
break;
}
char* colData = udfColDataGetData(block->udfCols[j], i);
result &= *(int32_t*)colData;
}
if (j == block->numOfCols) {
udfColDataSet(resultCol, i, (char*)&result, false);
}
}
return TSDB_CODE_SUCCESS;
}
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <math.h>
#include "taosudf.h"
DLL_EXPORT int32_t squares_sum_init() {
return 0;
}
DLL_EXPORT int32_t squares_sum_destroy() {
return 0;
}
DLL_EXPORT int32_t squares_sum_start(SUdfInterBuf *buf) {
  // keep the running sum of squares as a double in the intermediate buffer
  *(double*)(buf->buf) = 0;
buf->bufLen = sizeof(double);
buf->numOfResult = 0;
return 0;
}
DLL_EXPORT int32_t squares_sum(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
double sumSquares = *(double*)interBuf->buf;
  int32_t numNotNull = 0;  // count of non-NULL inputs; int32_t avoids overflow on large blocks
for (int32_t i = 0; i < block->numOfCols; ++i) {
SUdfColumn* col = block->udfCols[i];
if (!(col->colMeta.type == TSDB_DATA_TYPE_INT ||
col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
return TSDB_CODE_UDF_INVALID_INPUT;
}
}
for (int32_t i = 0; i < block->numOfCols; ++i) {
for (int32_t j = 0; j < block->numOfRows; ++j) {
SUdfColumn* col = block->udfCols[i];
if (udfColDataIsNull(col, j)) {
continue;
}
switch (col->colMeta.type) {
case TSDB_DATA_TYPE_INT: {
char* cell = udfColDataGetData(col, j);
int32_t num = *(int32_t*)cell;
sumSquares += (double)num * num;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
char* cell = udfColDataGetData(col, j);
double num = *(double*)cell;
sumSquares += num * num;
break;
}
default:
break;
}
++numNotNull;
}
}
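// Publish the updated running sum; numOfResult stays 0 until at least one
// non-NULL value has contributed, so the finish step produces no result for empty input.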
*(double*)(newInterBuf->buf) = sumSquares;
newInterBuf->bufLen = sizeof(double);
if (interBuf->numOfResult == 0 && numNotNull == 0) {
newInterBuf->numOfResult = 0;
} else {
newInterBuf->numOfResult = 1;
}
return 0;
}
DLL_EXPORT int32_t squares_sum_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
if (buf->numOfResult == 0) {
resultData->numOfResult = 0;
return 0;
}
double sumSquares = *(double*)(buf->buf);
*(double*)(resultData->buf) = sqrt(sumSquares);
resultData->bufLen = sizeof(double);
resultData->numOfResult = 1;
return 0;
}
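(For reference only, not part of this commit: once compiled into shared libraries, the two examples above would typically be registered with CREATE FUNCTION; the library paths below are placeholders.)
create function bitwise_and as '/usr/local/taos/udf/libbitwise_and.so' outputtype int;
create aggregate function squares_sum as '/usr/local/taos/udf/libsquares_sum.so' outputtype double bufsize 8;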
import taos
import time
import inspect
import traceback
import socket
from dataclasses import dataclass
from datetime import datetime
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *

PRIVILEGES_ALL = "ALL"
PRIVILEGES_READ = "READ"
@@ -21,17 +22,40 @@ WEIGHT_WRITE = 3
PRIMARY_COL = "ts" PRIMARY_COL = "ts"
INT_COL = "c1" INT_COL = "c_int"
BINT_COL = "c2" BINT_COL = "c_bint"
SINT_COL = "c3" SINT_COL = "c_sint"
TINT_COL = "c4" TINT_COL = "c_tint"
FLOAT_COL = "c5" FLOAT_COL = "c_float"
DOUBLE_COL = "c6" DOUBLE_COL = "c_double"
BOOL_COL = "c7" BOOL_COL = "c_bool"
TINT_UN_COL = "c_utint"
BINARY_COL = "c8" SINT_UN_COL = "c_usint"
NCHAR_COL = "c9" BINT_UN_COL = "c_ubint"
TS_COL = "c10" INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [BOOL_COL, ]
TS_TYPE_COL = [TS_COL, ]
INT_TAG = "t_int"
ALL_COL = [PRIMARY_COL, INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BINARY_COL, NCHAR_COL, BOOL_COL, TS_COL]
TAG_COL = [INT_TAG]
# insert data args:
TIME_STEP = 10000
NOW = int(datetime.timestamp(datetime.now()) * 1000)
# init db/table
DBNAME = "db"
STBNAME = "stb1"
CTBNAME = "ct1"
NTBNAME = "nt1"
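# The schema constants above are consumed by __create_tb/__insert_data below;
# NOW is the millisecond epoch used as the base timestamp for generated rows.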
class TDconnect:
    def __init__(self,
@@ -247,25 +271,26 @@ class TDTestCase:
        with taos_connect(user=user.name, passwd=user.passwd) as use:
            time.sleep(2)
            if check_priv == PRIVILEGES_ALL:
                use.query(f"use {DBNAME}")
                use.query(f"show {DBNAME}.tables")
                use.query(f"select * from {DBNAME}.{CTBNAME}")
                use.query(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
            elif check_priv == PRIVILEGES_READ:
                use.query(f"use {DBNAME}")
                use.query(f"show {DBNAME}.tables")
                use.query(f"select * from {DBNAME}.{CTBNAME}")
                use.error(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
            elif check_priv == PRIVILEGES_WRITE:
                use.query(f"use {DBNAME}")
                use.query(f"show {DBNAME}.tables")
                use.error(f"select * from {DBNAME}.{CTBNAME}")
                use.query(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
            elif check_priv is None:
                use.error(f"use {DBNAME}")
                # use.error(f"show {DBNAME}.tables")
                use.error(f"show tables")
                use.error(f"select * from {DBNAME}.{CTBNAME}")
                use.error(f"insert into {DBNAME}.{CTBNAME} (ts) values (now())")
    def __change_user_priv(self, user: User, pre_priv, invoke=False):
        if user.priv == pre_priv and invoke :
@@ -418,7 +443,7 @@ class TDTestCase:
            self.__grant_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) ,
            self.__grant_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) ,
            f"GRANT {self.__privilege[0]} ON * TO {self.__user_list[0]}" ,
            f"GRANT {self.__privilege[0]} ON {DBNAME}.{NTBNAME} TO {self.__user_list[0]}" ,
        ]

    def __revoke_err(self):
@@ -430,7 +455,7 @@ class TDTestCase:
            self.__revoke_user_privileges(privilege="", dbname="db", user_name=self.__user_list[0]) ,
            self.__revoke_user_privileges(privilege=" ".join(self.__privilege), user_name=self.__user_list[0]) ,
            f"REVOKE {self.__privilege[0]} ON * FROM {self.__user_list[0]}" ,
            f"REVOKE {self.__privilege[0]} ON {DBNAME}.{NTBNAME} FROM {self.__user_list[0]}" ,
        ]

    def test_grant_err(self):
@@ -505,101 +530,48 @@ class TDTestCase:
        self.drop_user_error()
        self.drop_user_current()
    def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, dbname=DBNAME):
        tdLog.printNoPrefix("==========step: create table")
        create_stb_sql = f'''create table {dbname}.{stb}(
                {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
                {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
                {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
            ) tags ({INT_TAG} int)
            '''
        tdSql.execute(create_stb_sql)

        for i in range(ntbnum):
            create_ntb_sql = f'''create table {dbname}.nt{i+1}(
                    {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                    {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
                    {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
                    {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                    {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
                )
                '''
            tdSql.execute(create_ntb_sql)

        for i in range(ctb_num):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')

    def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, star_time=NOW):
        tdLog.printNoPrefix("==========step: start insert data into tables now.....")
        # from ...pytest.util.common import DataSet
        data = DataSet()
        data.get_order_set(rows)

        for i in range(rows):
            row_data = f'''
                {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
                {data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.utint_data[i]},
                {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
            '''
            tdSql.execute( f"insert into {dbname}.{NTBNAME} values ( {star_time - i * int(TIME_STEP * 1.2)}, {row_data} )" )
            for j in range(ctb_num):
                tdSql.execute( f"insert into {dbname}.ct{j+1} values ( {star_time - j * i * TIME_STEP}, {row_data} )" )
    def run(self):
        tdSql.prepare()
@@ -656,27 +628,81 @@ class TDTestCase:
        with taos_connect(user=self.__user_list[0], passwd=f"new{self.__passwd_list[0]}") as user:
            # user = conn
            # a normal user must not be able to create users
            tdLog.printNoPrefix("==========step4.1: normal user can not create user")
            user.error("create user utest1 pass 'utest1pass'")
            # a normal user can list users
            tdLog.printNoPrefix("==========step4.2: normal user can show user")
            user.query("show users")
            assert user.queryRows == self.users_count + 1
            # a normal user must not change other users' passwords
            tdLog.printNoPrefix("==========step4.3: normal user can not alter other user pass")
            user.error(self.__alter_pass_sql(self.__user_list[1], self.__passwd_list[1] ))
            user.error(self.__alter_pass_sql("root", "taosdata_root" ))
            # a normal user can change its own password
            tdLog.printNoPrefix("==========step4.4: normal user can alter owner pass")
            user.query(self.__alter_pass_sql(self.__user_list[0], self.__passwd_list[0]))
            # a normal user must not drop any user, including itself
            tdLog.printNoPrefix("==========step4.5: normal user can not drop any user ")
            user.error(f"drop user {self.__user_list[0]}")
            user.error(f"drop user {self.__user_list[1]}")
            user.error("drop user root")
tdLog.printNoPrefix("==========step5: enable info")
taos1_conn = taos.connect(user=self.__user_list[1], password=f"new{self.__passwd_list[1]}")
taos1_conn.query(f"show databases")
tdSql.execute(f"alter user {self.__user_list[1]} enable 0")
tdSql.execute(f"alter user {self.__user_list[2]} enable 0")
taos1_except = True
try:
taos1_conn.query("show databases")
except BaseException:
taos1_except = False
if taos1_except:
    tdLog.exit("taos 1 connect: expected error did not occur; with enable == 0 the user should not be able to read/write")
else:
    tdLog.info("taos 1 connect: expected error occurred, enable == 0")
taos2_except = True
try:
taos.connect(user=self.__user_list[2], password=f"new{self.__passwd_list[2]}")
except BaseException:
taos2_except = False
if taos2_except:
    tdLog.exit("taos 2 connect: expected error did not occur; with enable == 0 the user should not be able to connect")
else:
    tdLog.info("taos 2 connect: expected error occurred, enable == 0, can not login")
tdLog.printNoPrefix("==========step6: sysinfo info")
taos3_conn = taos.connect(user=self.__user_list[3], password=f"new{self.__passwd_list[3]}")
taos3_conn.query(f"show dnodes")
taos3_conn.query(f"show {DBNAME}.vgroups")
tdSql.execute(f"alter user {self.__user_list[3]} sysinfo 0")
tdSql.execute(f"alter user {self.__user_list[4]} sysinfo 0")
taos3_except = True
try:
taos3_conn.query(f"show dnodes")
taos3_conn.query(f"show {DBNAME}.vgroups")
except BaseException:
taos3_except = False
if taos3_except:
    tdLog.exit("taos 3 query: expected error did not occur; with sysinfo == 0 the user should not see dnode/mnode/qnode info")
else:
    tdLog.info("taos 3 query: expected error occurred, sysinfo == 0, can not show dnode/vgroups")
taos4_conn = taos.connect(user=self.__user_list[4], password=f"new{self.__passwd_list[4]}")
taos4_except = True
try:
taos4_conn.query(f"show mnodes")
taos4_conn.query(f"show {DBNAME}.vgroups")
except BaseException:
taos4_except = False
if taos4_except:
    tdLog.exit("taos 4 query: expected error did not occur; with sysinfo == 0 the user should not see dnode/mnode/qnode info")
else:
    tdLog.info("taos 4 query: expected error occurred, sysinfo == 0, can not show dnode/vgroups")
# test dropping normal users as root
tdLog.printNoPrefix("==========step7: super user drop normal user")
self.test_drop_user()
tdSql.query("show users")
...
from datetime import datetime
import time
from dataclasses import dataclass
@@ -8,6 +8,7 @@ from util.sql import *
from util.cases import *
from util.dnodes import *
from util.constant import *
from util.common import *

PRIMARY_COL = "ts"
@@ -38,7 +39,7 @@ TAG_COL = [INT_TAG]
# insert data args:
TIME_STEP = 10000
NOW = int(datetime.timestamp(datetime.now()) * 1000)

# init db/table
DBNAME = "db"
@@ -47,40 +48,6 @@ CTBNAME = "ct1"
NTBNAME = "nt1"
@dataclass
class DataSet:
ts_data : List[int] = None
int_data : List[int] = None
bint_data : List[int] = None
sint_data : List[int] = None
tint_data : List[int] = None
int_un_data : List[int] = None
bint_un_data: List[int] = None
sint_un_data: List[int] = None
tint_un_data: List[int] = None
float_data : List[float] = None
double_data : List[float] = None
bool_data : List[int] = None
binary_data : List[str] = None
nchar_data : List[str] = None
def __post_init__(self):
self.ts_data = []
self.int_data = []
self.bint_data = []
self.sint_data = []
self.tint_data = []
self.int_un_data = []
self.bint_un_data = []
self.sint_un_data = []
self.tint_un_data = []
self.float_data = []
self.double_data = []
self.bool_data = []
self.binary_data = []
self.nchar_data = []
@dataclass
class SMAschema:
    creation : str = "CREATE"
@@ -164,10 +131,6 @@ class SMAschema:
            del self.other[k]

# from ...pytest.util.sql import *
# from ...pytest.util.constant import *

class TDTestCase:
    updatecfgDict = {"querySmaOptimize": 1}
@@ -469,14 +432,12 @@ class TDTestCase:
        err_sqls.append( SMAschema(index_flag="SMA INDEX ,", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
        err_sqls.append( SMAschema(index_name="tbname", tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
        # current_set
        cur_sqls.append( SMAschema(max_delay="",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
        cur_sqls.append( SMAschema(watermark="",index_name="sma_index_2",tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )
        cur_sqls.append( SMAschema(sliding="",index_name='sma_index_3',tbname=STBNAME, func=(f"min({INT_COL})",f"max({INT_COL})") ) )

        return err_sqls, cur_sqls

    def test_create_sma(self):
@@ -512,102 +473,48 @@ class TDTestCase:
        self.test_create_sma()
        self.test_drop_sma()
    def __create_tb(self, stb=STBNAME, ctb_num=20, ntbnum=1, dbname=DBNAME):
        tdLog.printNoPrefix("==========step: create table")
        create_stb_sql = f'''create table {dbname}.{stb}(
                {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
                {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
                {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
            ) tags ({INT_TAG} int)
            '''
        tdSql.execute(create_stb_sql)

        for i in range(ntbnum):
            create_ntb_sql = f'''create table {dbname}.nt{i+1}(
                    {PRIMARY_COL} timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
                    {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
                    {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
                    {TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
                    {INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
                )
                '''
            tdSql.execute(create_ntb_sql)

        for i in range(ctb_num):
            tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.{stb} tags ( {i+1} )')

    def __insert_data(self, rows, ctb_num=20, dbname=DBNAME, star_time=NOW):
        tdLog.printNoPrefix("==========step: start insert data into tables now.....")
        # from ...pytest.util.common import DataSet
        data = DataSet()
        data.get_order_set(rows, bint_step=2)

        for i in range(rows):
            row_data = f'''
                {data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
                {data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.utint_data[i]},
                {data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
            '''
            tdSql.execute( f"insert into {dbname}.{NTBNAME} values ( {star_time - i * int(TIME_STEP * 1.2)}, {row_data} )" )
            for j in range(ctb_num):
                tdSql.execute( f"insert into {dbname}.ct{j+1} values ( {star_time - j * i * TIME_STEP}, {row_data} )" )
    def run(self):
        self.rows = 10
@@ -616,14 +523,60 @@ class TDTestCase:
        tdLog.printNoPrefix("==========step1:create table in normal database")
        tdSql.prepare()
        self.__create_tb(dbname=DBNAME)
        self.__insert_data(rows=self.rows)
        self.all_test()

        # from ...pytest.util.sql import *
        # drop databases, then create the same-name db, stb and sma index again
        tdSql.prepare()
        self.__create_tb(dbname=DBNAME)
        self.__insert_data(rows=self.rows, star_time=NOW + self.rows * 2 * TIME_STEP)
tdLog.printNoPrefix("==========step1.1 : create a tsma index and checkdata")
tdSql.execute(f"create sma index {DBNAME}.sma_index_name1 on {DBNAME}.{STBNAME} function(max({INT_COL}),max({BINT_COL}),min({INT_COL})) interval(6m,10s) sliding(6m)")
self.__insert_data(rows=self.rows)
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
# tdSql.checkData(0, 2, 0)
tdLog.printNoPrefix("==========step1.2 : alter table schema, drop col without index")
tdSql.execute(f"alter stable {DBNAME}.{STBNAME} drop column {BINARY_COL}")
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
tdLog.printNoPrefix("==========step1.3 : alter table schema, drop col with index")
# TODO: TD-18047: a column cannot be dropped while it is still referenced by a tsma index; the index must be dropped first.
tdSql.error(f"alter stable {DBNAME}.stb1 drop column {BINT_COL}")
tdLog.printNoPrefix("==========step1.4 : alter table schema, add col")
tdSql.execute(f"alter stable {DBNAME}.{STBNAME} add column {BINT_COL}_1 bigint")
tdSql.execute(f"insert into {DBNAME}.{CTBNAME} ({PRIMARY_COL}, {BINT_COL}_1) values(now(), 111)")
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
# tdSql.checkData(0, 2, 0)
tdSql.query(f"select max({BINT_COL}_1) from {DBNAME}.{STBNAME} ")
tdSql.checkData(0, 0 , 111)
tdSql.execute(f"flush database {DBNAME}")
tdLog.printNoPrefix("==========step1.5 : drop child table")
tdSql.execute(f"drop table {CTBNAME}")
tdSql.query(f"select max({INT_COL}), max({BINT_COL}), min({INT_COL}) from {DBNAME}.{STBNAME} interval(6m,10s) sliding(6m)")
tdSql.checkData(0, 0, self.rows - 1)
tdSql.checkData(0, 1, (self.rows - 1) * 2 )
tdSql.checkData(tdSql.queryRows - 1, 2, 0)
tdLog.printNoPrefix("==========step1.6 : drop stable")
tdSql.execute(f"drop table {STBNAME}")
tdSql.error(f"select * from {DBNAME}.{STBNAME}")
self.all_test()

tdLog.printNoPrefix("==========step2:create table in rollup database")
@@ -640,7 +593,6 @@ class TDTestCase:
tdSql.execute("flush database db ")
tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test()
...