diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in
index 8be698e9c9e38a13aad2d06e5b303055d5fdbb04..b427177e5b812a149aa306e870dcd0c033da41a2 100644
--- a/cmake/taostools_CMakeLists.txt.in
+++ b/cmake/taostools_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
- GIT_TAG 01195d6
+ GIT_TAG 63635fc
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE
diff --git a/docs/en/12-taos-sql/02-database.md b/docs/en/12-taos-sql/02-database.md
index 91164a7072f9bf1613165faa512b79a493571e99..ec007d68302f719ec09b1b13f58a1a6bddb2748c 100644
--- a/docs/en/12-taos-sql/02-database.md
+++ b/docs/en/12-taos-sql/02-database.md
@@ -35,8 +35,8 @@ database_option: {
| TABLE_SUFFIX value
| TSDB_PAGESIZE value
| WAL_RETENTION_PERIOD value
- | WAL_ROLL_PERIOD value
| WAL_RETENTION_SIZE value
+ | WAL_ROLL_PERIOD value
| WAL_SEGMENT_SIZE value
}
```
@@ -75,11 +75,10 @@ database_option: {
- TABLE_PREFIX:The prefix length in the table name that is ignored when distributing table to vnode based on table name.
- TABLE_SUFFIX:The suffix length in the table name that is ignored when distributing table to vnode based on table name.
- TSDB_PAGESIZE: The page size of the data storage engine in a vnode. The unit is KB. The default is 4 KB. The range is 1 to 16384, that is, 1 KB to 16 MB.
-- WAL_RETENTION_PERIOD: specifies the maximum time of which WAL files are to be kept after consumption. This parameter is used for data subscription. Enter a time in seconds. The default value 0. A value of 0 indicates that WAL files are not required to keep after consumption. -1: the time of WAL files to keep has no upper limit.
-- WAL_RETENTION_SIZE: specifies the maximum total size of which WAL files are to be kept after consumption. This parameter is used for data subscription. Enter a size in KB. The default value is 0. A value of 0 indicates that WAL files are not required to keep after consumption. -1: the total size of WAL files to keep has no upper limit.
+- WAL_RETENTION_PERIOD: specifies the maximum time of which WAL files are to be kept for consumption. This parameter is used for data subscription. Enter a time in seconds. The default value 0. A value of 0 indicates that WAL files are not required to keep for consumption. Alter it with a proper value at first to create topics.
+- WAL_RETENTION_SIZE: specifies the maximum total size of which WAL files are to be kept for consumption. This parameter is used for data subscription. Enter a size in KB. The default value is 0. A value of 0 indicates that the total size of WAL files to keep for consumption has no upper limit.
- WAL_ROLL_PERIOD: specifies the time after which WAL files are rotated. After this period elapses, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after TSDB data in memory are flushed to disk.
- WAL_SEGMENT_SIZE: specifies the maximum size of a WAL file. After the current WAL file reaches this size, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after TSDB data in memory are flushed to disk.
-
### Example Statement
```sql
@@ -123,6 +122,8 @@ alter_database_option: {
| WAL_LEVEL value
| WAL_FSYNC_PERIOD value
| KEEP value
+ | WAL_RETENTION_PERIOD value
+ | WAL_RETENTION_SIZE value
}
```
diff --git a/docs/en/12-taos-sql/04-stable.md b/docs/en/12-taos-sql/04-stable.md
index 37257286372b403721032e48d56cc1f1c3559538..a1d103eaf04a30214a8c1ef88e43a949710bcdbc 100644
--- a/docs/en/12-taos-sql/04-stable.md
+++ b/docs/en/12-taos-sql/04-stable.md
@@ -13,12 +13,11 @@ create_definition:
col_name column_definition
column_definition:
- type_name [COMMENT 'string_value']
+ type_name
```
**More explanations**
- Each supertable can have a maximum of 4096 columns, including tags. The minimum number of columns is 3: a timestamp column used as the key, one tag column, and one data column.
-- When you create a supertable, you can add comments to columns and tags.
- The TAGS keyword defines the tag columns for the supertable. The following restrictions apply to tag columns:
- A tag column can use the TIMESTAMP data type, but the values in the column must be fixed numbers. Timestamps including formulae, such as "now + 10s", cannot be stored in a tag column.
- The name of a tag column cannot be the same as the name of any other column.
diff --git a/docs/en/12-taos-sql/29-changes.md b/docs/en/12-taos-sql/29-changes.md
index 341791d6755aa66fddd98561823ee0e5ac74ba3f..a695a2cae18f28e090816ec98a978674a028df30 100644
--- a/docs/en/12-taos-sql/29-changes.md
+++ b/docs/en/12-taos-sql/29-changes.md
@@ -27,7 +27,7 @@ The following data types can be used in the schema for standard tables.
| - | :------- | :-------- | :------- |
| 1 | ALTER ACCOUNT | Deprecated| This Enterprise Edition-only statement has been removed. It returns the error "This statement is no longer supported."
| 2 | ALTER ALL DNODES | Added | Modifies the configuration of all dnodes.
-| 3 | ALTER DATABASE | Modified | Deprecated
- QUORUM: Specified the required number of confirmations. STRICT is now used to specify strong or weak consistency. The STRICT parameter cannot be modified.
- BLOCKS: Specified the memory blocks used by each vnode. BUFFER is now used to specify the size of the write cache pool for each vnode.
- UPDATE: Specified whether update operations were supported. All databases now support updating data in certain columns.
- CACHELAST: Specified how to cache the newest row of data. CACHEMODEL now replaces CACHELAST.
- COMP: Cannot be modified.
Added - CACHEMODEL: Specifies whether to cache the latest subtable data.
- CACHESIZE: Specifies the size of the cache for the newest subtable data.
- WAL_FSYNC_PERIOD: Replaces the FSYNC parameter.
- WAL_LEVEL: Replaces the WAL parameter.
Modified - REPLICA: Cannot be modified.
- KEEP: Now supports units.
+| 3 | ALTER DATABASE | Modified | Deprecated- QUORUM: Specified the required number of confirmations. STRICT is now used to specify strong or weak consistency. The STRICT parameter cannot be modified.
- BLOCKS: Specified the memory blocks used by each vnode. BUFFER is now used to specify the size of the write cache pool for each vnode.
- UPDATE: Specified whether update operations were supported. All databases now support updating data in certain columns.
- CACHELAST: Specified how to cache the newest row of data. CACHEMODEL now replaces CACHELAST.
- COMP: Cannot be modified.
Added - CACHEMODEL: Specifies whether to cache the latest subtable data.
- CACHESIZE: Specifies the size of the cache for the newest subtable data.
- WAL_FSYNC_PERIOD: Replaces the FSYNC parameter.
- WAL_LEVEL: Replaces the WAL parameter.
- WAL_RETENTION_PERIOD: specifies the time after which WAL files are deleted. This parameter is used for data subscription.
- WAL_RETENTION_SIZE: specifies the size at which WAL files are deleted. This parameter is used for data subscription.
Modified - REPLICA: Cannot be modified.
- KEEP: Now supports units.
| 4 | ALTER STABLE | Modified | Deprecated- CHANGE TAG: Modified the name of a tag. Replaced by RENAME TAG.
Added - RENAME TAG: Replaces CHANGE TAG.
- COMMENT: Specifies comments for a supertable.
| 5 | ALTER TABLE | Modified | Deprecated- CHANGE TAG: Modified the name of a tag. Replaced by RENAME TAG.
Added - RENAME TAG: Replaces CHANGE TAG.
- COMMENT: Specifies comments for a standard table.
- TTL: Specifies the time-to-live for a standard table.
| 6 | ALTER USER | Modified | Deprecated- PRIVILEGE: Specified user permissions. Replaced by GRANT and REVOKE.
Added - ENABLE: Enables or disables a user.
- SYSINFO: Specifies whether a user can query system information.
diff --git a/docs/zh/12-taos-sql/02-database.md b/docs/zh/12-taos-sql/02-database.md
index 4c698dad9235bdb5ae683cd86025138761f7d1d3..b54998e08d33869d113ae883b14f929e5381f168 100644
--- a/docs/zh/12-taos-sql/02-database.md
+++ b/docs/zh/12-taos-sql/02-database.md
@@ -35,8 +35,8 @@ database_option: {
| TABLE_SUFFIX value
| TSDB_PAGESIZE value
| WAL_RETENTION_PERIOD value
- | WAL_ROLL_PERIOD value
| WAL_RETENTION_SIZE value
+ | WAL_ROLL_PERIOD value
| WAL_SEGMENT_SIZE value
}
```
@@ -75,11 +75,10 @@ database_option: {
- TABLE_PREFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。
- TABLE_SUFFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的后缀的长度。
- TSDB_PAGESIZE:一个 VNODE 中时序数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB到 16 MB。
-- WAL_RETENTION_PERIOD:数据订阅已消费WAL日志,WAL文件的最大额外保留的时长策略。单位为 s。默认为 0,表示无需额外保留。-1, 表示额外保留,时间无上限。
-- WAL_RETENTION_SIZE:数据订阅已消费WAL日志,WAL文件的最大额外保留的累计大小策略。单位为 KB。默认为 0,表示无需额外保留。-1, 表示额外保留,累计大小无上限。
+- WAL_RETENTION_PERIOD: 为了数据订阅消费,需要WAL日志文件额外保留的最大时长策略。WAL日志清理,不受订阅客户端消费状态影响。单位为 s。默认为 0,表示无需为订阅保留。新建订阅,应先设置恰当的时长策略。
+- WAL_RETENTION_SIZE:为了数据订阅消费,需要WAL日志文件额外保留的最大累计大小策略。单位为 KB。默认为 0,表示累计大小无上限。
- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当WAL文件创建并写入后,经过该时间,会自动创建一个新的WAL文件。默认为 0,即仅在TSDB落盘时创建新文件。
- WAL_SEGMENT_SIZE:wal 单个文件大小,单位为 KB。当前写入文件大小超过上限后会自动创建一个新的WAL文件。默认为 0,即仅在TSDB落盘时创建新文件。
-
### 创建数据库示例
```sql
diff --git a/docs/zh/12-taos-sql/04-stable.md b/docs/zh/12-taos-sql/04-stable.md
index c5933228decfb3eb865cad058fa6fbba77fefb58..74ef52ee7c53d6c619a47d7e6453b56f152104a2 100644
--- a/docs/zh/12-taos-sql/04-stable.md
+++ b/docs/zh/12-taos-sql/04-stable.md
@@ -13,12 +13,11 @@ create_definition:
col_name column_definition
column_definition:
- type_name [COMMENT 'string_value']
+ type_name
```
**使用说明**
- 超级表中列的最大个数为 4096,需要注意,这里的 4096 是包含 TAG 列在内的,最小个数为 3,包含一个时间戳主键、一个 TAG 列和一个数据列。
-- 建表时可以给列或标签附加注释。
- TAGS语法指定超级表的标签列,标签列需要遵循以下约定:
- TAGS 中的 TIMESTAMP 列写入数据时需要提供给定值,而暂不支持四则运算,例如 NOW + 10s 这类表达式。
- TAGS 列名不能与其他列名相同。
diff --git a/docs/zh/12-taos-sql/29-changes.md b/docs/zh/12-taos-sql/29-changes.md
index 9d67533cdea5c055c8d3ff58000fb80ac640e271..af45d84ff22b32325f0eabfbcb4794f30c71c79b 100644
--- a/docs/zh/12-taos-sql/29-changes.md
+++ b/docs/zh/12-taos-sql/29-changes.md
@@ -27,7 +27,7 @@ description: "TDengine 3.0 版本的语法变更说明"
| - | :------- | :-------- | :------- |
| 1 | ALTER ACCOUNT | 废除 | 2.x中为企业版功能,3.0不再支持。语法暂时保留了,执行报“This statement is no longer supported”错误。
| 2 | ALTER ALL DNODES | 新增 | 修改所有DNODE的参数。
-| 3 | ALTER DATABASE | 调整 | 废除- QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。
- BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。
- UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。
- CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。
- COMP:3.0版本暂不支持修改。
新增 - CACHEMODEL:表示是否在内存中缓存子表的最近数据。
- CACHESIZE:表示缓存子表最近数据的内存大小。
- WAL_FSYNC_PERIOD:代替原FSYNC参数。
- WAL_LEVEL:代替原WAL参数。
调整 - REPLICA:3.0.0版本暂不支持修改。
- KEEP:3.0版本新增支持带单位的设置方式。
+| 3 | ALTER DATABASE | 调整 | 废除- QUORUM:写入需要的副本确认数。3.0版本使用STRICT来指定强一致还是弱一致。3.0.0版本STRICT暂不支持修改。
- BLOCKS:VNODE使用的内存块数。3.0版本使用BUFFER来表示VNODE写入内存池的大小。
- UPDATE:更新操作的支持模式。3.0版本所有数据库都支持部分列更新。
- CACHELAST:缓存最新一行数据的模式。3.0版本用CACHEMODEL代替。
- COMP:3.0版本暂不支持修改。
新增 - CACHEMODEL:表示是否在内存中缓存子表的最近数据。
- CACHESIZE:表示缓存子表最近数据的内存大小。
- WAL_FSYNC_PERIOD:代替原FSYNC参数。
- WAL_LEVEL:代替原WAL参数。
- WAL_RETENTION_PERIOD:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。
- WAL_RETENTION_SIZE:3.0.4.0版本新增,wal文件的额外保留策略,用于数据订阅。
调整 - REPLICA:3.0.0版本暂不支持修改。
- KEEP:3.0版本新增支持带单位的设置方式。
| 4 | ALTER STABLE | 调整 | 废除- CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。
新增 - RENAME TAG:代替原CHANGE TAG子句。
- COMMENT:修改超级表的注释。
| 5 | ALTER TABLE | 调整 | 废除- CHANGE TAG:修改标签列的名称。3.0版本使用RENAME TAG代替。
新增 - RENAME TAG:代替原CHANGE TAG子句。
- COMMENT:修改表的注释。
- TTL:修改表的生命周期。
| 6 | ALTER USER | 调整 | 废除- PRIVILEGE:修改用户权限。3.0版本使用GRANT和REVOKE来授予和回收权限。
新增 - ENABLE:启用或停用此用户。
- SYSINFO:修改用户是否可查看系统信息。
diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task
index 0515a5cb69c445a23626c412d0bbda60ad361d14..39415ed0bd666a5add74789f1f165782bb430742 100644
--- a/tests/parallel_test/cases.task
+++ b/tests/parallel_test/cases.task
@@ -130,6 +130,7 @@
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
+,,n,system-test,python3 ./test.py -N 3 -f 0-others/walRetention.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
diff --git a/tests/system-test/0-others/walRetention.py b/tests/system-test/0-others/walRetention.py
new file mode 100644
index 0000000000000000000000000000000000000000..2b340b79697f874a1e300970fac74f90529f0cc9
--- /dev/null
+++ b/tests/system-test/0-others/walRetention.py
@@ -0,0 +1,472 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+#
+# The option for wal_retetion_period and wal_retention_size is work well
+#
+
+import taos
+from taos.tmq import Consumer
+
+from util.log import *
+from util.cases import *
+from util.sql import *
+from util.common import *
+from util.sqlset import *
+
+
+import os
+import threading
+import json
+import time
+from datetime import date
+from datetime import datetime
+from datetime import timedelta
+from os import path
+
+
+#
+# -------------- util --------------------------
+#
+def pathSize(path):
+
+ total_size = 0
+ for dirpath, dirnames, filenames in os.walk(path):
+ for i in filenames:
+ # use join to concatenate all the components of path
+ f = os.path.join(dirpath, i)
+ # use getsize to generate size in bytes and add it to the total size
+ total_size += os.path.getsize(f)
+ # print(dirpath)
+
+ print(" %s %.02f MB" % (path, total_size/1024/1024))
+ return total_size
+
+
+# load json from file
+def jsonFromFile(jsonFile):
+ fp = open(jsonFile)
+ return json.load(fp)
+
+
+#
+# ----------------- class ------------------
+#
+
+# wal file object
+class WalFile:
+ def __init__(self, pathFile, fileName):
+ self.mtime = os.path.getmtime(pathFile)
+ self.startVer = int(fileName)
+ self.fsize = os.path.getsize(pathFile)
+ self.endVer = -1
+ self.pathFile = pathFile
+
+ def needDelete(self, delTsLine):
+ return True
+
+# VNode object
+class VNode :
+ # init
+ def __init__(self, dnodeId, path, walPeriod, walSize, walStayRange):
+ self.path = path
+ self.dnodeId = dnodeId
+ self.vgId = 0
+ self.snapVer = 0
+ self.firstVer = 0
+ self.lastVer = -1
+ self.walPeriod = walPeriod
+ self.walSize = walSize
+ self.walStayRange = walStayRange
+ self.walFiles = []
+ self.load(path)
+
+ # load
+ def load(self, path):
+ # load wal
+ walPath = os.path.join(path, "wal")
+ metaFile = ""
+ with os.scandir(walPath) as items:
+ for item in items:
+ if item.is_file():
+ fileName, fileExt = os.path.splitext(item.name)
+ pathFile = os.path.join(walPath, item)
+ if fileExt == ".log":
+ self.walFiles.append(WalFile(pathFile, fileName))
+ elif fileExt == "":
+ if fileName[:8] == "meta-ver":
+ metaFile = pathFile
+ # load config
+ tdLog.info(f' meta-ver file={metaFile}')
+ if metaFile != "":
+ jsonVer = jsonFromFile(metaFile)
+ metaNode = jsonVer["meta"]
+ self.snapVer = int(metaNode["snapshotVer"])
+ self.firstVer = int(metaNode["firstVer"])
+ self.lastVer = int(metaNode["lastVer"])
+
+ # sort with startVer
+ self.walFiles = sorted(self.walFiles, key=lambda x : x.startVer, reverse=True)
+ # set endVer
+ startVer = -1
+ for walFile in self.walFiles:
+ if startVer == -1:
+ startVer = walFile.startVer
+ continue
+ walFile.endVer = startVer - 1
+ startVer = walFile.startVer
+
+ # print total
+ tdLog.info(f" ---- dnode{self.dnodeId} snapVer={self.snapVer} firstVer={self.firstVer} lastVer={self.lastVer} {self.path} --------")
+ for walFile in self.walFiles:
+ mt = datetime.fromtimestamp(walFile.mtime)
+ tdLog.info(f" {walFile.pathFile} {mt} startVer={walFile.startVer} endVer={walFile.endVer}")
+
+ # snapVer compare
+ def canDelete(self, walFile):
+ if walFile.endVer == -1:
+ # end file
+ return False
+
+ # check snapVer
+ ret = False
+ if self.snapVer > walFile.endVer:
+ ret = True
+
+ # check stayRange
+ if self.lastVer != -1 and ret:
+ # first wal file ignore
+ if walFile.startVer == self.firstVer:
+ tdLog.info(f" {walFile.pathFile} can del, but is first. snapVer={self.snapVer} firstVer={self.firstVer}")
+ return False
+
+ # ver in stay range
+ smallVer = self.snapVer - self.walStayRange -1
+ if walFile.startVer >= smallVer:
+ tdLog.info(f" {walFile.pathFile} can del, but range not arrived. snapVer={self.snapVer} smallVer={smallVer}")
+ return False
+
+ return ret
+
+ # get log size
+ def getWalsSize(self):
+ size = 0
+ for walFile in self.walFiles:
+ size += walFile.fsize
+
+ return size
+
+ # vnode
+ def check_retention(self):
+ #
+ # check period
+ #
+ delta = self.walPeriod
+ if self.walPeriod == 0:
+ delta += 1 * 60 # delete after 1 minutes
+ elif self.walPeriod < 3600:
+ delta += 3 * 60 # 5 minutes
+ else:
+ delta += 5 * 60 # 10 minutes
+
+ delTsLine = datetime.now() - timedelta(seconds = delta)
+ delTs = delTsLine.timestamp()
+ for walFile in self.walFiles:
+ mt = datetime.fromtimestamp(walFile.mtime)
+ info = f" {walFile.pathFile} mt={mt} line={delTsLine} start={walFile.startVer} snap={self.snapVer} end= {walFile.endVer}"
+ tdLog.info(info)
+ if walFile.mtime < delTs and self.canDelete(walFile):
+ # wait a moment then check file exist
+ time.sleep(1)
+ if os.path.exists(walFile.pathFile):
+ #report error
+ tdLog.exit(f" wal file expired need delete. \n {walFile.pathFile} \n modify time={mt} \n delTsLine={delTsLine}\n start={walFile.startVer} snap={self.snapVer} end= {walFile.endVer}")
+ return False
+
+ #
+ # check size
+ #
+ if self.walSize == 0:
+ return True
+
+ vnodeSize = self.getWalsSize()
+ if vnodeSize < self.walSize:
+ tdLog.info(f" wal size valid. {self.path} real = {vnodeSize} set = {self.walSize} ")
+ return True
+
+ # check valid
+ tdLog.info(f" wal size over set. {self.path} real = {vnodeSize} set = {self.walSize} ")
+ for walFile in self.walFiles:
+ if self.canDelete(walFile):
+ # wait a moment then check file exist
+ time.sleep(1)
+ if os.path.exists(walFile.pathFile):
+ tdLog.exit(f" wal file size over .\
+ \n wal file = {walFile.pathFile}\
+ \n snapVer = {self.snapVer}\
+ \n real = {vnodeSize} bytes\
+ \n set = {self.walSize} bytes")
+ return False
+ return True
+
+
+# insert by async
+def thread_insert(testCase, tbname, rows):
+ print(f"start thread... {tbname} - {rows} \n")
+ new_conn = testCase.new_connect()
+ testCase.insert_data(tbname, rows, new_conn)
+ new_conn.close()
+ print("end thread\n")
+
+# case
+class TDTestCase:
+ def init(self, conn, logSql, replicaVar=1):
+ self.ts = 1670000000000
+ self.replicaVar = int(replicaVar)
+ tdLog.debug("start to execute %s" % __file__)
+ tdSql.init(conn.cursor())
+ self.setsql = TDSetSql()
+ self.conn = conn
+
+ # init cluster path
+ selfPath = os.path.dirname(os.path.realpath(__file__))
+ if ("community" in selfPath):
+ projPath = selfPath[:selfPath.find("community")]
+ else:
+ projPath = selfPath[:selfPath.find("tests")]
+ self.projDir = f"{projPath}sim/"
+ tdLog.info(f" init projPath={self.projDir}")
+
+ self.column_dict = {
+ 'ts': 'timestamp',
+ 'col1': 'tinyint',
+ 'col2': 'smallint',
+ 'col3': 'int',
+ 'col4': 'bigint',
+ 'col5': 'tinyint unsigned',
+ 'col6': 'smallint unsigned',
+ 'col7': 'int unsigned',
+ 'col8': 'bigint unsigned',
+ 'col9': 'float',
+ 'col10': 'double',
+ 'col11': 'bool',
+ 'col12': 'varchar(120)',
+ 'col13': 'nchar(100)',
+ }
+ self.tag_dict = {
+ 't1': 'tinyint',
+ 't2': 'smallint',
+ 't3': 'int',
+ 't4': 'bigint',
+ 't5': 'tinyint unsigned',
+ 't6': 'smallint unsigned',
+ 't7': 'int unsigned',
+ 't8': 'bigint unsigned',
+ 't9': 'float',
+ 't10': 'double',
+ 't11': 'bool',
+ 't12': 'varchar(120)',
+ 't13': 'nchar(100)',
+ }
+
+ # malloc new connect
+ def new_connect(self):
+ return taos.connect(host = self.conn._host,
+ user = self.conn._user,
+ password = self.conn._password,
+ database = self.dbname,
+ port = self.conn._port,
+ config = self.conn._config)
+
+ def set_stb_sql(self,stbname,column_dict,tag_dict):
+ column_sql = ''
+ tag_sql = ''
+ for k,v in column_dict.items():
+ column_sql += f"{k} {v}, "
+ for k,v in tag_dict.items():
+ tag_sql += f"{k} {v}, "
+ create_stb_sql = f'create stable {stbname} ({column_sql[:-2]}) tags ({tag_sql[:-2]})'
+ return create_stb_sql
+
+ def create_database(self, dbname, wal_period, wal_size_kb, vgroups):
+ self.wal_period = wal_period
+ self.wal_size = wal_size_kb * 1024
+ self.vgroups = vgroups
+ self.dbname = dbname
+ tdSql.execute(f"create database {dbname} wal_retention_period {wal_period} wal_retention_size {wal_size_kb} vgroups {vgroups} replica 3")
+ tdSql.execute(f'use {dbname}')
+
+ # create stable and child tables
+ def create_table(self, stbname, tbname, count):
+ self.child_count = count
+ self.stbname = stbname
+ self.tbname = tbname
+
+ # create stable
+ create_table_sql = self.set_stb_sql(stbname, self.column_dict, self.tag_dict)
+ tdSql.execute(create_table_sql)
+
+ batch_size = 1000
+ # create child table
+ for i in range(count):
+ ti = i % 128
+ tags = f'{ti},{ti},{i},{i},{ti},{ti},{i},{i},{i}.000{i},{i}.000{i},true,"var{i}","nch{i}"'
+ sql = f'create table {tbname}{i} using {stbname} tags({tags});'
+ tdSql.execute(sql)
+ if i % batch_size == 0:
+ tdLog.info(f" create child table {i} ...")
+
+ tdLog.info(f" create {count} child tables ok.")
+
+
+ # insert to child table d1 data
+ def insert_data(self, tbname, insertTime):
+ start = time.time()
+ values = ""
+ child_name = ""
+ cnt = 0
+ rows = 10000000000
+ for j in range(rows):
+ for i in range(self.child_count):
+ tj = j % 128
+ cols = f'{tj},{tj},{j},{j},{tj},{tj},{j},{j},{j}.000{j},{j}.000{j},true,"var{j}","nch{j}涛思数据codepage is utf_32_le"'
+ sql = f'insert into {tbname}{i} values ({self.ts},{cols});'
+ tdSql.execute(sql)
+ self.ts += 1
+ #tdLog.info(f" child table={i} rows={j} insert data.")
+ cost = time.time() - start
+ if j % 100 == 0:
+ tdSql.execute(f"flush database {self.dbname}")
+ tdLog.info(" insert row cost time = %ds rows = %d"%(cost, j))
+ self.consume_topic("topic1", 5)
+
+ if cost > insertTime and j > 100:
+ tdLog.info(" insert finished. cost time = %ds rows = %d"%(cost, j))
+ return
+
+ # create tmq
+ def create_tmq(self):
+ sql = f"create topic topic1 as select ts, col1, concat(col12,t12) from {self.stbname};"
+ tdSql.execute(sql)
+ sql = f"create topic topic2 as select * from {self.stbname};"
+ tdSql.execute(sql)
+ #tdLog.info(sql)
+
+ def check_retention(self, walStayRange):
+ # flash database
+ tdSql.execute(f"flush database {self.dbname}")
+ time.sleep(0.5)
+
+ vnodes = []
+ # put all vnode to list
+ for dnode in os.listdir(self.projDir):
+ vnodeDir = self.projDir + f"{dnode}/data/vnode/"
+ print(f"vnodeDir={vnodeDir}")
+ if os.path.isdir(vnodeDir) == False or dnode[:5] != "dnode":
+ continue
+ # enum all vnode
+ for entry in os.listdir(vnodeDir):
+ entryPath = path.join(vnodeDir, entry)
+
+ if os.path.isdir(entryPath):
+ if path.exists(path.join(entryPath, "vnode.json")):
+ vnode = VNode(int(dnode[5:]), entryPath, self.wal_period, self.wal_size, walStayRange)
+ vnodes.append(vnode)
+
+ # do check
+ for vnode in vnodes:
+ vnode.check_retention()
+
+ # consume topic
+ def consume_topic(self, topic_name, consume_cnt):
+ print("start consume...")
+ consumer = Consumer(
+ {
+ "group.id": "tg2",
+ "td.connect.user": "root",
+ "td.connect.pass": "taosdata",
+ "enable.auto.commit": "true",
+ }
+ )
+ print("start subscrite...")
+ consumer.subscribe([topic_name])
+
+ cnt = 0
+ try:
+ while True and cnt < consume_cnt:
+ res = consumer.poll(1)
+ if not res:
+ break
+ err = res.error()
+ if err is not None:
+ raise err
+ val = res.value()
+ cnt += 1
+ print(f" consume {cnt} ")
+ for block in val:
+ print(block.fetchall())
+ finally:
+ consumer.unsubscribe()
+ consumer.close()
+
+
+ # test db1
+ def test_db(self, dbname, checkTime ,wal_period, wal_size_kb):
+ # var
+ stable = "meters"
+ tbname = "d"
+ vgroups = 6
+ count = 10
+
+ # do
+ self.create_database(dbname, wal_period, wal_size_kb, vgroups)
+ self.create_table(stable, tbname, count)
+
+ # create tmq
+ self.create_tmq()
+
+ # insert data
+ self.insert_data(tbname, checkTime)
+
+ #stopInsert = False
+ #tobj = threading.Thread(target = thread_insert, args=(self, tbname, rows))
+ #tobj.start()
+
+ # check retention
+ tdLog.info(f" -------------- do check retention ---------------")
+ self.check_retention(walStayRange = 256)
+
+
+ # stop insert and wait exit
+ tdLog.info(f" {dbname} stop insert ...")
+ tdLog.info(f" {dbname} test_db end.")
+
+
+ # run
+ def run(self):
+ # period
+ #self.test_db("db1", 10, 60, 0)
+ # size
+ #self.test_db("db2", 5, 10*24*3600, 2*1024) # 2M size
+
+ # period + size
+ self.test_db("db", checkTime = 5*60, wal_period = 60, wal_size_kb=10)
+ #self.test_db("db", checkTime = 3*60, wal_period = 0, wal_size_kb=0)
+
+
+ def stop(self):
+ tdSql.close()
+ tdLog.success("%s successfully executed" % __file__)
+
+tdCases.addWindows(__file__, TDTestCase())
+tdCases.addLinux(__file__, TDTestCase())