提交 89270400 编写于 作者: S slzhou@taodata.com

Merge branch '3.0' of github.com:taosdata/TDengine into szhou/fix/udf

...@@ -111,7 +111,7 @@ TDengine REST API 详情请参考[官方文档](/reference/rest-api/)。 ...@@ -111,7 +111,7 @@ TDengine REST API 详情请参考[官方文档](/reference/rest-api/)。
这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。 这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。
taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../reference/taosbenchmark) taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../../reference/taosbenchmark)
## 体验查询 ## 体验查询
......
...@@ -245,7 +245,7 @@ select * from t; ...@@ -245,7 +245,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s) Query OK, 2 row(s) in set (0.003128s)
``` ```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/) 除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../../reference/taos-shell/)
## 使用 taosBenchmark 体验写入速度 ## 使用 taosBenchmark 体验写入速度
......
...@@ -8,17 +8,17 @@ description: "创建、删除数据库,查看、修改数据库参数" ...@@ -8,17 +8,17 @@ description: "创建、删除数据库,查看、修改数据库参数"
```sql ```sql
CREATE DATABASE [IF NOT EXISTS] db_name [database_options] CREATE DATABASE [IF NOT EXISTS] db_name [database_options]
database_options: database_options:
database_option ... database_option ...
database_option: { database_option: {
BUFFER value BUFFER value
| CACHEMODEL {'none' | 'last_row' | 'last_value' | 'both'} | CACHEMODEL {'none' | 'last_row' | 'last_value' | 'both'}
| CACHESIZE value | CACHESIZE value
| COMP {0 | 1 | 2} | COMP {0 | 1 | 2}
| DURATION value | DURATION value
| FSYNC value | WAL_FSYNC_PERIOD value
| MAXROWS value | MAXROWS value
| MINROWS value | MINROWS value
| KEEP value | KEEP value
...@@ -28,7 +28,7 @@ database_option: { ...@@ -28,7 +28,7 @@ database_option: {
| REPLICA value | REPLICA value
| RETENTIONS ingestion_duration:keep_duration ... | RETENTIONS ingestion_duration:keep_duration ...
| STRICT {'off' | 'on'} | STRICT {'off' | 'on'}
| WAL {1 | 2} | WAL_LEVEL {1 | 2}
| VGROUPS value | VGROUPS value
| SINGLE_STABLE {0 | 1} | SINGLE_STABLE {0 | 1}
| WAL_RETENTION_PERIOD value | WAL_RETENTION_PERIOD value
......
...@@ -218,7 +218,7 @@ GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些 ...@@ -218,7 +218,7 @@ GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些
PARTITION BY 子句是 TDengine 特色语法,按 part_list 对数据进行切分,在每个切分的分片中进行计算。 PARTITION BY 子句是 TDengine 特色语法,按 part_list 对数据进行切分,在每个切分的分片中进行计算。
详见 [TDengine 特色查询](./distinguished) 详见 [TDengine 特色查询](../distinguished)
## ORDER BY ## ORDER BY
......
此差异已折叠。
...@@ -40,7 +40,6 @@ ALTER ALL DNODES dnode_option ...@@ -40,7 +40,6 @@ ALTER ALL DNODES dnode_option
dnode_option: { dnode_option: {
'resetLog' 'resetLog'
| 'resetQueryCache'
| 'balance' value | 'balance' value
| 'monitor' value | 'monitor' value
| 'debugFlag' value | 'debugFlag' value
......
...@@ -19,7 +19,7 @@ library_path:包含UDF函数实现的动态链接库的绝对路径,是在 ...@@ -19,7 +19,7 @@ library_path:包含UDF函数实现的动态链接库的绝对路径,是在
OUTPUTTYPE:标识此函数的返回类型。 OUTPUTTYPE:标识此函数的返回类型。
BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节。 BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节。
关于如何开发自定义函数,请参考 [UDF使用说明](../develop/udf) 关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)
## 删除自定义函数 ## 删除自定义函数
......
...@@ -10,7 +10,7 @@ import TabItem from "@theme/TabItem"; ...@@ -10,7 +10,7 @@ import TabItem from "@theme/TabItem";
## 安装 ## 安装
关于安装,请参考 [使用安装包立即开始](../get-started/package) 关于安装,请参考 [使用安装包立即开始](../../get-started/package)
......
...@@ -7,7 +7,7 @@ title: 容量规划 ...@@ -7,7 +7,7 @@ title: 容量规划
## 服务端内存需求 ## 服务端内存需求
每个 Database 可以创建固定数目的 vgroup,默认 2 个 vgroup,在创建数据库时可以通过`vgroups <num>`参数来指定,其副本数由参数`replica <num>`指定。vgroup 中的每个副本会是一个 vnode;所以每个 vnode 占用的内存由以下几个参数决定: 每个 Database 可以创建固定数目的 vgroup,默认 2 个 vgroup,在创建数据库时可以通过`vgroups <num>`参数来指定,其副本数由参数`replica <num>`指定。vgroup 中的每个副本会是一个 vnode;所以每个数据库占用的内存由以下几个参数决定:
- vgroups - vgroups
- replica - replica
...@@ -16,12 +16,15 @@ title: 容量规划 ...@@ -16,12 +16,15 @@ title: 容量规划
- pagesize - pagesize
- cachesize - cachesize
关于这些参数的详细说明请参考 [数据库管理](../taos-sql/database)。 关于这些参数的详细说明请参考 [数据库管理](../../taos-sql/database)。
所以一个数据库所需要的内存大小等于 `vgroups * replica * (buffer + pages * pagesize + cachesize)`。 一个数据库所需要的内存大小等于
但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加,并由集群中所有服务器共同负担。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接地简单使用,要结合各数据节点的负载情况。 ```
vgroups * replica * (buffer + pages * pagesize + cachesize)
```
但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。
## 客户端内存需求 ## 客户端内存需求
...@@ -48,7 +51,7 @@ CPU 的需求取决于如下两方面: ...@@ -48,7 +51,7 @@ CPU 的需求取决于如下两方面:
- **数据插入** TDengine 单核每秒能至少处理一万个插入请求。每个插入请求可以带多条记录,一次插入一条记录与插入 10 条记录,消耗的计算资源差别很小。因此每次插入,条数越大,插入效率越高。如果一个插入请求带 200 条以上记录,单核就能达到每秒插入 100 万条记录的速度。但对前端数据采集的要求越高,因为需要缓存记录,然后一批插入。 - **数据插入** TDengine 单核每秒能至少处理一万个插入请求。每个插入请求可以带多条记录,一次插入一条记录与插入 10 条记录,消耗的计算资源差别很小。因此每次插入,条数越大,插入效率越高。如果一个插入请求带 200 条以上记录,单核就能达到每秒插入 100 万条记录的速度。但对前端数据采集的要求越高,因为需要缓存记录,然后一批插入。
- **查询需求** TDengine 提供高效的查询,但是每个场景的查询差异很大,查询频次变化也很大,难以给出客观数字。需要用户针对自己的场景,写一些查询语句,才能确定。 - **查询需求** TDengine 提供高效的查询,但是每个场景的查询差异很大,查询频次变化也很大,难以给出客观数字。需要用户针对自己的场景,写一些查询语句,才能确定。
因此仅对数据插入而言,CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。 因此仅对数据插入而言,CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。
## 存储需求 ## 存储需求
......
...@@ -4,16 +4,16 @@ title: 容错和灾备 ...@@ -4,16 +4,16 @@ title: 容错和灾备
## 容错 ## 容错
TDengine 支持**WAL**(Write Ahead Log)机制,实现数据的容错能力,保证数据的高可用。 TDengine 支持 **WAL**(Write Ahead Log)机制,实现数据的容错能力,保证数据的高可用。
TDengine 接收到应用的请求数据包时,先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失。 TDengine 接收到应用的请求数据包时,先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失。
涉及的系统配置参数有两个: 涉及的系统配置参数有两个:
- wal_level:WAL 级别,1:写WAL,但不执行fsync。2:写WAL,而且执行fsync。默认值为 1。 - wal_level:WAL 级别,1:写 WAL,但不执行 fsync。2:写 WAL,而且执行 fsync。默认值为 1。
- wal_fsync_period:当 wal_evel 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。 - wal_fsync_period:当 wal_level 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。
如果要 100%的保证数据不丢失,需要将 wal_level 设置为 2,fsync 设置为 0。这时写入速度将会下降。但如果应用侧启动的写数据的线程数达到一定的数量(超过 50),那么写入数据的性能也会很不错,只会比 fsync 设置为 3000 毫秒下降 30%左右。 如果要 100%的保证数据不丢失,需要将 wal_level 设置为 2,wal_fsync_period 设置为 0。这时写入速度将会下降。但如果应用侧启动的写数据的线程数达到一定的数量(超过 50),那么写入数据的性能也会很不错,只会比 wal_fsync_period 设置为 3000 毫秒下降 30%左右。
## 灾备 ## 灾备
...@@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将 ...@@ -27,4 +27,4 @@ TDengine 集群的节点数必须大于等于副本数,否则创建表时将
当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。 当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../reference/taosX) 另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX)
...@@ -102,11 +102,6 @@ extern int32_t tsQuerySmaOptimize; ...@@ -102,11 +102,6 @@ extern int32_t tsQuerySmaOptimize;
// client // client
extern int32_t tsMinSlidingTime; extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime; extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int64_t tsMaxRetentWindow;
// build info // build info
extern char version[]; extern char version[];
......
...@@ -77,11 +77,11 @@ typedef struct { ...@@ -77,11 +77,11 @@ typedef struct {
} SWalSyncInfo; } SWalSyncInfo;
typedef struct { typedef struct {
int8_t protoVer;
int64_t version; int64_t version;
int16_t msgType; int64_t ingestTs;
int32_t bodyLen; int32_t bodyLen;
int64_t ingestTs; // not implemented int16_t msgType;
int8_t protoVer;
// sync meta // sync meta
SWalSyncInfo syncMeta; SWalSyncInfo syncMeta;
......
...@@ -118,20 +118,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000; ...@@ -118,20 +118,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 database precision unit for interval time range, changed accordingly // 1 database precision unit for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1; int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node. // the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default) // -1 no limit (default)
// 0 no query allowed, queries are disabled // 0 no query allowed, queries are disabled
...@@ -330,7 +316,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { ...@@ -330,7 +316,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1; if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1; if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1; if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1; if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1; if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
...@@ -383,10 +369,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -383,10 +369,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1; if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "streamCompDelayRatio", tsStreamComputDelayRatio, 0.1, 0.9, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1; if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
...@@ -532,7 +514,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { ...@@ -532,7 +514,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX); tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX); taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval; tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
if (taosMulMkDir(tsTempDir) != 0) { if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr()); uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1; return -1;
...@@ -579,10 +561,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -579,10 +561,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32; tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32; tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32; tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval; tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
...@@ -758,10 +736,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -758,10 +736,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32; tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) { } else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
} }
break; break;
} }
...@@ -772,8 +746,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -772,8 +746,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break; break;
} }
case 'i': { case 'i': {
if (strcasecmp("minimalTempDirGB", name) == 0) { if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval; tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
} else if (strcasecmp("minimalDataDirGB", name) == 0) { } else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval; tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
} else if (strcasecmp("minSlidingTime", name) == 0) { } else if (strcasecmp("minSlidingTime", name) == 0) {
...@@ -883,9 +857,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -883,9 +857,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break; break;
} }
case 'r': { case 'r': {
if (strcasecmp("retryStreamCompDelay", name) == 0) { if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval; tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) { } else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64; tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
...@@ -913,8 +885,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { ...@@ -913,8 +885,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32; tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
} else if (strcasecmp("statusInterval", name) == 0) { } else if (strcasecmp("statusInterval", name) == 0) {
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32; tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
} else if (strcasecmp("slaveQuery", name) == 0) { } else if (strcasecmp("slaveQuery", name) == 0) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
} else if (strcasecmp("snodeShmSize", name) == 0) { } else if (strcasecmp("snodeShmSize", name) == 0) {
......
...@@ -805,14 +805,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -805,14 +805,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
return -1; return -1;
} }
SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId);
if (pDnode == NULL) {
mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr());
return -1;
}
SEpSet epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDCfgDnodeReq dcfgReq = {0}; SDCfgDnodeReq dcfgReq = {0};
if (strcasecmp(cfgReq.config, "resetlog") == 0) { if (strcasecmp(cfgReq.config, "resetlog") == 0) {
strcpy(dcfgReq.config, "resetlog"); strcpy(dcfgReq.config, "resetlog");
...@@ -860,16 +852,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { ...@@ -860,16 +852,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
} }
} }
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq); int32_t code = -1;
void *pBuf = rpcMallocCont(bufLen); SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
if (pBuf == NULL) return -1; if (pDnode->id == cfgReq.dnodeId || cfgReq.dnodeId == -1 || cfgReq.dnodeId == 0) {
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq); SEpSet epSet = mndGetDnodeEpset(pDnode);
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq);
void *pBuf = rpcMallocCont(bufLen);
if (pBuf != NULL) {
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
dcfgReq.config, dcfgReq.value);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
tmsgSendReq(&epSet, &rpcMsg);
code = 0;
}
}
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle, sdbRelease(pSdb, pDnode);
dcfgReq.config, dcfgReq.value); }
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
return tmsgSendReq(&epSet, &rpcMsg); if (code == -1) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
}
return code;
} }
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
......
...@@ -1287,6 +1287,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1287,6 +1287,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code)); mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
continueExec = false; continueExec = false;
} else { } else {
pTrans->failedTimes++;
pTrans->code = terrno; pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) { if (pTrans->lastAction != 0) {
...@@ -1306,7 +1307,6 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { ...@@ -1306,7 +1307,6 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr()); mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
continueExec = true; continueExec = true;
} else { } else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false; continueExec = false;
} }
......
...@@ -281,8 +281,8 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { ...@@ -281,8 +281,8 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId; const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType), vGInfo("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%ld", vgId, pMsg,
pMsg->info.handle); TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) { if (rsp.code == 0) {
...@@ -503,9 +503,6 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon ...@@ -503,9 +503,6 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.isWeak == 0) { if (cbMeta.isWeak == 0) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) { if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
...@@ -514,11 +511,17 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c ...@@ -514,11 +511,17 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term; rpcMsg.info.conn.applyTerm = cbMeta.term;
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else { } else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), vError("vgId:%d, sync commit error, msgtype:%d,%s, index:%ld, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
......
...@@ -612,8 +612,7 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) { ...@@ -612,8 +612,7 @@ int32_t sumFunction(SqlFunctionCtx* pCtx) {
SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
if (IS_NULL_TYPE(type)) { if (IS_NULL_TYPE(type)) {
GET_RES_INFO(pCtx)->isNullRes = 1; numOfElem = 0;
numOfElem = 1;
goto _sum_over; goto _sum_over;
} }
...@@ -1172,8 +1171,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { ...@@ -1172,8 +1171,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo);
if (IS_NULL_TYPE(type)) { if (IS_NULL_TYPE(type)) {
GET_RES_INFO(pCtx)->isNullRes = 1; numOfElems = 0;
numOfElems = 1;
goto _min_max_over; goto _min_max_over;
} }
......
...@@ -579,11 +579,13 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou ...@@ -579,11 +579,13 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
if (ctx->noExec == false) { if (ctx->noExec == false) {
for (int32_t m = 0; m < node->pParameterList->length; m++) { for (int32_t m = 0; m < node->pParameterList->length; m++) {
// add impl later
if (node->condType == LOGIC_COND_TYPE_AND) { if (node->condType == LOGIC_COND_TYPE_AND) {
taosArrayAddAll(output->result, params[m].result); taosArrayAddAll(output->result, params[m].result);
// taosArrayDestroy(params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_OR) { } else if (node->condType == LOGIC_COND_TYPE_OR) {
taosArrayAddAll(output->result, params[m].result); taosArrayAddAll(output->result, params[m].result);
// params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_NOT) { } else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result); // taosArrayAddAll(output->result, params[m].result);
} }
...@@ -593,6 +595,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou ...@@ -593,6 +595,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
} else { } else {
for (int32_t m = 0; m < node->pParameterList->length; m++) { for (int32_t m = 0; m < node->pParameterList->length; m++) {
output->status = sifMergeCond(node->condType, output->status, params[m].status); output->status = sifMergeCond(node->condType, output->status, params[m].status);
taosArrayDestroy(params[m].result);
params[m].result = NULL;
} }
} }
_return: _return:
...@@ -607,6 +611,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) { ...@@ -607,6 +611,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output); ctx->code = sifExecFunction(node, ctx, &output);
if (ctx->code != TSDB_CODE_SUCCESS) { if (ctx->code != TSDB_CODE_SUCCESS) {
sifFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
...@@ -624,6 +629,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) { ...@@ -624,6 +629,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output); ctx->code = sifExecLogic(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sifFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
...@@ -640,6 +646,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { ...@@ -640,6 +646,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output); ctx->code = sifExecOper(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sifFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
...@@ -698,7 +705,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { ...@@ -698,7 +705,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
} }
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
SIF_ERR_RET(ctx.code);
if (ctx.code != 0) {
sifFreeRes(ctx.pRes);
return ctx.code;
}
if (pDst) { if (pDst) {
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
...@@ -714,8 +725,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { ...@@ -714,8 +725,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
} }
sifFreeRes(ctx.pRes); sifFreeRes(ctx.pRes);
return code;
SIF_RET(code);
} }
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
...@@ -732,8 +742,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { ...@@ -732,8 +742,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
} }
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
if (ctx.code != 0) {
SIF_ERR_RET(ctx.code); sifFreeRes(ctx.pRes);
return ctx.code;
}
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL) { if (res == NULL) {
...@@ -745,8 +757,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { ...@@ -745,8 +757,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
sifFreeParam(res); sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
taosHashCleanup(ctx.pRes); taosHashCleanup(ctx.pRes);
return code;
SIF_RET(code);
} }
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) { int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) {
...@@ -760,7 +771,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, ...@@ -760,7 +771,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
SArray *output = taosArrayInit(8, sizeof(uint64_t)); SArray *output = taosArrayInit(8, sizeof(uint64_t));
SIFParam param = {.arg = *metaArg, .result = output}; SIFParam param = {.arg = *metaArg, .result = output};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param)); int32_t code = sifCalculate((SNode *)pFilterNode, &param);
if (code != 0) {
sifFreeParam(&param);
return code;
}
taosArrayAddAll(result, param.result); taosArrayAddAll(result, param.result);
sifFreeParam(&param); sifFreeParam(&param);
......
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> /** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
...@@ -16,6 +15,10 @@ ...@@ -16,6 +15,10 @@
#ifdef USE_UV #ifdef USE_UV
#include "transComm.h" #include "transComm.h"
typedef struct SConnList {
queue conn;
} SConnList;
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
...@@ -26,7 +29,9 @@ typedef struct SCliConn { ...@@ -26,7 +29,9 @@ typedef struct SCliConn {
SConnBuffer readBuf; SConnBuffer readBuf;
STransQueue cliMsgs; STransQueue cliMsgs;
queue q;
queue q;
SConnList* list;
STransCtx ctx; STransCtx ctx;
bool broken; // link broken or not bool broken; // link broken or not
...@@ -56,13 +61,14 @@ typedef struct SCliMsg { ...@@ -56,13 +61,14 @@ typedef struct SCliMsg {
} SCliMsg; } SCliMsg;
typedef struct SCliThrd { typedef struct SCliThrd {
TdThread thread; // tid TdThread thread; // tid
int64_t pid; // pid int64_t pid; // pid
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_idle_t* idle; uv_idle_t* idle;
uv_timer_t timer; uv_prepare_t* prepare;
void* pool; // conn pool uv_timer_t timer;
void* pool; // conn pool
// msg queue // msg queue
queue msg; queue msg;
...@@ -86,10 +92,6 @@ typedef struct SCliObj { ...@@ -86,10 +92,6 @@ typedef struct SCliObj {
SCliThrd** pThreadObj; SCliThrd** pThreadObj;
} SCliObj; } SCliObj;
typedef struct SConnList {
queue conn;
} SConnList;
// conn pool // conn pool
// add expire timeout and capacity limit // add expire timeout and capacity limit
static void* createConnPool(int size); static void* createConnPool(int size);
...@@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param); ...@@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param);
static int sockDebugInfo(struct sockaddr* sockname, char* dst) { static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
struct sockaddr_in addr = *(struct sockaddr_in*)sockname; struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
char buf[20] = {0}; char buf[16] = {0};
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf)); int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port)); sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
return r; return r;
...@@ -118,6 +120,9 @@ static void cliSendCb(uv_write_t* req, int status); ...@@ -118,6 +120,9 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle); static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle);
static int32_t allocConnRef(SCliConn* conn, bool update);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
...@@ -198,7 +203,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -198,7 +203,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \ pThrd = (SCliThrd*)(exh)->pThrd; \
} \ } \
} while (0) } while (0)
#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
...@@ -499,9 +504,8 @@ void* destroyConnPool(void* pool) { ...@@ -499,9 +504,8 @@ void* destroyConnPool(void* pool) {
} }
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
char key[128] = {0}; char key[32] = {0};
CONN_CONSTRUCT_HASH_KEY(key, ip, port); CONN_CONSTRUCT_HASH_KEY(key, ip, port);
SHashObj* pPool = pool; SHashObj* pPool = pool;
SConnList* plist = taosHashGet(pPool, key, strlen(key)); SConnList* plist = taosHashGet(pPool, key, strlen(key));
if (plist == NULL) { if (plist == NULL) {
...@@ -519,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -519,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
conn->status = ConnNormal; conn->status = ConnNormal;
QUEUE_REMOVE(&conn->q); QUEUE_REMOVE(&conn->q);
QUEUE_INIT(&conn->q); QUEUE_INIT(&conn->q);
assert(h == &conn->q);
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
conn->task = NULL; conn->task = NULL;
return conn; return conn;
} }
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
SCliThrd* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
conn->status = ConnInPool;
if (conn->list == NULL) {
char key[32] = {0};
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));
}
assert(conn->list != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&conn->list->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
static int32_t allocConnRef(SCliConn* conn, bool update) { static int32_t allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
transRemoveExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
...@@ -556,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { ...@@ -556,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
return 0; return 0;
} }
static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->status == ConnInPool) {
return;
}
SCliThrd* thrd = conn->hostThrd;
CONN_HANDLE_THREAD_QUIT(thrd);
allocConnRef(conn, true);
STrans* pTransInst = thrd->pTransInst;
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx);
conn->status = ConnInPool;
char key[128] = {0};
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
// list already create before
assert(plist != NULL);
QUEUE_INIT(&conn->q);
QUEUE_PUSH(&plist->conn, &conn->q);
assert(!QUEUE_IS_EMPTY(&plist->conn));
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn;
arg->param2 = thrd;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
}
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
...@@ -602,11 +605,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { ...@@ -602,11 +605,9 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) { if (nread > 0) {
pBuf->len += nread; pBuf->len += nread;
if (transReadComplete(pBuf)) { while (transReadComplete(pBuf)) {
tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn);
cliHandleResp(conn); cliHandleResp(conn);
} else {
tTrace("%s conn %p read partial packet, continue to read", CONN_GET_INST_LABEL(conn), conn);
} }
return; return;
} }
...@@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) {
static void cliIdleCb(uv_idle_t* handle) { static void cliIdleCb(uv_idle_t* handle) {
SCliThrd* thrd = handle->data; SCliThrd* thrd = handle->data;
tTrace("do idle work"); tTrace("do idle work");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
} }
static void* cliWorkThread(void* arg) { static void* cliWorkThread(void* arg) {
...@@ -1033,7 +1090,12 @@ static SCliThrd* createThrdObj() { ...@@ -1033,7 +1090,12 @@ static SCliThrd* createThrdObj() {
// pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t)); // pThrd->idle = taosMemoryCalloc(1, sizeof(uv_idle_t));
// uv_idle_init(pThrd->loop, pThrd->idle); // uv_idle_init(pThrd->loop, pThrd->idle);
// pThrd->idle->data = pThrd; // pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb); // uv_idle_start(pThrd->idle, cliIdleCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb);
pThrd->pool = createConnPool(4); pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue); transDQCreate(pThrd->loop, &pThrd->delayQueue);
...@@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->idle); taosMemoryFree(pThrd->idle);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
...@@ -120,8 +120,9 @@ int transInitBuffer(SConnBuffer* buf) { ...@@ -120,8 +120,9 @@ int transInitBuffer(SConnBuffer* buf) {
buf->total = 0; buf->total = 0;
return 0; return 0;
} }
int transDestroyBuffer(SConnBuffer* buf) { int transDestroyBuffer(SConnBuffer* p) {
taosMemoryFree(buf->buf); taosMemoryFree(p->buf);
p->buf = NULL;
return 0; return 0;
} }
......
...@@ -73,6 +73,7 @@ typedef struct SWorkThrd { ...@@ -73,6 +73,7 @@ typedef struct SWorkThrd {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_prepare_t* prepare;
queue msg; queue msg;
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
...@@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) ...@@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle);
/* /*
* time-consuming task throwed into BG work thread * time-consuming task throwed into BG work thread
...@@ -238,8 +240,6 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -238,8 +240,6 @@ static void uvHandleReq(SSvrConn* pConn) {
transMsg.msgType = pHead->msgType; transMsg.msgType = pHead->msgType;
transMsg.code = pHead->code; transMsg.code = pHead->code;
// transClearBuffer(&pConn->readBuf);
pConn->inType = pHead->msgType; pConn->inType = pHead->msgType;
if (pConn->status == ConnNormal) { if (pConn->status == ConnNormal) {
if (pHead->persist == 1) { if (pHead->persist == 1) {
...@@ -546,6 +546,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { ...@@ -546,6 +546,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, uvDestroyConn); uv_close((uv_handle_t*)req->handle, uvDestroyConn);
taosMemoryFree(req); taosMemoryFree(req);
} }
static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback
SWorkThrd* pThrd = handle->data;
SAsyncPool* pool = pThrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
if (msg == NULL) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(transGetRefMgt(), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
transReleaseExHandle(transGetRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
}
}
static void uvWorkDoTask(uv_work_t* req) { static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task // doing time-consumeing task
...@@ -695,13 +741,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -695,13 +741,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
} }
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
// int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
taosThreadMutexInit(&pThrd->msgMtx, NULL); taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
uv_prepare_start(pThrd->prepare, uvPrepareCb);
pThrd->prepare->data = pThrd;
// conn set // conn set
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
...@@ -986,6 +1036,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { ...@@ -986,6 +1036,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
...@@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -139,7 +139,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
const char* idxPattern = "^[0-9]+.idx$"; const char* idxPattern = "^[0-9]+.idx$";
regex_t logRegPattern; regex_t logRegPattern;
regex_t idxRegPattern; regex_t idxRegPattern;
SArray* pLogInfoArray = taosArrayInit(8, sizeof(SWalFileInfo)); SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
regcomp(&logRegPattern, logPattern, REG_EXTENDED); regcomp(&logRegPattern, logPattern, REG_EXTENDED);
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED); regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
...@@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -159,7 +159,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
SWalFileInfo fileInfo; SWalFileInfo fileInfo;
memset(&fileInfo, -1, sizeof(SWalFileInfo)); memset(&fileInfo, -1, sizeof(SWalFileInfo));
sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer); sscanf(name, "%" PRId64 ".log", &fileInfo.firstVer);
taosArrayPush(pLogInfoArray, &fileInfo); taosArrayPush(actualLog, &fileInfo);
} }
} }
...@@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -167,10 +167,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
regfree(&logRegPattern); regfree(&logRegPattern);
regfree(&idxRegPattern); regfree(&idxRegPattern);
taosArraySort(pLogInfoArray, compareWalFileInfo); taosArraySort(actualLog, compareWalFileInfo);
int metaFileNum = taosArrayGetSize(pWal->fileInfoSet); int metaFileNum = taosArrayGetSize(pWal->fileInfoSet);
int actualFileNum = taosArrayGetSize(pLogInfoArray); int actualFileNum = taosArrayGetSize(actualLog);
#if 0 #if 0
for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) { for (int32_t fileNo = actualFileNum - 1; fileNo >= 0; fileNo--) {
...@@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -196,11 +196,11 @@ int walCheckAndRepairMeta(SWal* pWal) {
taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum); taosArrayPopFrontBatch(pWal->fileInfoSet, metaFileNum - actualFileNum);
} else if (metaFileNum < actualFileNum) { } else if (metaFileNum < actualFileNum) {
for (int i = metaFileNum; i < actualFileNum; i++) { for (int i = metaFileNum; i < actualFileNum; i++) {
SWalFileInfo* pFileInfo = taosArrayGet(pLogInfoArray, i); SWalFileInfo* pFileInfo = taosArrayGet(actualLog, i);
taosArrayPush(pWal->fileInfoSet, pFileInfo); taosArrayPush(pWal->fileInfoSet, pFileInfo);
} }
} }
taosArrayDestroy(pLogInfoArray); taosArrayDestroy(actualLog);
pWal->writeCur = actualFileNum - 1; pWal->writeCur = actualFileNum - 1;
if (actualFileNum > 0) { if (actualFileNum > 0) {
...@@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) { ...@@ -221,7 +221,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
int code = walSaveMeta(pWal); int code = walSaveMeta(pWal);
if (code < 0) { if (code < 0) {
taosArrayDestroy(pLogInfoArray); taosArrayDestroy(actualLog);
return -1; return -1;
} }
} }
......
...@@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -423,37 +423,38 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
return 0; return 0;
} }
int32_t walReadVer(SWalReader *pRead, int64_t ver) { int32_t walReadVer(SWalReader *pReader, int64_t ver) {
wDebug("vgId:%d wal start to read ver %ld", pRead->pWal->cfg.vgId, ver); wDebug("vgId:%d wal start to read ver %ld", pReader->pWal->cfg.vgId, ver);
int64_t contLen; int64_t contLen;
int32_t code;
bool seeked = false; bool seeked = false;
if (pRead->pWal->vers.firstVer == -1) { if (pReader->pWal->vers.firstVer == -1) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
if (ver > pRead->pWal->vers.lastVer || ver < pRead->pWal->vers.firstVer) { if (ver > pReader->pWal->vers.lastVer || ver < pReader->pWal->vers.firstVer) {
wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pRead->pWal->cfg.vgId, wDebug("vgId:%d, invalid index:%" PRId64 ", first index:%" PRId64 ", last index:%" PRId64, pReader->pWal->cfg.vgId,
ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.lastVer); ver, pReader->pWal->vers.firstVer, pReader->pWal->vers.lastVer);
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
return -1; return -1;
} }
if (pRead->curInvalid || pRead->curVersion != ver) { if (pReader->curInvalid || pReader->curVersion != ver) {
if (walReadSeekVer(pRead, ver) < 0) { if (walReadSeekVer(pReader, ver) < 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, ver, terrstr()); wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since %s", pReader->pWal->cfg.vgId, ver, terrstr());
return -1; return -1;
} }
seeked = true; seeked = true;
} }
while (1) { while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); contLen = taosReadFile(pReader->pLogFile, pReader->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) { if (contLen == sizeof(SWalCkHead)) {
break; break;
} else if (contLen == 0 && !seeked) { } else if (contLen == 0 && !seeked) {
walReadSeekVerImpl(pRead, ver); walReadSeekVerImpl(pReader, ver);
seeked = true; seeked = true;
continue; continue;
} else { } else {
...@@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -467,26 +468,26 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
} }
} }
contLen = walValidHeadCksum(pRead->pHead); code = walValidHeadCksum(pReader->pHead);
if (contLen != 0) { if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since head checksum not passed", pReader->pWal->cfg.vgId,
ver); ver);
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
return -1; return -1;
} }
if (pRead->capacity < pRead->pHead->head.bodyLen) { if (pReader->capacity < pReader->pHead->head.bodyLen) {
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pRead->pHead->head.bodyLen); void *ptr = taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReader->pHead->head.bodyLen);
if (ptr == NULL) { if (ptr == NULL) {
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY; terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
return -1; return -1;
} }
pRead->pHead = ptr; pReader->pHead = ptr;
pRead->capacity = pRead->pHead->head.bodyLen; pReader->capacity = pReader->pHead->head.bodyLen;
} }
if ((contLen = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) != if ((contLen = taosReadFile(pReader->pLogFile, pReader->pHead->head.body, pReader->pHead->head.bodyLen)) !=
pRead->pHead->head.bodyLen) { pReader->pHead->head.bodyLen) {
if (contLen < 0) if (contLen < 0)
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
else { else {
...@@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) { ...@@ -496,25 +497,28 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
return -1; return -1;
} }
if (pRead->pHead->head.version != ver) { if (pReader->pHead->head.version != ver) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", read request index:%" PRId64, pReader->pWal->cfg.vgId,
pRead->pHead->head.version, ver); pReader->pHead->head.version, ver);
pRead->curInvalid = 1; pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
contLen = walValidBodyCksum(pRead->pHead); code = walValidBodyCksum(pReader->pHead);
if (contLen != 0) { if (code != 0) {
wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, unexpected wal log, index:%" PRId64 ", since body checksum not passed", pReader->pWal->cfg.vgId,
ver); ver);
pRead->curInvalid = 1; uint32_t readCkSum = walCalcBodyCksum(pReader->pHead->head.body, pReader->pHead->head.bodyLen);
uint32_t logCkSum = pReader->pHead->cksumBody;
wError("checksum written into log: %u, checksum calculated: %u", logCkSum, readCkSum);
pReader->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
pRead->curVersion++; pReader->curVersion++;
return 0; return 0;
} }
...@@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) { ...@@ -289,18 +289,25 @@ int32_t walEndSnapshot(SWal *pWal) {
newTotSize -= iter->fileSize; newTotSize -= iter->fileSize;
} }
} }
char fnameStr[WAL_FILE_LEN]; int32_t actualDelete = 0;
char fnameStr[WAL_FILE_LEN];
// remove file // remove file
for (int i = 0; i < deleteCnt; i++) { for (int i = 0; i < deleteCnt; i++) {
pInfo = taosArrayGet(pWal->fileInfoSet, i); pInfo = taosArrayGet(pWal->fileInfoSet, i);
walBuildLogName(pWal, pInfo->firstVer, fnameStr); walBuildLogName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr); if (taosRemoveFile(fnameStr) < 0) {
goto UPDATE_META;
}
walBuildIdxName(pWal, pInfo->firstVer, fnameStr); walBuildIdxName(pWal, pInfo->firstVer, fnameStr);
taosRemoveFile(fnameStr); if (taosRemoveFile(fnameStr) < 0) {
ASSERT(0);
}
actualDelete++;
} }
UPDATE_META:
// make new array, remove files // make new array, remove files
taosArrayPopFrontBatch(pWal->fileInfoSet, deleteCnt); taosArrayPopFrontBatch(pWal->fileInfoSet, actualDelete);
if (taosArrayGetSize(pWal->fileInfoSet) == 0) { if (taosArrayGetSize(pWal->fileInfoSet) == 0) {
pWal->writeCur = -1; pWal->writeCur = -1;
pWal->vers.firstVer = -1; pWal->vers.firstVer = -1;
......
...@@ -18,7 +18,7 @@ import time ...@@ -18,7 +18,7 @@ import time
import socket import socket
import json import json
import toml import toml
from .boundary import DataBoundary from util.boundary import DataBoundary
import taos import taos
from util.log import * from util.log import *
from util.sql import * from util.sql import *
...@@ -80,23 +80,18 @@ class DataSet: ...@@ -80,23 +80,18 @@ class DataSet:
self.bool_data.append( bool((i + bool_start) % 2 )) self.bool_data.append( bool((i + bool_start) % 2 ))
self.vchar_data.append( f"{vchar_prefix}_{i * vchar_step}" ) self.vchar_data.append( f"{vchar_prefix}_{i * vchar_step}" )
self.nchar_data.append( f"{nchar_prefix}_{i * nchar_step}") self.nchar_data.append( f"{nchar_prefix}_{i * nchar_step}")
self.ts_data.append( int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000 - i * ts_step)) self.ts_data.append( int(datetime.timestamp(datetime.now()) * 1000 - i * ts_step))
def get_disorder_set(self, def get_disorder_set(self, rows, **kwargs):
rows, for k, v in kwargs.items():
int_low :int = INT_MIN, int_low = v if k == "int_low" else INT_MIN
int_up :int = INT_MAX, int_up = v if k == "int_up" else INT_MAX
bint_low :int = BIGINT_MIN, bint_low = v if k == "bint_low" else BIGINT_MIN
bint_up :int = BIGINT_MAX, bint_up = v if k == "bint_up" else BIGINT_MAX
sint_low :int = SMALLINT_MIN, sint_low = v if k == "sint_low" else SMALLINT_MIN
sint_up :int = SMALLINT_MAX, sint_up = v if k == "sint_up" else SMALLINT_MAX
tint_low :int = TINYINT_MIN, tint_low = v if k == "tint_low" else TINYINT_MIN
tint_up :int = TINYINT_MAX, tint_up = v if k == "tint_up" else TINYINT_MAX
ubint_low :int = BIGINT_UN_MIN,
ubint_up :int = BIGINT_UN_MAX,
):
pass pass
......
...@@ -49,18 +49,23 @@ class TDSql: ...@@ -49,18 +49,23 @@ class TDSql:
def close(self): def close(self):
self.cursor.close() self.cursor.close()
def prepare(self): def prepare(self, dbname="db", drop=True, **kwargs):
tdLog.info("prepare database:db") tdLog.info(f"prepare database:{dbname}")
s = 'reset query cache' s = 'reset query cache'
try: try:
self.cursor.execute(s) self.cursor.execute(s)
except: except:
tdLog.notice("'reset query cache' is not supported") tdLog.notice("'reset query cache' is not supported")
s = 'drop database if exists db' if drop:
self.cursor.execute(s) s = f'drop database if exists {dbname}'
s = 'create database db duration 300' self.cursor.execute(s)
s = f'create database {dbname}'
for k, v in kwargs.items():
s += f" {k} {v}"
if "duration" not in kwargs:
s += " duration 300"
self.cursor.execute(s) self.cursor.execute(s)
s = 'use db' s = f'use {dbname}'
self.cursor.execute(s) self.cursor.execute(s)
time.sleep(2) time.sleep(2)
...@@ -106,7 +111,7 @@ class TDSql: ...@@ -106,7 +111,7 @@ class TDSql:
if row_tag: if row_tag:
return self.queryResult return self.queryResult
return self.queryRows return self.queryRows
except Exception as e: except Exception as e:
caller = inspect.getframeinfo(inspect.stack()[1][0]) caller = inspect.getframeinfo(inspect.stack()[1][0])
args = (caller.filename, caller.lineno, sql, repr(e)) args = (caller.filename, caller.lineno, sql, repr(e))
tdLog.notice("%s(%d) failed: sql:%s, %s" % args) tdLog.notice("%s(%d) failed: sql:%s, %s" % args)
...@@ -304,7 +309,7 @@ class TDSql: ...@@ -304,7 +309,7 @@ class TDSql:
tdLog.notice("Try to execute sql again, query times: %d "%i) tdLog.notice("Try to execute sql again, query times: %d "%i)
time.sleep(1) time.sleep(1)
pass pass
else: else:
try: try:
tdLog.notice("Try the last execute sql ") tdLog.notice("Try the last execute sql ")
self.affectedRows = self.cursor.execute(sql) self.affectedRows = self.cursor.execute(sql)
......
...@@ -166,7 +166,7 @@ ...@@ -166,7 +166,7 @@
# ---- query ---- # ---- query ----
./test.sh -f tsim/query/charScalarFunction.sim ./test.sh -f tsim/query/charScalarFunction.sim
# ./test.sh -f tsim/query/explain.sim ./test.sh -f tsim/query/explain.sim
./test.sh -f tsim/query/interval-offset.sim ./test.sh -f tsim/query/interval-offset.sim
./test.sh -f tsim/query/interval.sim ./test.sh -f tsim/query/interval.sim
./test.sh -f tsim/query/scalarFunction.sim ./test.sh -f tsim/query/scalarFunction.sim
...@@ -187,7 +187,7 @@ ...@@ -187,7 +187,7 @@
./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim ./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim ./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim # TD-17919 ./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim ./test.sh -f tsim/mnode/basic5.sim
# ---- show ---- # ---- show ----
......
from datetime import datetime
import time
from typing import List, Any, Tuple
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
PRIMARY_COL = "ts"
INT_COL = "c_int"
BINT_COL = "c_bint"
SINT_COL = "c_sint"
TINT_COL = "c_tint"
FLOAT_COL = "c_float"
DOUBLE_COL = "c_double"
BOOL_COL = "c_bool"
TINT_UN_COL = "c_utint"
SINT_UN_COL = "c_usint"
BINT_UN_COL = "c_ubint"
INT_UN_COL = "c_uint"
BINARY_COL = "c_binary"
NCHAR_COL = "c_nchar"
TS_COL = "c_ts"
NUM_COL = [INT_COL, BINT_COL, SINT_COL, TINT_COL, FLOAT_COL, DOUBLE_COL, ]
CHAR_COL = [BINARY_COL, NCHAR_COL, ]
BOOLEAN_COL = [BOOL_COL, ]
TS_TYPE_COL = [TS_COL, ]
INT_TAG = "t_int"
TAG_COL = [INT_TAG]
# insert data args:
TIME_STEP = 10000
NOW = int(datetime.timestamp(datetime.now()) * 1000)
# init db/table
DBNAME = "db"
STBNAME = "stb1"
CTB_PRE = "ct"
NTB_PRE = "nt"
L0 = 0
L1 = 1
L2 = 2
PRIMARY_DIR = 1
NON_PRIMARY_DIR = 0
DATA_PRE0 = f"data0"
DATA_PRE1 = f"data1"
DATA_PRE2 = f"data2"
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
self.taos_cfg_path = tdDnodes.dnodes[0].cfgPath
self.taos_data_dir = tdDnodes.dnodes[0].dataDir
def cfg(self, filename, **update_dict):
cmd = "echo "
for k, v in update_dict.items():
cmd += f"{k} {v}\n"
cmd += f" >> {filename}"
if os.system(cmd) != 0:
tdLog.exit(cmd)
def cfg_str(self, filename, update_str):
cmd = f'echo "{update_str}" >> {filename}'
if os.system(cmd) != 0:
tdLog.exit(cmd)
def cfg_str_list(self, filename, update_list):
for update_str in update_list:
self.cfg_str(filename, update_str)
def del_old_datadir(self, filename):
cmd = f"sed -i '/^dataDir/d' {filename}"
if os.system(cmd) != 0:
tdLog.exit(cmd)
@property
def __err_cfg(self):
cfg_list = []
err_case1 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}1 {L1} {PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {NON_PRIMARY_DIR}"
]
err_case2 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}1 {L1} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {PRIMARY_DIR}"
]
err_case3 = [
f"dataDir {self.taos_data_dir}/data33 3 {NON_PRIMARY_DIR}"
]
err_case4 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}1 {L1} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L1} {NON_PRIMARY_DIR}"
]
err_case5 = [f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}"]
for i in range(16):
err_case5.append(f"dataDir {self.taos_data_dir}/{DATA_PRE0}{i+1} {L0} {NON_PRIMARY_DIR}")
err_case6 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {PRIMARY_DIR}",
]
err_case7 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {PRIMARY_DIR}",
]
err_case8 = [
f"dataDir {self.taos_data_dir}/data33 3 {PRIMARY_DIR}"
]
err_case9 = [
f"dataDir {self.taos_data_dir}/data33 -1 {NON_PRIMARY_DIR}"
]
cfg_list.append(err_case1)
cfg_list.append(err_case2)
cfg_list.append(err_case3)
cfg_list.append(err_case4)
cfg_list.append(err_case5)
cfg_list.append(err_case6)
cfg_list.append(err_case7)
cfg_list.append(err_case8)
cfg_list.append(err_case9)
return cfg_list
@property
def __current_cfg(self):
cfg_list = []
current_case1 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}1 {L1} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}2 {L2} {NON_PRIMARY_DIR}"
]
current_case2 = [f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 {L0} {PRIMARY_DIR}"]
for i in range(9):
current_case2.append(f"dataDir {self.taos_data_dir}/{DATA_PRE0}{i+1} {L0} {NON_PRIMARY_DIR}")
# TD-17773bug
current_case3 = [
f"dataDir {self.taos_data_dir}/{DATA_PRE0}0 ",
f"dataDir {self.taos_data_dir}/{DATA_PRE0}1 {L0} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE1}0 {L1} {NON_PRIMARY_DIR}",
f"dataDir {self.taos_data_dir}/{DATA_PRE2}0 {L2} {NON_PRIMARY_DIR}",
]
cfg_list.append(current_case1)
cfg_list.append(current_case3)
# case2 must in last of least, because use this cfg as data uniformity test
cfg_list.append(current_case2)
return cfg_list
def cfg_check(self):
for cfg_case in self.__err_cfg:
self.del_old_datadir(filename=self.taos_cfg_path)
tdDnodes.stop(1)
tdDnodes.deploy(1)
self.cfg_str_list(filename=self.taos_cfg_path, update_list=cfg_case)
tdDnodes.starttaosd(1)
time.sleep(2)
tdSql.error(f"show databases")
for cfg_case in self.__current_cfg:
self.del_old_datadir(filename=self.taos_cfg_path)
tdDnodes.stop(1)
tdDnodes.deploy(1)
self.cfg_str_list(filename=self.taos_cfg_path, update_list=cfg_case)
tdDnodes.start(1)
tdSql.query(f"show databases")
def __create_tb(self, stb=STBNAME, ctb_pre = CTB_PRE, ctb_num=20, ntb_pre=NTB_PRE, ntbnum=1, dbname=DBNAME):
tdLog.printNoPrefix("==========step: create table")
create_stb_sql = f'''create table {dbname}.{stb}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
) tags ({INT_TAG} int)
'''
tdSql.execute(create_stb_sql)
for i in range(ntbnum):
create_ntb_sql = f'''create table {dbname}.{ntb_pre}{i+1}(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp,
{TINT_UN_COL} tinyint unsigned, {SINT_UN_COL} smallint unsigned,
{INT_UN_COL} int unsigned, {BINT_UN_COL} bigint unsigned
)
'''
tdSql.execute(create_ntb_sql)
for i in range(ctb_num):
tdSql.execute(f'create table {dbname}.{ctb_pre}{i+1} using {dbname}.{stb} tags ( {i+1} )')
def __insert_data(self, rows, dbname=DBNAME, ctb_num=20):
data = DataSet()
data.get_order_set(rows)
tdLog.printNoPrefix("==========step: start inser data into tables now.....")
for i in range(self.rows):
row_data = f'''
{data.int_data[i]}, {data.bint_data[i]}, {data.sint_data[i]}, {data.tint_data[i]}, {data.float_data[i]}, {data.double_data[i]},
{data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {data.utint_data[i]},
{data.usint_data[i]}, {data.uint_data[i]}, {data.ubint_data[i]}
'''
neg_row_data = f'''
{-1 * data.int_data[i]}, {-1 * data.bint_data[i]}, {-1 * data.sint_data[i]}, {-1 * data.tint_data[i]}, {-1 * data.float_data[i]}, {-1 * data.double_data[i]},
{data.bool_data[i]}, '{data.vchar_data[i]}', '{data.nchar_data[i]}', {data.ts_data[i]}, {1 * data.utint_data[i]},
{1 * data.usint_data[i]}, {1 * data.uint_data[i]}, {1 * data.ubint_data[i]}
'''
for j in range(ctb_num):
tdSql.execute(
f"insert into {dbname}.{CTB_PRE}{j + 1} values ( {NOW - i * TIME_STEP}, {row_data} )")
# tdSql.execute(
# f"insert into {dbname}.{CTB_PRE}2 values ( {NOW - i * int(TIME_STEP * 0.6)}, {neg_row_data} )")
# tdSql.execute(
# f"insert into {dbname}.{CTB_PRE}4 values ( {NOW - i * int(TIME_STEP * 0.8) }, {row_data} )")
tdSql.execute(
f"insert into {dbname}.{NTB_PRE}1 values ( {NOW - i * int(TIME_STEP * 1.2)}, {row_data} )")
def run(self):
self.rows = 10
self.cfg_check()
tdSql.prepare(dbname=DBNAME, **{"keep": "1d, 1500m, 26h", "duration":"1h", "vgroups": 10})
self.__create_tb(dbname=DBNAME)
self.__insert_data(rows=self.rows, dbname=DBNAME)
tdSql.query(f"select count(*) from {DBNAME}.{NTB_PRE}1")
tdSql.execute(f"flush database {DBNAME}")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
import datetime import datetime
import time
from dataclasses import dataclass from dataclasses import dataclass
from typing import List, Any, Tuple from typing import List, Any, Tuple
...@@ -328,11 +329,15 @@ class TDTestCase: ...@@ -328,11 +329,15 @@ class TDTestCase:
tdSql.query("select database()") tdSql.query("select database()")
dbname = tdSql.getData(0,0) dbname = tdSql.getData(0,0)
tdSql.query("show databases") tdSql.query("show databases")
for index , value in enumerate(tdSql.cursor.description):
if value[0] == "retention":
r_index = index
break
for row in tdSql.queryResult: for row in tdSql.queryResult:
if row[0] == dbname: if row[0] == dbname:
if row[-1] is None: if row[r_index] is None:
continue continue
if ":" in row[-1]: if ":" in row[r_index]:
sma.rollup_db = True sma.rollup_db = True
if sma.rollup_db : if sma.rollup_db :
return False return False
...@@ -393,8 +398,6 @@ class TDTestCase: ...@@ -393,8 +398,6 @@ class TDTestCase:
else: else:
tdSql.error(self.__create_sma_index(sma)) tdSql.error(self.__create_sma_index(sma))
def __drop_sma_index(self, sma:SMAschema): def __drop_sma_index(self, sma:SMAschema):
sql = f"{sma.drop} {sma.drop_flag} {sma.index_name}" sql = f"{sma.drop} {sma.drop_flag} {sma.index_name}"
return sql return sql
...@@ -416,8 +419,7 @@ class TDTestCase: ...@@ -416,8 +419,7 @@ class TDTestCase:
self.sma_created_index = list(filter(lambda x: x != sma.index_name, self.sma_created_index)) self.sma_created_index = list(filter(lambda x: x != sma.index_name, self.sma_created_index))
tdSql.query("show streams") tdSql.query("show streams")
tdSql.checkRows(self.sma_count) tdSql.checkRows(self.sma_count)
time.sleep(1)
else: else:
tdSql.error(self.__drop_sma_index(sma)) tdSql.error(self.__drop_sma_index(sma))
......
...@@ -136,23 +136,23 @@ class TDTestCase: ...@@ -136,23 +136,23 @@ class TDTestCase:
return sqls return sqls
def __test_current(self): # sourcery skip: use-itertools-product def __test_current(self, dbname="db"): # sourcery skip: use-itertools-product
tdLog.printNoPrefix("==========current sql condition check , must return query ok==========") tdLog.printNoPrefix("==========current sql condition check , must return query ok==========")
tbname = [ tbname = [
"ct1", f"{dbname}.ct1",
"ct2", f"{dbname}.ct2",
"ct4", f"{dbname}.ct4",
] ]
for tb in tbname: for tb in tbname:
for i in range(2,8): for i in range(2,8):
self.__concat_check(tb,i) self.__concat_check(tb,i)
tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========") tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========")
def __test_error(self): def __test_error(self, dbname="db"):
tdLog.printNoPrefix("==========err sql condition check , must return error==========") tdLog.printNoPrefix("==========err sql condition check , must return error==========")
tbname = [ tbname = [
"t1", f"{dbname}.t1",
"stb1", f"{dbname}.stb1",
] ]
for tb in tbname: for tb in tbname:
...@@ -163,22 +163,20 @@ class TDTestCase: ...@@ -163,22 +163,20 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========") tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
def all_test(self): def all_test(self, dbname="db"):
self.__test_current() self.__test_current(dbname)
self.__test_error() self.__test_error(dbname)
def __create_tb(self, dbname="db"):
def __create_tb(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1( create_stb_sql = f'''create table {dbname}.stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int) ) tags (t1 int)
''' '''
create_ntb_sql = f'''create table t1( create_ntb_sql = f'''create table {dbname}.t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
...@@ -188,29 +186,29 @@ class TDTestCase: ...@@ -188,29 +186,29 @@ class TDTestCase:
tdSql.execute(create_ntb_sql) tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( {i+1} )')
def __insert_data(self, rows): def __insert_data(self, rows, dbname="db"):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows): for i in range(rows):
tdSql.execute( tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f'''insert into ct1 values f'''insert into {dbname}.ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } ) ( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } ) ( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
''' '''
) )
tdSql.execute( tdSql.execute(
f'''insert into ct4 values f'''insert into {dbname}.ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -226,7 +224,7 @@ class TDTestCase: ...@@ -226,7 +224,7 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''insert into ct2 values f'''insert into {dbname}.ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -242,13 +240,13 @@ class TDTestCase: ...@@ -242,13 +240,13 @@ class TDTestCase:
) )
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values insert_data = f'''insert into {dbname}.t1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, ( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } ) "binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
''' '''
tdSql.execute(insert_data) tdSql.execute(insert_data)
tdSql.execute( tdSql.execute(
f'''insert into t1 values f'''insert into {dbname}.t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -268,22 +266,23 @@ class TDTestCase: ...@@ -268,22 +266,23 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
self.__create_tb() self.__create_tb(dbname="db")
tdLog.printNoPrefix("==========step2:insert data") tdLog.printNoPrefix("==========step2:insert data")
self.rows = 10 self.rows = 10
self.__insert_data(self.rows) self.__insert_data(self.rows, dbname="db")
tdLog.printNoPrefix("==========step3:all check") tdLog.printNoPrefix("==========step3:all check")
self.all_test() self.all_test(dbname="db")
tdDnodes.stop(1) # tdDnodes.stop(1)
tdDnodes.start(1) # tdDnodes.start(1)
tdSql.execute("flush database db")
tdSql.execute("use db") tdSql.execute("use db")
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test(dbname="db")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -137,22 +137,22 @@ class TDTestCase: ...@@ -137,22 +137,22 @@ class TDTestCase:
return sqls return sqls
def __test_current(self): # sourcery skip: use-itertools-product def __test_current(self, dbname="db"):
tdLog.printNoPrefix("==========current sql condition check , must return query ok==========") tdLog.printNoPrefix("==========current sql condition check , must return query ok==========")
tbname = [ tbname = [
"t1", f"{dbname}.t1",
"stb1", f"{dbname}.stb1",
] ]
for tb in tbname: for tb in tbname:
for i in range(2,8): for i in range(2,8):
self.__concat_check(tb,i) self.__concat_check(tb,i)
tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========") tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========")
def __test_error(self): def __test_error(self, dbname="db"):
tdLog.printNoPrefix("==========err sql condition check , must return error==========") tdLog.printNoPrefix("==========err sql condition check , must return error==========")
tbname = [ tbname = [
"ct1", f"{dbname}.ct1",
"ct4", f"{dbname}.ct4",
] ]
for tb in tbname: for tb in tbname:
...@@ -163,22 +163,20 @@ class TDTestCase: ...@@ -163,22 +163,20 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========") tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
def all_test(self): def all_test(self, dbname="db"):
self.__test_current() self.__test_current(dbname)
self.__test_error() self.__test_error(dbname)
def __create_tb(self, dbname="db"):
def __create_tb(self):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1( create_stb_sql = f'''create table {dbname}.stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int) ) tags (t1 int)
''' '''
create_ntb_sql = f'''create table t1( create_ntb_sql = f'''create table {dbname}.t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
...@@ -188,29 +186,29 @@ class TDTestCase: ...@@ -188,29 +186,29 @@ class TDTestCase:
tdSql.execute(create_ntb_sql) tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( {i+1} )')
def __insert_data(self, rows): def __insert_data(self, rows, dbname="db"):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows): for i in range(rows):
tdSql.execute( tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f'''insert into ct1 values f'''insert into {dbname}.ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } ) ( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } ) ( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
''' '''
) )
tdSql.execute( tdSql.execute(
f'''insert into ct4 values f'''insert into {dbname}.ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -226,7 +224,7 @@ class TDTestCase: ...@@ -226,7 +224,7 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''insert into ct2 values f'''insert into {dbname}.ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -242,13 +240,13 @@ class TDTestCase: ...@@ -242,13 +240,13 @@ class TDTestCase:
) )
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values insert_data = f'''insert into {dbname}.t1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, ( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } ) "binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
''' '''
tdSql.execute(insert_data) tdSql.execute(insert_data)
tdSql.execute( tdSql.execute(
f'''insert into t1 values f'''insert into {dbname}.t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -268,23 +266,23 @@ class TDTestCase: ...@@ -268,23 +266,23 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
self.__create_tb() self.__create_tb(dbname="db")
tdLog.printNoPrefix("==========step2:insert data") tdLog.printNoPrefix("==========step2:insert data")
self.rows = 10 self.rows = 10
self.__insert_data(self.rows) self.__insert_data(self.rows, dbname="db")
tdLog.printNoPrefix("==========step3:all check") tdLog.printNoPrefix("==========step3:all check")
self.all_test() self.all_test(dbname="db")
tdDnodes.stop(1) # tdDnodes.stop(1)
tdDnodes.start(1) # tdDnodes.start(1)
tdSql.execute("flush database db")
tdSql.execute("use db") tdSql.execute("use db")
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test(dbname="db")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success(f"{__file__} successfully executed") tdLog.success(f"{__file__} successfully executed")
......
...@@ -137,23 +137,23 @@ class TDTestCase: ...@@ -137,23 +137,23 @@ class TDTestCase:
return sqls return sqls
def __test_current(self): # sourcery skip: use-itertools-product def __test_current(self,dbname="db"): # sourcery skip: use-itertools-product
tdLog.printNoPrefix("==========current sql condition check , must return query ok==========") tdLog.printNoPrefix("==========current sql condition check , must return query ok==========")
tbname = [ tbname = [
"t1", f"{dbname}.t1",
"stb1" f"{dbname}.stb1"
] ]
for tb in tbname: for tb in tbname:
for i in range(2,8): for i in range(2,8):
self.__concat_ws_check(tb,i) self.__concat_ws_check(tb,i)
tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========") tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========")
def __test_error(self): def __test_error(self, dbname="db"):
tdLog.printNoPrefix("==========err sql condition check , must return error==========") tdLog.printNoPrefix("==========err sql condition check , must return error==========")
tbname = [ tbname = [
"ct1", f"{dbname}.ct1",
"ct2", f"{dbname}.ct2",
"ct4", f"{dbname}.ct4",
] ]
for tb in tbname: for tb in tbname:
...@@ -164,22 +164,21 @@ class TDTestCase: ...@@ -164,22 +164,21 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========") tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
def all_test(self): def all_test(self,dbname="db"):
self.__test_current() self.__test_current(dbname)
self.__test_error() self.__test_error(dbname)
def __create_tb(self): def __create_tb(self, dbname="db"):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1( create_stb_sql = f'''create table {dbname}.stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int) ) tags (t1 int)
''' '''
create_ntb_sql = f'''create table t1( create_ntb_sql = f'''create table {dbname}.t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
...@@ -189,29 +188,29 @@ class TDTestCase: ...@@ -189,29 +188,29 @@ class TDTestCase:
tdSql.execute(create_ntb_sql) tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( {i+1} )')
def __insert_data(self, rows): def __insert_data(self, rows, dbname="db"):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows): for i in range(rows):
tdSql.execute( tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f'''insert into ct1 values f'''insert into {dbname}.ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } ) ( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } ) ( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
''' '''
) )
tdSql.execute( tdSql.execute(
f'''insert into ct4 values f'''insert into {dbname}.ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -227,7 +226,7 @@ class TDTestCase: ...@@ -227,7 +226,7 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''insert into ct2 values f'''insert into {dbname}.ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -243,13 +242,13 @@ class TDTestCase: ...@@ -243,13 +242,13 @@ class TDTestCase:
) )
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values insert_data = f'''insert into {dbname}.t1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, ( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } ) "binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
''' '''
tdSql.execute(insert_data) tdSql.execute(insert_data)
tdSql.execute( tdSql.execute(
f'''insert into t1 values f'''insert into {dbname}.t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -269,22 +268,23 @@ class TDTestCase: ...@@ -269,22 +268,23 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
self.__create_tb() self.__create_tb(dbname="db")
tdLog.printNoPrefix("==========step2:insert data") tdLog.printNoPrefix("==========step2:insert data")
self.rows = 10 self.rows = 10
self.__insert_data(self.rows) self.__insert_data(self.rows, dbname="db")
tdLog.printNoPrefix("==========step3:all check") tdLog.printNoPrefix("==========step3:all check")
self.all_test() self.all_test(dbname="db")
tdDnodes.stop(1) # tdDnodes.stop(1)
tdDnodes.start(1) # tdDnodes.start(1)
tdSql.execute("flush database db")
tdSql.execute("use db") tdSql.execute("use db")
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test(dbname="db")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -137,23 +137,23 @@ class TDTestCase: ...@@ -137,23 +137,23 @@ class TDTestCase:
return sqls return sqls
def __test_current(self): # sourcery skip: use-itertools-product def __test_current(self, dbname="db"): # sourcery skip: use-itertools-product
tdLog.printNoPrefix("==========current sql condition check , must return query ok==========") tdLog.printNoPrefix("==========current sql condition check , must return query ok==========")
tbname = [ tbname = [
"ct1", f"{dbname}.ct1",
"ct2", f"{dbname}.ct2",
"ct4", f"{dbname}.ct4",
] ]
for tb in tbname: for tb in tbname:
for i in range(2,8): for i in range(2,8):
self.__concat_ws_check(tb,i) self.__concat_ws_check(tb,i)
tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========") tdLog.printNoPrefix(f"==========current sql condition check in {tb}, col num: {i} over==========")
def __test_error(self): def __test_error(self, dbname="db"):
tdLog.printNoPrefix("==========err sql condition check , must return error==========") tdLog.printNoPrefix("==========err sql condition check , must return error==========")
tbname = [ tbname = [
"t1", f"{dbname}.t1",
"stb1" f"{dbname}.stb1"
] ]
for tb in tbname: for tb in tbname:
...@@ -164,22 +164,21 @@ class TDTestCase: ...@@ -164,22 +164,21 @@ class TDTestCase:
tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========") tdLog.printNoPrefix(f"==========err sql condition check in {tb} over==========")
def all_test(self): def all_test(self, dbname="db"):
self.__test_current() self.__test_current(dbname="db")
self.__test_error() self.__test_error(dbname="db")
def __create_tb(self): def __create_tb(self, dbname="db"):
tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
create_stb_sql = f'''create table stb1( create_stb_sql = f'''create table {dbname}.stb1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
) tags (t1 int) ) tags (t1 int)
''' '''
create_ntb_sql = f'''create table t1( create_ntb_sql = f'''create table {dbname}.t1(
ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint, ts timestamp, {INT_COL} int, {BINT_COL} bigint, {SINT_COL} smallint, {TINT_COL} tinyint,
{FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool, {FLOAT_COL} float, {DOUBLE_COL} double, {BOOL_COL} bool,
{BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp {BINARY_COL} binary(16), {NCHAR_COL} nchar(32), {TS_COL} timestamp
...@@ -189,29 +188,29 @@ class TDTestCase: ...@@ -189,29 +188,29 @@ class TDTestCase:
tdSql.execute(create_ntb_sql) tdSql.execute(create_ntb_sql)
for i in range(4): for i in range(4):
tdSql.execute(f'create table ct{i+1} using stb1 tags ( {i+1} )') tdSql.execute(f'create table {dbname}.ct{i+1} using {dbname}.stb1 tags ( {i+1} )')
def __insert_data(self, rows): def __insert_data(self, rows, dbname="db"):
now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000) now_time = int(datetime.datetime.timestamp(datetime.datetime.now()) * 1000)
for i in range(rows): for i in range(rows):
tdSql.execute( tdSql.execute(
f"insert into ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct1 values ( { now_time - i * 1000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct4 values ( { now_time - i * 7776000000 }, {i}, {11111 * i}, {111 * i % 32767 }, {11 * i % 127}, {1.11*i}, {1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f"insert into ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )" f"insert into {dbname}.ct2 values ( { now_time - i * 7776000000 }, {-i}, {-11111 * i}, {-111 * i % 32767 }, {-11 * i % 127}, {-1.11*i}, {-1100.0011*i}, {i%2}, 'binary{i}', 'nchar_测试_{i}', { now_time + 1 * i } )"
) )
tdSql.execute( tdSql.execute(
f'''insert into ct1 values f'''insert into {dbname}.ct1 values
( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } ) ( { now_time - rows * 5 }, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar_测试_0', { now_time + 8 } )
( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } ) ( { now_time + 10000 }, { rows }, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar_测试_9', { now_time + 9 } )
''' '''
) )
tdSql.execute( tdSql.execute(
f'''insert into ct4 values f'''insert into {dbname}.ct4 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -227,7 +226,7 @@ class TDTestCase: ...@@ -227,7 +226,7 @@ class TDTestCase:
) )
tdSql.execute( tdSql.execute(
f'''insert into ct2 values f'''insert into {dbname}.ct2 values
( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3888000000 + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 7776000000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -243,13 +242,13 @@ class TDTestCase: ...@@ -243,13 +242,13 @@ class TDTestCase:
) )
for i in range(rows): for i in range(rows):
insert_data = f'''insert into t1 values insert_data = f'''insert into {dbname}.t1 values
( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2}, ( { now_time - i * 3600000 }, {i}, {i * 11111}, { i % 32767 }, { i % 127}, { i * 1.11111 }, { i * 1000.1111 }, { i % 2},
"binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } ) "binary_{i}", "nchar_测试_{i}", { now_time - 1000 * i } )
''' '''
tdSql.execute(insert_data) tdSql.execute(insert_data)
tdSql.execute( tdSql.execute(
f'''insert into t1 values f'''insert into {dbname}.t1 values
( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time + 10800000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - (( rows // 2 ) * 60 + 30) * 60000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) ( { now_time - rows * 3600000 }, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
...@@ -269,22 +268,23 @@ class TDTestCase: ...@@ -269,22 +268,23 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
tdLog.printNoPrefix("==========step1:create table") tdLog.printNoPrefix("==========step1:create table")
self.__create_tb() self.__create_tb(dbname="db")
tdLog.printNoPrefix("==========step2:insert data") tdLog.printNoPrefix("==========step2:insert data")
self.rows = 10 self.rows = 10
self.__insert_data(self.rows) self.__insert_data(self.rows, dbname="db")
tdLog.printNoPrefix("==========step3:all check") tdLog.printNoPrefix("==========step3:all check")
self.all_test() self.all_test(dbname="db")
tdDnodes.stop(1) # tdDnodes.stop(1)
tdDnodes.start(1) # tdDnodes.start(1)
tdSql.execute("flush database db")
tdSql.execute("use db") tdSql.execute("use db")
tdLog.printNoPrefix("==========step4:after wal, all check again ") tdLog.printNoPrefix("==========step4:after wal, all check again ")
self.all_test() self.all_test(dbname="db")
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
此差异已折叠。
...@@ -5,13 +5,14 @@ from util.sqlset import * ...@@ -5,13 +5,14 @@ from util.sqlset import *
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(),logSql) tdSql.init(conn.cursor(),False)
self.setsql = TDSetSql() self.setsql = TDSetSql()
self.rowNum = 10 self.rowNum = 10
self.ts = 1537146000000 self.ts = 1537146000000
self.ntbname = 'ntb' dbname = "db"
self.stbname = 'stb' self.ntbname = f'{dbname}.ntb'
self.stbname = f'{dbname}.stb'
self.column_dict = { self.column_dict = {
'ts':'timestamp', 'ts':'timestamp',
'c1':'int', 'c1':'int',
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册