提交 563deee1 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last

...@@ -13,7 +13,7 @@ title: 通过 Docker 快速体验 TDengine ...@@ -13,7 +13,7 @@ title: 通过 Docker 快速体验 TDengine
如果已经安装了 docker, 只需执行下面的命令。 如果已经安装了 docker, 只需执行下面的命令。
```shell ```shell
docker run -d -p 6030:6030 -p 6041/6041 -p 6043-6049/6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine docker run -d -p 6030:6030 -p 6041:6041 -p 6043-6049:6043-6049 -p 6043-6049:6043-6049/udp tdengine/tdengine
``` ```
注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。 注意:TDengine 3.0 服务端仅使用 6030 TCP 端口。6041 为 taosAdapter 所使用提供 REST 服务端口。6043-6049 为 taosAdapter 提供第三方应用接入所使用端口,可根据需要选择是否打开。
......
...@@ -29,6 +29,7 @@ echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" | ...@@ -29,6 +29,7 @@ echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-stable stable main" |
如果安装 Beta 版需要安装包仓库 如果安装 Beta 版需要安装包仓库
```bash ```bash
wget -qO - http://repos.taosdata.com/tdengine.key | sudo apt-key add -
echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list echo "deb [arch=amd64] http://repos.taosdata.com/tdengine-beta beta main" | sudo tee /etc/apt/sources.list.d/tdengine-beta.list
``` ```
......
--- ---
sidebar_label: 消息队列 sidebar_label: 数据订阅
title: 消息队列 title: 数据订阅
--- ---
TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。 TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用户的解决方案。
...@@ -8,24 +8,17 @@ TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用 ...@@ -8,24 +8,17 @@ TDengine 3.0.0.0 开始对消息队列做了大幅的优化和增强以简化用
## 创建订阅主题 ## 创建订阅主题
```sql ```sql
CREATE TOPIC [IF NOT EXISTS] topic_name AS {subquery | DATABASE db_name | STABLE stb_name }; CREATE TOPIC [IF NOT EXISTS] topic_name AS subquery;
``` ```
订阅主题包括三种:列订阅、超级表订阅和数据库订阅。
**列订阅是**用 subquery 描述,支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下: TOPIC 支持过滤和标量函数和 UDF 标量函数,不支持 JOIN、GROUP BY、窗口切分子句、聚合函数和 UDF 聚合函数。列订阅规则如下:
1. TOPIC 一旦创建则返回结果的字段确定 1. TOPIC 一旦创建则返回结果的字段确定
2. 被订阅或用于计算的列不可被删除、修改 2. 被订阅或用于计算的列不可被删除、修改
3. 列可以新增,但新增的列不出现在订阅结果字段中 3. 列可以新增,但新增的列不出现在订阅结果字段中
4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列) 4. 对于 select \*,则订阅展开为创建时所有的列(子表、普通表为数据列,超级表为数据列加标签列)
**超级表订阅和数据库订阅**规则如下:
1. 被订阅主体的 schema 变更不受限
2. 返回消息中 schema 是块级别的,每块的 schema 可能不一样
3. 列变更后写入的数据若未落盘,将以写入时的 schema 返回
4. 列变更后写入的数据若未已落盘,将以落盘时的 schema 返回
## 删除订阅主题 ## 删除订阅主题
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
title: REST API title: REST API
--- ---
为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见[视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。 为支持各种不同类型平台的开发,TDengine 提供符合 REST 设计标准的 API,即 REST API。为最大程度降低学习成本,不同于其他数据库 REST API 的设计方法,TDengine 直接通过 HTTP POST 请求 BODY 中包含的 SQL 语句来操作数据库,仅需要一个 URL。REST 连接器的使用参见 [视频教程](https://www.taosdata.com/blog/2020/11/11/1965.html)。
:::note :::note
与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。 与原生连接器的一个区别是,RESTful 接口是无状态的,因此 `USE db_name` 指令没有效果,所有对表名、超级表名的引用都需要指定数据库名前缀。支持在 RESTful URL 中指定 db_name,这时如果 SQL 语句中没有指定数据库名前缀的话,会使用 URL 中指定的这个 db_name。
...@@ -20,8 +20,10 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安 ...@@ -20,8 +20,10 @@ RESTful 接口不依赖于任何 TDengine 的库,因此客户端不需要安
下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041(缺省值)替换为实际运行的 TDengine 服务 FQDN 和端口号: 下面示例是列出所有的数据库,请把 h1.taosdata.com 和 6041(缺省值)替换为实际运行的 TDengine 服务 FQDN 和端口号:
```html ```bash
curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.taosdata.com:6041/rest/sql curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" \
-d "select name, ntables, status from information_schema.ins_databases;" \
h1.taosdata.com:6041/rest/sql
``` ```
返回值结果如下表示验证通过: 返回值结果如下表示验证通过:
...@@ -35,188 +37,27 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t ...@@ -35,188 +37,27 @@ curl -L -H "Authorization: Basic cm9vdDp0YW9zZGF0YQ==" -d "show databases;" h1.t
"VARCHAR", "VARCHAR",
64 64
], ],
[
"create_time",
"TIMESTAMP",
8
],
[
"vgroups",
"SMALLINT",
2
],
[ [
"ntables", "ntables",
"BIGINT", "BIGINT",
8 8
], ],
[
"replica",
"TINYINT",
1
],
[
"strict",
"VARCHAR",
4
],
[
"duration",
"VARCHAR",
10
],
[
"keep",
"VARCHAR",
32
],
[
"buffer",
"INT",
4
],
[
"pagesize",
"INT",
4
],
[
"pages",
"INT",
4
],
[
"minrows",
"INT",
4
],
[
"maxrows",
"INT",
4
],
[
"comp",
"TINYINT",
1
],
[
"precision",
"VARCHAR",
2
],
[ [
"status", "status",
"VARCHAR", "VARCHAR",
10 10
],
[
"retention",
"VARCHAR",
60
],
[
"single_stable",
"BOOL",
1
],
[
"cachemodel",
"VARCHAR",
11
],
[
"cachesize",
"INT",
4
],
[
"wal_level",
"TINYINT",
1
],
[
"wal_fsync_period",
"INT",
4
],
[
"wal_retention_period",
"INT",
4
],
[
"wal_retention_size",
"BIGINT",
8
],
[
"wal_roll_period",
"INT",
4
],
[
"wal_seg_size",
"BIGINT",
8
] ]
], ],
"data": [ "data": [
[ [
"information_schema", "information_schema",
null, 16,
null, "ready"
14,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
], ],
[ [
"performance_schema", "performance_schema",
null, 9,
null, "ready"
3,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
"ready",
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
] ]
], ],
"rows": 2 "rows": 2
...@@ -231,21 +72,21 @@ http://<fqdn>:<port>/rest/sql/[db_name] ...@@ -231,21 +72,21 @@ http://<fqdn>:<port>/rest/sql/[db_name]
参数说明: 参数说明:
- fqnd: 集群中的任一台主机 FQDN 或 IP 地址 - fqnd: 集群中的任一台主机 FQDN 或 IP 地址
- port: 配置文件中 httpPort 配置项,缺省为 6041 - port: 配置文件中 httpPort 配置项,缺省为 6041
- db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。 - db_name: 可选参数,指定本次所执行的 SQL 语句的默认数据库库名。
例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。 例如:`http://h1.taos.com:6041/rest/sql/test` 是指向地址为 `h1.taos.com:6041` 的 URL,并将默认使用的数据库库名设置为 `test`。
HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。 HTTP 请求的 Header 里需带有身份认证信息,TDengine 支持 Basic 认证与自定义认证两种机制,后续版本将提供标准安全的数字签名机制来做身份验证。
- [自定义身份认证信息](#自定义授权码)如下所示 - [自定义身份认证信息](#自定义授权码)如下所示
```text ```text
Authorization: Taosd <TOKEN> Authorization: Taosd <TOKEN>
``` ```
- Basic 身份认证信息如下所示 - Basic 身份认证信息如下所示
```text ```text
Authorization: Basic <TOKEN> Authorization: Basic <TOKEN>
...@@ -259,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据 ...@@ -259,13 +100,13 @@ HTTP 请求的 BODY 里就是一个完整的 SQL 语句,SQL 语句中的数据
curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] curl -L -H "Authorization: Basic <TOKEN>" -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
``` ```
或者 或者
```bash ```bash
curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
``` ```
其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==` 其中,`TOKEN` 为 `{username}:{password}` 经过 Base64 编码之后的字符串,例如 `root:taosdata` 编码后为 `cm9vdDp0YW9zZGF0YQ==`
## HTTP 返回格式 ## HTTP 返回格式
...@@ -282,27 +123,9 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -282,27 +123,9 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
### HTTP body 结构 ### HTTP body 结构
<table> #### 正确执行
<tr>
<th>执行结果</th> 样例:
<th>说明</th>
<th>样例</th>
</tr>
<tr>
<td>正确执行</td>
<td>
code:(int)0 代表成功
<br/>
<br/>
column_meta:([][3]any)列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/>
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
```json ```json
{ {
...@@ -313,23 +136,16 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -313,23 +136,16 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
} }
``` ```
</td> 说明:
</tr>
<tr> - code:(`int`)0 代表成功。
<td>正确查询</td> - column_meta:(`[1][3]any`)只返回 `[["affected_rows", "INT", 4]]`。
<td> - rows:(`int`)只返回 `1`。
code:(int)0 代表成功 - data:(`[][]any`)返回受影响行数。
<br/>
<br/> #### 正确查询
column_meta:([][3]any) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)
<br/> 样例:
<br/>
rows:(int)数据返回行数
<br/>
<br/>
data:([][]any)具体数据内容
</td>
<td>
```json ```json
{ {
...@@ -385,17 +201,35 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -385,17 +201,35 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
} }
``` ```
</td> 说明:
</tr>
<tr> - code:(`int`)0 代表成功。
<td>错误</td> - column_meta:(`[][3]any`) 列信息,每个列会用三个值来说明,分别为:列名(string)、列类型(string)、类型长度(int)。
<td> - rows:(`int`)数据返回行数。
code:(int)错误码 - data:(`[][]any`)具体数据内容(时间格式仅支持 RFC3339,结果集为 0 时区)。
<br/>
<br/> 列类型使用如下字符串:
desc:(string)错误描述
</td> - "NULL"
<td> - "BOOL"
- "TINYINT"
- "SMALLINT"
- "INT"
- "BIGINT"
- "FLOAT"
- "DOUBLE"
- "VARCHAR"
- "TIMESTAMP"
- "NCHAR"
- "TINYINT UNSIGNED"
- "SMALLINT UNSIGNED"
- "INT UNSIGNED"
- "BIGINT UNSIGNED"
- "JSON"
#### 错误
样例:
```json ```json
{ {
...@@ -404,30 +238,10 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name] ...@@ -404,30 +238,10 @@ curl -L -u username:password -d "<SQL>" <ip>:<PORT>/rest/sql/[db_name]
} }
``` ```
</td> 说明:
</tr>
</table> - code:(`int`)错误码。
- desc:(`string`)错误描述。
### 说明
- 时间格式仅支持 RFC3339,结果集为 0 时区
- 列类型使用如下字符串:
> "NULL"
> "BOOL"
> "TINYINT"
> "SMALLINT"
> "INT"
> "BIGINT"
> "FLOAT"
> "DOUBLE"
> "VARCHAR"
> "TIMESTAMP"
> "NCHAR"
> "TINYINT UNSIGNED"
> "SMALLINT UNSIGNED"
> "INT UNSIGNED"
> "BIGINT UNSIGNED"
> "JSON"
## 自定义授权码 ## 自定义授权码
...@@ -439,11 +253,9 @@ curl http://<fqnd>:<port>/rest/login/<username>/<password> ...@@ -439,11 +253,9 @@ curl http://<fqnd>:<port>/rest/login/<username>/<password>
其中,`fqdn` 是 TDengine 数据库的 FQDN 或 IP 地址,`port` 是 TDengine 服务的端口号,`username` 为数据库用户名,`password` 为数据库密码,返回值为 JSON 格式,各字段含义如下: 其中,`fqdn` 是 TDengine 数据库的 FQDN 或 IP 地址,`port` 是 TDengine 服务的端口号,`username` 为数据库用户名,`password` 为数据库密码,返回值为 JSON 格式,各字段含义如下:
- status:请求结果的标志位 - status:请求结果的标志位。
- code:返回值代码。
- code:返回值代码 - desc:授权码。
- desc:授权码
获取授权码示例: 获取授权码示例:
......
...@@ -404,47 +404,3 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 ...@@ -404,47 +404,3 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
**支持版本** **支持版本**
该功能接口从 2.3.0.0 版本开始支持。 该功能接口从 2.3.0.0 版本开始支持。
### 订阅和消费 API
订阅 API 目前支持订阅一张或多张表,并通过定期轮询的方式不断获取写入表中的最新数据。
- `TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval)`
该函数负责启动订阅服务,成功时返回订阅对象,失败时返回 `NULL`,其参数为:
- taos:已经建立好的数据库连接
- restart:如果订阅已经存在,是重新开始,还是继续之前的订阅
- topic:订阅的主题(即名称),此参数是订阅的唯一标识
- sql:订阅的查询语句,此语句只能是 `select` 语句,只应查询原始数据,只能按时间正序查询数据
- fp:收到查询结果时的回调函数(稍后介绍函数原型),只在异步调用时使用,同步调用时此参数应该传 `NULL`
- param:调用回调函数时的附加参数,系统 API 将其原样传递到回调函数,不进行任何处理
- interval:轮询周期,单位为毫秒。异步调用时,将根据此参数周期性的调用回调函数,为避免对系统性能造成影响,不建议将此参数设置的过小;同步调用时,如两次调用 `taos_consume()` 的间隔小于此周期,API 将会阻塞,直到时间间隔超过此周期。
- `typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code)`
异步模式下,回调函数的原型,其参数为:
- tsub:订阅对象
- res:查询结果集,注意结果集中可能没有记录
- param:调用 `taos_subscribe()` 时客户程序提供的附加参数
- code:错误码
:::note
在这个回调函数里不可以做耗时过长的处理,尤其是对于返回的结果集中数据较多的情况,否则有可能导致客户端阻塞等异常状态。如果必须进行复杂计算,则建议在另外的线程中进行处理。
:::
- `TAOS_RES *taos_consume(TAOS_SUB *tsub)`
同步模式下,该函数用来获取订阅的结果。 用户应用程序将其置于一个循环之中。 如两次调用 `taos_consume()` 的间隔小于订阅的轮询周期,API 将会阻塞,直到时间间隔超过此周期。如果数据库有新记录到达,该 API 将返回该最新的记录,否则返回一个没有记录的空结果集。 如果返回值为 `NULL`,说明系统出错。 异步模式下,用户程序不应调用此 API。
:::note
在调用 `taos_consume()` 之后,用户应用应确保尽快调用 `taos_fetch_row()` 或 `taos_fetch_block()` 来处理订阅结果,否则服务端会持续缓存查询结果数据等待客户端读取,极端情况下会导致服务端内存消耗殆尽,影响服务稳定性。
:::
- `void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress)`
取消订阅。 如参数 `keepProgress` 不为 0,API 会保留订阅的进度信息,后续调用 `taos_subscribe()` 时可以基于此进度继续;否则将删除进度信息,后续只能重新开始读取数据。
...@@ -712,7 +712,7 @@ while(true) { ...@@ -712,7 +712,7 @@ while(true) {
} }
``` ```
`poll` 每次调用获取一个消息。请按需选择合理的调用 `poll` 的频率(如例子中的 `Duration.ofMillis(100)`),否则会给服务端造成不必要的压力。 `poll` 每次调用获取一个消息。
#### 关闭订阅 #### 关闭订阅
......
...@@ -275,12 +275,8 @@ typedef struct SStreamTask { ...@@ -275,12 +275,8 @@ typedef struct SStreamTask {
int32_t nodeId; int32_t nodeId;
SEpSet epSet; SEpSet epSet;
// used for task source and sink,
// while task agg should have processedVer for each child
int64_t recoverSnapVer; int64_t recoverSnapVer;
int64_t startVer; int64_t startVer;
int64_t checkpointVer;
int64_t processedVer;
// children info // children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*> SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......
...@@ -381,6 +381,9 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -381,6 +381,9 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
} }
#endif #endif
taosMemoryFree(pParam->pOffset);
if (pBuf->pData) taosMemoryFree(pBuf->pData);
/*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId, /*tscDebug("receive offset commit cb of %s on vgId:%d, offset is %" PRId64, pParam->pOffset->subKey, pParam->->vgId,
* pOffset->version);*/ * pOffset->version);*/
...@@ -402,6 +405,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { ...@@ -402,6 +405,8 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
tsem_post(&pParamSet->rspSem); tsem_post(&pParamSet->rspSem);
} }
taosMemoryFree(pParamSet);
#if 0 #if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
...@@ -611,12 +616,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t ...@@ -611,12 +616,12 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
} }
} }
#if 0
if (!async) { if (!async) {
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
}
#endif #endif
}
return 0; return 0;
} }
...@@ -1216,6 +1221,7 @@ END: ...@@ -1216,6 +1221,7 @@ END:
} else { } else {
taosMemoryFree(pParam); taosMemoryFree(pParam);
} }
taosMemoryFree(pMsg->pData);
return code; return code;
} }
......
...@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode); ...@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg); int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
...@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode); ...@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb ...@@ -144,6 +144,7 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb
STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId,
void* pMemRef); void* pMemRef);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
// tq // tq
int tqInit(); int tqInit();
...@@ -169,10 +170,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); ...@@ -169,10 +170,9 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list);
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq); const char* stbFullName, SBatchDeleteReq* pDeleteReq);
// sma // sma
int32_t smaInit(); int32_t smaInit();
......
...@@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -201,9 +201,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
} }
SBatchDeleteReq deleteReq; SBatchDeleteReq deleteReq;
SSubmitReq *pSubmitReq = SSubmitReq *pSubmitReq = tqBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true,
tdBlockToSubmit(pSma->pVnode, (const SArray *)msg, pTsmaStat->pTSchema, true, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbUid, pTsmaStat->pTSma->dstTbName, &deleteReq);
pTsmaStat->pTSma->dstTbName, pTsmaStat->pTSma->dstVgId, &deleteReq);
if (!pSubmitReq) { if (!pSubmitReq) {
smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), smaError("vgId:%d, failed to gen submit blk while tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "tq.h" #include "tq.h"
#include "vnd.h"
#if 0 #if 0
void tqTmrRspFunc(void* param, void* tmrId) { void tqTmrRspFunc(void* param, void* tmrId) {
...@@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ ...@@ -212,9 +213,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
#endif #endif
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
walApplyVer(pTq->pVnode->pWal, ver); if (vnodeIsRoleLeader(pTq->pVnode) && msgType == TDMT_VND_SUBMIT) {
if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0; if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);
......
...@@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -25,8 +25,7 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGidCol = taosArrayGet(pDataBlock->pDataBlock, GROUPID_COLUMN_INDEX);
for (int32_t row = 0; row < totRow; row++) { for (int32_t row = 0; row < totRow; row++) {
int64_t ts = *(int64_t*)colDataGetData(pTsCol, row); int64_t ts = *(int64_t*)colDataGetData(pTsCol, row);
/*int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);*/ int64_t groupId = *(int64_t*)colDataGetData(pGidCol, row);
int64_t groupId = 0;
char* name = buildCtbNameByGroupId(stbFullName, groupId); char* name = buildCtbNameByGroupId(stbFullName, groupId);
tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name); tqDebug("stream delete msg: groupId :%ld, name: %s", groupId, name);
SMetaReader mr = {0}; SMetaReader mr = {0};
...@@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl ...@@ -49,8 +48,8 @@ int32_t tdBuildDeleteReq(SVnode* pVnode, const char* stbFullName, const SSDataBl
return 0; return 0;
} }
SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb, SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pTSchema, bool createTb,
int64_t suid, const char* stbFullName, int32_t vgId, SBatchDeleteReq* pDeleteReq) { int64_t suid, const char* stbFullName, SBatchDeleteReq* pDeleteReq) {
SSubmitReq* ret = NULL; SSubmitReq* ret = NULL;
SArray* schemaReqs = NULL; SArray* schemaReqs = NULL;
SArray* schemaReqSz = NULL; SArray* schemaReqSz = NULL;
...@@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem ...@@ -153,7 +152,7 @@ SSubmitReq* tdBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
// assign data // assign data
// TODO // TODO
ret = rpcMallocCont(cap); ret = rpcMallocCont(cap);
ret->header.vgId = vgId; ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq); ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz); ret->numOfBlocks = htonl(sz);
...@@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) { ...@@ -234,8 +233,8 @@ void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
ASSERT(pTask->tbSink.pTSchema); ASSERT(pTask->tbSink.pTSchema);
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq)); deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
SSubmitReq* pReq = tdBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid, SSubmitReq* pReq = tqBlockToSubmit(pVnode, pRes, pTask->tbSink.pTSchema, true, pTask->tbSink.stbUid,
pTask->tbSink.stbFullName, pVnode->config.vgId, &deleteReq); pTask->tbSink.stbFullName, &deleteReq);
tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId); tqDebug("vgId:%d, task %d convert blocks over, put into write-queue", TD_VID(pVnode), pTask->taskId);
......
...@@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -247,6 +247,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vTrace("vgId:%d, process %s request success, index:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1; return -1;
......
...@@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) { ...@@ -764,6 +764,8 @@ void vnodeSyncStart(SVnode *pVnode) {
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsRoleLeader(SVnode *pVnode) { return syncGetMyRole(pVnode->sync) == TAOS_SYNC_STATE_LEADER; }
bool vnodeIsLeader(SVnode *pVnode) { bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) { if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync), vDebug("vgId:%d, vnode not ready, state:%s, restore:%d", pVnode->config.vgId, syncGetMyRoleStr(pVnode->sync),
......
...@@ -642,7 +642,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar ...@@ -642,7 +642,6 @@ static int32_t doTrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarPar
int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE; int32_t charLen = (type == TSDB_DATA_TYPE_VARCHAR) ? len : len / TSDB_NCHAR_SIZE;
trimFn(input, output, type, charLen); trimFn(input, output, type, charLen);
varDataSetLen(output, len);
colDataAppend(pOutputData, i, output, false); colDataAppend(pOutputData, i, output, false);
output += varDataTLen(output); output += varDataTLen(output);
} }
......
...@@ -238,19 +238,23 @@ class TAdapter: ...@@ -238,19 +238,23 @@ class TAdapter:
if self.running != 0: if self.running != 0:
psCmd = f"ps -ef|grep -w {toBeKilled}| grep -v grep | awk '{{print $2}}'" psCmd = f"ps -ef|grep -w {toBeKilled}| grep -v grep | awk '{{print $2}}'"
# psCmd = f"pgrep {toBeKilled}"
processID = subprocess.check_output( processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8") psCmd, shell=True)
while(processID): while(processID):
killCmd = f"kill {signal} {processID} > /dev/null 2>&1" killCmd = f"pkill {signal} {processID} > /dev/null 2>&1"
os.system(killCmd) os.system(killCmd)
time.sleep(1) time.sleep(1)
processID = subprocess.check_output( processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8") psCmd, shell=True).decode("utf-8")
if not platform.system().lower() == 'windows': if not platform.system().lower() == 'windows':
for port in range(6030, 6041): port = 6041
fuserCmd = f"fuser -k -n tcp {port} > /dev/null" fuserCmd = f"fuser -k -n tcp {port} > /dev/null"
os.system(fuserCmd) os.system(fuserCmd)
# for port in range(6030, 6041):
# fuserCmd = f"fuser -k -n tcp {port} > /dev/null"
# os.system(fuserCmd)
self.running = 0 self.running = 0
tdLog.debug(f"taosadapter is stopped by kill {signal}") tdLog.debug(f"taosadapter is stopped by kill {signal}")
......
...@@ -35,7 +35,7 @@ class TDTestCase: ...@@ -35,7 +35,7 @@ class TDTestCase:
self.testcaseFilename = os.path.split(__file__)[-1] self.testcaseFilename = os.path.split(__file__)[-1]
# os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename)) # os.system("rm -rf %s/%s.sql" % (self.testcasePath,self.testcaseFilename))
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True) tdSql.init(conn.cursor(), False)
def run(self): def run(self):
# tdSql.prepare() # tdSql.prepare()
...@@ -47,20 +47,20 @@ class TDTestCase: ...@@ -47,20 +47,20 @@ class TDTestCase:
i = 0 i = 0
# add 100000 table # add 100000 table
tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)") tdSql.execute("create table if not exists jsons1(ts timestamp, dataInt int, dataBool bool, dataStr nchar(50), dataStrBin binary(150)) tags(jtag json)")
while i <= 10 0000: while i <= 100000:
sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i) sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
tdSql.execute(sql) tdSql.execute(sql)
i = i + 1 i = i + 1
// do query # do query
i = 0 i = 0
while i <= 10 0000: while i <= 100000:
sql = """select count(*) from jsons1 where jtag->'tag1' = %d"""%(i) sql = """select count(*) from jsons1 where jtag->'tag1' = %d"""%(i)
tdSql.query(sql) tdSql.query(sql)
if 1 != tdSql.getRows(): if 1 != tdSql.getRows():
print("err: %s"%(sql)) print("err: %s"%(sql))
while i <= 10000000 while i <= 10000000:
sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i) sql = """insert into jsons1_{%d} using jsons1 tags('{"tag1":{%d}}') values(1591060618000, 1, false, 'json1', '你是') (1591060608000, 23, true, '等等', 'json')"""%(i, i)
tdSql.execute(sql) tdSql.execute(sql)
i = i + 1 i = i + 1
...@@ -704,4 +704,3 @@ class TDTestCase: ...@@ -704,4 +704,3 @@ class TDTestCase:
tdCases.addWindows(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase()) tdCases.addLinux(__file__, TDTestCase())
...@@ -26,6 +26,7 @@ TS_TYPE_COL = [ TS_COL, ] ...@@ -26,6 +26,7 @@ TS_TYPE_COL = [ TS_COL, ]
ALL_COL = [ INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BOOL_COL, BINARY_COL, NCHAR_COL, TS_COL ] ALL_COL = [ INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BOOL_COL, BINARY_COL, NCHAR_COL, TS_COL ]
DBNAME = "db"
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
...@@ -133,13 +134,13 @@ class TDTestCase: ...@@ -133,13 +134,13 @@ class TDTestCase:
return f"select leastsquares({select_clause}, {start_val}, {step_val}) from {from_clause} {where_condition} {group_condition}" return f"select leastsquares({select_clause}, {start_val}, {step_val}) from {from_clause} {where_condition} {group_condition}"
@property @property
def __tb_list(self): def __tb_list(self, dbname=DBNAME):
return [ return [
"ct1", f"{dbname}.ct1",
"ct4", f"{dbname}.ct4",
"t1", f"{dbname}.nt1",
"ct2", f"{dbname}.ct2",
"stb1", f"{dbname}.stb1",
] ]
@property @property
...@@ -161,7 +162,8 @@ class TDTestCase: ...@@ -161,7 +162,8 @@ class TDTestCase:
err_sqls = [] err_sqls = []
__no_join_tblist = self.__tb_list __no_join_tblist = self.__tb_list
for tb in __no_join_tblist: for tb in __no_join_tblist:
select_claus_list = self.__query_condition(tb) tbname = tb.split(".")[-1]
select_claus_list = self.__query_condition(tbname)
for select_claus in select_claus_list: for select_claus in select_claus_list:
group_claus = self.__group_condition(col=select_claus) group_claus = self.__group_condition(col=select_claus)
where_claus = self.__where_condition(query_conditon=select_claus) where_claus = self.__where_condition(query_conditon=select_claus)
...@@ -207,25 +209,25 @@ class TDTestCase: ...@@ -207,25 +209,25 @@ class TDTestCase:
def __test_current(self): def __test_current(self):
# tdSql.query("explain select c1 from ct1") # tdSql.query("explain select c1 from {dbname}.ct1")
# tdSql.query("explain select 1 from ct2") # tdSql.query("explain select 1 from {dbname}.ct2")
# tdSql.query("explain select cast(ceil(c6) as bigint) from ct4 group by c6") # tdSql.query("explain select cast(ceil(c6) as bigint) from {dbname}.ct4 group by c6")
# tdSql.query("explain select count(c3) from ct4 group by c7 having count(c3) > 0") # tdSql.query("explain select count(c3) from {dbname}.ct4 group by c7 having count(c3) > 0")
# tdSql.query("explain select ct2.c3 from ct4 join ct2 on ct4.ts=ct2.ts") # tdSql.query("explain select ct2.c3 from {dbname}.ct4 join ct2 on ct4.ts=ct2.ts")
# tdSql.query("explain select c1 from stb1 where c1 is not null and c1 in (0, 1, 2) or c1 between 2 and 100 ") # tdSql.query("explain select c1 from stb1 where c1 is not null and c1 in (0, 1, 2) or c1 between 2 and 100 ")
self.leastsquares_check() self.leastsquares_check()
def __test_error(self): def __test_error(self, dbname=DBNAME):
tdLog.printNoPrefix("===step 0: err case, must return err") tdLog.printNoPrefix("===step 0: err case, must return err")
tdSql.error( "select leastsquares(c1) from ct8" ) tdSql.error( f"select leastsquares(c1) from {dbname}.ct8" )
tdSql.error( "select leastsquares(c1, 1) from ct1 " ) tdSql.error( f"select leastsquares(c1, 1) from {dbname}.ct1 " )
tdSql.error( "select leastsquares(c1, null, 1) from ct1 " ) tdSql.error( f"select leastsquares(c1, null, 1) from {dbname}.ct1 " )
tdSql.error( "select leastsquares(c1, 1, null) from ct1 " ) tdSql.error( f"select leastsquares(c1, 1, null) from {dbname}.ct1 " )
tdSql.error( "select leastsquares(null, 1, 1) from ct1 " ) tdSql.error( f"select leastsquares(null, 1, 1) from {dbname}.ct1 " )
tdSql.error( '''select leastsquares(['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10']) tdSql.error( f'''select leastsquares(['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'])
from ct1 from {dbname}.ct1
where ['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'] is not null where ['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'] is not null
group by ['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'] group by ['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10']
having ['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'] is not null ''' ) having ['c1 + c1', 'c1 + c2', 'c1 + c3', 'c1 + c4', 'c1 + c5', 'c1 + c6', 'c1 + c7', 'c1 + c8', 'c1 + c9', 'c1 + c10'] is not null ''' )
...@@ -234,16 +236,16 @@ class TDTestCase: ...@@ -234,16 +236,16 @@ class TDTestCase:
self.__test_error() self.__test_error()
self.__test_current() self.__test_current()
def __create_tb(self): def __create_tb(self, dbname=DBNAME):
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1( create_stb_sql = f'''create table {dbname}.stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int) ) tags (t1 int)
''' '''
create_ntb_sql = f'''create table t1( create_ntb_sql = f'''create table {dbname}.nt1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
...@@ -253,30 +255,29 @@ class TDTestCase: ...@@ -253,30 +255,29 @@ class TDTestCase:
tdSql.execute(create_ntb_sql) tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( {i+1} )')
{ i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}
def __insert_data(self, rows): def __insert_data(self, rows, dbname=DBNAME):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows): for i in range(rows):
tdSql.execute( tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f'''insert into ct1 values f'''insert into {dbname}.ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } ) ( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } ) ( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
''' '''
) )
tdSql.execute( tdSql.execute(
f'''insert into ct4 values f'''insert into {dbname}.ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -292,7 +293,7 @@ class TDTestCase: ...@@ -292,7 +293,7 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''insert into ct2 values f'''insert into {dbname}.ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -308,13 +309,13 @@ class TDTestCase: ...@@ -308,13 +309,13 @@ class TDTestCase:
) )
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values insert_data = f'''insert into {dbname}.nt1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, ( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } ) "binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
''' '''
tdSql.execute(insert_data) tdSql.execute(insert_data)
tdSql.execute( tdSql.execute(
f'''insert into t1 values f'''insert into {dbname}.nt1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -332,7 +333,7 @@ class TDTestCase: ...@@ -332,7 +333,7 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare(DBNAME)
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
self.__create_tb() self.__create_tb()
...@@ -344,10 +345,9 @@ class TDTestCase: ...@@ -344,10 +345,9 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check") tdLog.printNoPrefix("==========step3:all check")
self.all_test() self.all_test()
tdDnodes.stop(1) tdSql.execute(f"flush database {DBNAME}")
tdDnodes.start(1)
tdSql.execute("use db") tdSql.execute(f"use {DBNAME}")
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test()
......
...@@ -19,6 +19,7 @@ TS_COL = "c10" ...@@ -19,6 +19,7 @@ TS_COL = "c10"
UN_CHAR_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BOOL_COL, ] UN_CHAR_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, BOOL_COL, ]
CHAR_COL = [ BINARY_COL, NCHAR_COL, ] CHAR_COL = [ BINARY_COL, NCHAR_COL, ]
TS_TYPE_COL = [TS_COL] TS_TYPE_COL = [TS_COL]
DBNAME = "db"
class TDTestCase: class TDTestCase:
...@@ -102,16 +103,16 @@ class TDTestCase: ...@@ -102,16 +103,16 @@ class TDTestCase:
return sqls return sqls
def __test_current(self): def __test_current(self, dbname=DBNAME):
tdLog.printNoPrefix("==========current sql condition check , must return query ok==========") tdLog.printNoPrefix("==========current sql condition check , must return query ok==========")
tbname = ["ct1", "ct2", "ct4", "t1", "stb1"] tbname = [f"{dbname}.ct1", f"{dbname}.ct2", f"{dbname}.ct4", f"{dbname}.nt1", f"{dbname}.stb1"]
for tb in tbname: for tb in tbname:
self.__length_current_check(tb) self.__length_current_check(tb)
tdLog.printNoPrefix(f"==========current sql condition check in {tb} over==========") tdLog.printNoPrefix(f"==========current sql condition check in {tb} over==========")
def __test_error(self): def __test_error(self, dbname=DBNAME):
tdLog.printNoPrefix("==========err sql condition check , must return error==========") tdLog.printNoPrefix("==========err sql condition check , must return error==========")
tbname = ["ct1", "ct2", "ct4", "t1", "stb1"] tbname = [f"{dbname}.ct1", f"{dbname}.ct2", f"{dbname}.ct4", f"{dbname}.nt1", f"{dbname}.stb1"]
for tb in tbname: for tb in tbname:
for errsql in self.__length_err_check(tb): for errsql in self.__length_err_check(tb):
...@@ -124,17 +125,16 @@ class TDTestCase: ...@@ -124,17 +125,16 @@ class TDTestCase:
self.__test_error() self.__test_error()
def __create_tb(self): def __create_tb(self, dbname=DBNAME):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1( create_stb_sql = f'''create table {dbname}.stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int) ) tags (t1 int)
''' '''
create_ntb_sql = f'''create table t1( create_ntb_sql = f'''create table {dbname}.nt1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
...@@ -144,29 +144,29 @@ class TDTestCase: ...@@ -144,29 +144,29 @@ class TDTestCase:
tdSql.execute(create_ntb_sql) tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( {i+1} )')
def __insert_data(self, rows): def __insert_data(self, rows, dbname=DBNAME):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows): for i in range(rows):
tdSql.execute( tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f'''insert into ct1 values f'''insert into {dbname}.ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', { now_time + 8 } ) ( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', { now_time + 9 } ) ( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', { now_time + 9 } )
''' '''
) )
tdSql.execute( tdSql.execute(
f'''insert into ct4 values f'''insert into {dbname}.ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000+ 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000+ 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -182,7 +182,7 @@ class TDTestCase: ...@@ -182,7 +182,7 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''insert into ct2 values f'''insert into {dbname}.ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000+ 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000+ 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -198,13 +198,13 @@ class TDTestCase: ...@@ -198,13 +198,13 @@ class TDTestCase:
) )
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values insert_data = f'''insert into {dbname}.nt1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, ( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_{i}", { now_time - 1000 * i } ) "binary_{i}", "nchar_{i}", { now_time - 1000 * i } )
''' '''
tdSql.execute(insert_data) tdSql.execute(insert_data)
tdSql.execute( tdSql.execute(
f'''insert into t1 values f'''insert into {dbname}.nt1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -233,8 +233,7 @@ class TDTestCase: ...@@ -233,8 +233,7 @@ class TDTestCase:
tdLog.printNoPrefix("==========step3:all check") tdLog.printNoPrefix("==========step3:all check")
self.all_test() self.all_test()
tdDnodes.stop(1) tdSql.execute("flush database db")
tdDnodes.start(1)
tdSql.execute("use db") tdSql.execute("use db")
......
此差异已折叠。
...@@ -110,15 +110,20 @@ python3 ./test.py -f 2-query/histogram.py ...@@ -110,15 +110,20 @@ python3 ./test.py -f 2-query/histogram.py
python3 ./test.py -f 2-query/histogram.py -R python3 ./test.py -f 2-query/histogram.py -R
python3 ./test.py -f 2-query/hyperloglog.py python3 ./test.py -f 2-query/hyperloglog.py
python3 ./test.py -f 2-query/hyperloglog.py -R python3 ./test.py -f 2-query/hyperloglog.py -R
python3 ./test.py -f 2-query/interp.py
python3 ./test.py -f 2-query/interp.py -R
python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/irate.py
# python3 ./test.py -f 2-query/irate.py -R # python3 ./test.py -f 2-query/irate.py -R
python3 ./test.py -f 2-query/join.py python3 ./test.py -f 2-query/join.py
python3 ./test.py -f 2-query/join.py -R python3 ./test.py -f 2-query/join.py -R
python3 ./test.py -f 2-query/last_row.py
python3 ./test.py -f 2-query/interp.py python3 ./test.py -f 2-query/last_row.py -R
python3 ./test.py -f 2-query/interp.py -R python3 ./test.py -f 2-query/last.py
python3 ./test.py -f 2-query/last.py -R
python3 ./test.py -f 2-query/leastsquares.py
python3 ./test.py -f 2-query/leastsquares.py -R
python3 ./test.py -f 2-query/length.py
python3 ./test.py -f 2-query/length.py -R
python3 ./test.py -f 1-insert/update_data.py python3 ./test.py -f 1-insert/update_data.py
...@@ -127,7 +132,6 @@ python3 ./test.py -f 1-insert/delete_data.py ...@@ -127,7 +132,6 @@ python3 ./test.py -f 1-insert/delete_data.py
python3 ./test.py -f 2-query/varchar.py python3 ./test.py -f 2-query/varchar.py
python3 ./test.py -f 2-query/ltrim.py python3 ./test.py -f 2-query/ltrim.py
python3 ./test.py -f 2-query/rtrim.py python3 ./test.py -f 2-query/rtrim.py
python3 ./test.py -f 2-query/length.py
python3 ./test.py -f 2-query/upper.py python3 ./test.py -f 2-query/upper.py
python3 ./test.py -f 2-query/lower.py python3 ./test.py -f 2-query/lower.py
python3 ./test.py -f 2-query/join2.py python3 ./test.py -f 2-query/join2.py
...@@ -136,7 +140,6 @@ python3 ./test.py -f 2-query/union.py ...@@ -136,7 +140,6 @@ python3 ./test.py -f 2-query/union.py
python3 ./test.py -f 2-query/union1.py python3 ./test.py -f 2-query/union1.py
python3 ./test.py -f 2-query/concat2.py python3 ./test.py -f 2-query/concat2.py
python3 ./test.py -f 2-query/spread.py python3 ./test.py -f 2-query/spread.py
python3 ./test.py -f 2-query/leastsquares.py
python3 ./test.py -f 2-query/timezone.py python3 ./test.py -f 2-query/timezone.py
...@@ -144,7 +147,6 @@ python3 ./test.py -f 2-query/Now.py ...@@ -144,7 +147,6 @@ python3 ./test.py -f 2-query/Now.py
python3 ./test.py -f 2-query/Today.py python3 ./test.py -f 2-query/Today.py
python3 ./test.py -f 2-query/max.py python3 ./test.py -f 2-query/max.py
python3 ./test.py -f 2-query/min.py python3 ./test.py -f 2-query/min.py
python3 ./test.py -f 2-query/last.py
python3 ./test.py -f 2-query/To_iso8601.py python3 ./test.py -f 2-query/To_iso8601.py
python3 ./test.py -f 2-query/To_unixtimestamp.py python3 ./test.py -f 2-query/To_unixtimestamp.py
python3 ./test.py -f 2-query/timetruncate.py python3 ./test.py -f 2-query/timetruncate.py
...@@ -178,7 +180,6 @@ python3 ./test.py -f 2-query/ttl_comment.py ...@@ -178,7 +180,6 @@ python3 ./test.py -f 2-query/ttl_comment.py
python3 ./test.py -f 2-query/twa.py python3 ./test.py -f 2-query/twa.py
python3 ./test.py -f 2-query/queryQnode.py python3 ./test.py -f 2-query/queryQnode.py
python3 ./test.py -f 2-query/max_partition.py python3 ./test.py -f 2-query/max_partition.py
python3 ./test.py -f 2-query/last_row.py
python3 ./test.py -f 2-query/tsbsQuery.py python3 ./test.py -f 2-query/tsbsQuery.py
python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode1mnode.py
......
...@@ -194,7 +194,7 @@ if __name__ == "__main__": ...@@ -194,7 +194,7 @@ if __name__ == "__main__":
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
for port in range(6030, 6041): for port in range(6030, 6041):
usePortPID = "lsof -i tcp:%d | grep LISTEn | awk '{print $2}'" % port usePortPID = "lsof -i tcp:%d | grep LISTEN | awk '{print $2}'" % port
processID = subprocess.check_output(usePortPID, shell=True) processID = subprocess.check_output(usePortPID, shell=True)
if processID: if processID:
...@@ -206,11 +206,13 @@ if __name__ == "__main__": ...@@ -206,11 +206,13 @@ if __name__ == "__main__":
time.sleep(2) time.sleep(2)
if restful: if restful:
toBeKilled = "taosadapter" toBeKilled = "taosadapt"
killCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" % toBeKilled # killCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -TERM > /dev/null 2>&1" % toBeKilled
killCmd = f"pkill {toBeKilled}"
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
# psCmd = f"pgrep {toBeKilled}"
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
while(processID): while(processID):
...@@ -218,14 +220,14 @@ if __name__ == "__main__": ...@@ -218,14 +220,14 @@ if __name__ == "__main__":
time.sleep(1) time.sleep(1)
processID = subprocess.check_output(psCmd, shell=True) processID = subprocess.check_output(psCmd, shell=True)
for port in range(6030, 6041): port = 6041
usePortPID = "lsof -i tcp:%d | grep LISTEn | awk '{print $2}'" % port usePortPID = f"lsof -i tcp:{port} | grep LISTEN | awk '{{print $2}}'"
processID = subprocess.check_output(usePortPID, shell=True) processID = subprocess.check_output(usePortPID, shell=True)
if processID: if processID:
killCmd = "kill -TERM %s" % processID killCmd = f"kill -TERM {processID}"
os.system(killCmd) os.system(killCmd)
fuserCmd = "fuser -k -n tcp %d" % port fuserCmd = f"fuser -k -n tcp {port}"
os.system(fuserCmd) os.system(fuserCmd)
tdLog.info('stop taosadapter') tdLog.info('stop taosadapter')
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册