Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dbc119f1
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
dbc119f1
编写于
7月 20, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test: add test case for tmq
上级
245ec427
变更
5
展开全部
隐藏空白更改
内联
并排
Showing
5 changed file
with
713 addition
and
23 deletion
+713
-23
tests/system-test/7-tmq/tmqAutoCreateTbl.py
tests/system-test/7-tmq/tmqAutoCreateTbl.py
+1
-1
tests/system-test/7-tmq/tmqDelete-1ctb.py
tests/system-test/7-tmq/tmqDelete-1ctb.py
+164
-21
tests/system-test/7-tmq/tmqDelete-multiCtb.py
tests/system-test/7-tmq/tmqDelete-multiCtb.py
+416
-0
tests/system-test/7-tmq/tmqDropStb.py
tests/system-test/7-tmq/tmqDropStb.py
+129
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+3
-1
未找到文件。
tests/system-test/7-tmq/tmqAutoCreateTbl.py
浏览文件 @
dbc119f1
...
...
@@ -184,7 +184,7 @@ class TDTestCase:
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
# queryString = "select ts, c1, c2 from %s.%s "%(paraDict['dbName'], paraDict['stbName'])
queryString
=
"select ts, c1, c2 from %s.%s where t4 == 'shanghai'
or t4 == 'changsha'
"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
queryString
=
"select ts, c1, c2 from %s.%s where t4 == 'shanghai'
and t5 == 'shanghai'
"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
...
...
tests/system-test/7-tmq/tmqDelete-1ctb.py
浏览文件 @
dbc119f1
...
...
@@ -79,6 +79,17 @@ class TDTestCase:
tdLog
.
debug
(
"del data ............ [OK]"
)
return
def
threadFunctionForDeletaData
(
self
,
**
paraDict
):
# create new connector for new tdSql instance in my thread
newTdSql
=
tdCom
.
newTdSql
()
self
.
delData
(
newTdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"startTs"
],
paraDict
[
"endTs"
],
paraDict
[
"ctbStartIdx"
])
return
def
asyncDeleteData
(
self
,
paraDict
):
pThread
=
threading
.
Thread
(
target
=
self
.
threadFunctionForDeletaData
,
kwargs
=
paraDict
)
pThread
.
start
()
return
pThread
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
...
...
@@ -117,11 +128,17 @@ class TDTestCase:
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
execute
(
sqlString
)
if
self
.
snapshot
==
0
:
consumerId
=
0
elif
self
.
snapshot
==
1
:
consumerId
=
1
rowsOfDelete
=
0
# paraDict['ctbNum'] = self.ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
consumerId
=
0
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
])
topicList
=
topicFromStb1
ifcheckdata
=
1
...
...
@@ -135,7 +152,7 @@ class TDTestCase:
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"
insert process end, and
start to check consume result"
)
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
...
...
@@ -143,13 +160,18 @@ class TDTestCase:
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d, act insert rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error!"
)
totalRowsFromQuery
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d, act query rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
,
totalRowsFromQuery
))
if
self
.
snapshot
==
0
:
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
elif
self
.
snapshot
==
1
:
if
totalConsumeRows
!=
totalRowsFromQuery
:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 1!"
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
,
rowsOfDelete
)
tmqCom
.
checkFileContent
(
consumerId
=
consumerId
,
queryString
=
queryString
,
skipRowsOfCons
=
rowsOfDelete
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
...
...
@@ -213,9 +235,11 @@ class TDTestCase:
consumerId
=
1
if
self
.
snapshot
==
0
:
consumerId
=
2
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
+
1
/
4
+
3
/
4
))
elif
self
.
snapshot
==
1
:
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
-
1
/
4
+
1
/
4
+
3
/
4
-
1
/
4
))
consumerId
=
3
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
-
1
/
4
+
1
/
4
+
3
/
4
))
topicList
=
topicFromStb1
ifcheckdata
=
1
...
...
@@ -237,33 +261,152 @@ class TDTestCase:
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, act insert rows: %d, expect consume rows: %d, "
%
(
totalConsumeRows
,
totalRowsInserted
,
expectrowcnt
))
totalRowsFromQuery
=
tdSql
.
getRows
()
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error!"
)
tdLog
.
info
(
"act consume rows: %d, act query rows: %d, expect consume rows: %d, "
%
(
totalConsumeRows
,
totalRowsFromQuery
,
expectrowcnt
))
if
self
.
snapshot
==
0
:
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
elif
self
.
snapshot
==
1
:
if
totalConsumeRows
!=
totalRowsFromQuery
:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 1!"
)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
tmqCase3
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 3: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
5000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tdLog
.
info
(
"restart taosd to ensure that the data falls into the disk"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# paraDict['ctbNum'] = self.ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
consumerId
=
1
if
self
.
snapshot
==
0
:
consumerId
=
4
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
+
1
/
4
+
3
/
4
))
elif
self
.
snapshot
==
1
:
consumerId
=
5
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
-
1
/
4
+
1
/
4
+
3
/
4
))
topicList
=
topicFromStb1
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# del some data
rowsOfDelete
=
int
(
self
.
rowsPerTbl
/
4
)
paraDict
[
"endTs"
]
=
paraDict
[
"startTs"
]
+
rowsOfDelete
-
1
pDeleteThread
=
self
.
asyncDeleteData
(
paraDict
)
# update to 1/4 rows and insert 3/4 new rows
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
int
(
self
.
rowsPerTbl
*
3
/
4
)
# paraDict['rowsPerTbl'] = self.rowsPerTbl
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
pInsertThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
pInsertThread
.
join
()
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsFromQuery
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, act query rows: %d, expect consume rows: %d, "
%
(
totalConsumeRows
,
totalRowsFromQuery
,
expectrowcnt
))
if
self
.
snapshot
==
0
:
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
elif
self
.
snapshot
==
1
:
if
totalConsumeRows
!=
totalRowsFromQuery
:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 1!"
)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 3 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepareTestEnv
()
# tdSql.prepare()
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
snapshot
=
0
self
.
prepareTestEnv
()
self
.
tmqCase1
()
self
.
tmqCase2
()
self
.
tmqCase2
()
self
.
prepareTestEnv
()
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
snapshot
=
1
self
.
prepareTestEnv
()
self
.
tmqCase1
()
self
.
tmqCase2
()
self
.
tmqCase2
()
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
snapshot
=
0
self
.
prepareTestEnv
()
self
.
tmqCase3
()
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
snapshot
=
1
self
.
prepareTestEnv
()
self
.
tmqCase3
()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/7-tmq/tmqDelete-multiCtb.py
0 → 100644
浏览文件 @
dbc119f1
此差异已折叠。
点击以展开。
tests/system-test/7-tmq/tmqDropStb.py
0 → 100644
浏览文件 @
dbc119f1
import
sys
import
time
import
socket
import
os
import
threading
import
taos
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
paraDict
=
{
'dbName'
:
'db1'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
2
,
'stbName'
:
'stb0'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
16
,
'count'
:
1
},
{
'type'
:
'timestamp'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
2000
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
20
,
'showMsg'
:
1
,
'showRow'
:
1
}
cdbName
=
'cdb'
# some parameter to consumer processor
consumerId
=
0
expectrowcnt
=
0
topicList
=
''
ifcheckdata
=
0
ifManualCommit
=
1
groupId
=
'group.id:cgrp1'
autoCommit
=
'enable.auto.commit:false'
autoCommitInterval
=
'auto.commit.interval.ms:1000'
autoOffset
=
'auto.offset.reset:earliest'
pollDelay
=
20
showMsg
=
1
showRow
=
1
hostname
=
socket
.
gethostname
()
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
logSql
=
False
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
tdLog
.
info
(
"step 1: create database, stb, ctb and insert data"
)
tmqCom
.
initConsumerTable
(
self
.
cdbName
)
tdCom
.
create_database
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"dropFlag"
])
self
.
paraDict
[
"stbName"
]
=
'stb1'
tdCom
.
create_stable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
column_elm_list
=
self
.
paraDict
[
"colSchema"
],
tag_elm_list
=
self
.
paraDict
[
"tagSchema"
],
count
=
1
,
default_stbname_prefix
=
self
.
paraDict
[
"stbName"
])
tdCom
.
create_ctable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
tag_elm_list
=
self
.
paraDict
[
'tagSchema'
],
count
=
self
.
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
self
.
paraDict
[
"ctbPrefix"
])
tmqCom
.
insert_data_2
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"ctbPrefix"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum"
],
self
.
paraDict
[
"startTs"
],
self
.
paraDict
[
"ctbStartIdx"
])
# pThread1 = tmqCom.asyncInsertData(paraDict=self.paraDict)
self
.
paraDict
[
"stbName"
]
=
'stb2'
self
.
paraDict
[
"ctbPrefix"
]
=
'newctb'
self
.
paraDict
[
"batchNum"
]
=
10000
tdCom
.
create_stable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
column_elm_list
=
self
.
paraDict
[
"colSchema"
],
tag_elm_list
=
self
.
paraDict
[
"tagSchema"
],
count
=
1
,
default_stbname_prefix
=
self
.
paraDict
[
"stbName"
])
tdCom
.
create_ctable
(
tdSql
,
dbname
=
self
.
paraDict
[
"dbName"
],
stbname
=
self
.
paraDict
[
"stbName"
],
tag_elm_list
=
self
.
paraDict
[
'tagSchema'
],
count
=
self
.
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
self
.
paraDict
[
"ctbPrefix"
])
# tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"])
pThread2
=
tmqCom
.
asyncInsertData
(
paraDict
=
self
.
paraDict
)
tdLog
.
info
(
"create topics from db"
)
topicName1
=
'UpperCasetopic_%s'
%
(
self
.
paraDict
[
'dbName'
])
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicName1
,
self
.
paraDict
[
'dbName'
]))
topicList
=
topicName1
+
','
+
topicName1
keyList
=
'%s,%s,%s,%s'
%
(
self
.
groupId
,
self
.
autoCommit
,
self
.
autoCommitInterval
,
self
.
autoOffset
)
self
.
expectrowcnt
=
self
.
paraDict
[
"rowsPerTbl"
]
*
self
.
paraDict
[
"ctbNum"
]
*
2
tmqCom
.
insertConsumerInfo
(
self
.
consumerId
,
self
.
expectrowcnt
,
topicList
,
keyList
,
self
.
ifcheckdata
,
self
.
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
self
.
pollDelay
,
self
.
paraDict
[
"dbName"
],
self
.
showMsg
,
self
.
showRow
,
self
.
cdbName
)
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop one stable"
)
self
.
paraDict
[
"stbName"
]
=
'stb1'
tdSql
.
execute
(
"drop table %s.%s"
%
(
self
.
paraDict
[
'dbName'
],
self
.
paraDict
[
'stbName'
]))
# tmqCom.drop_ctable(tdSql, dbname=self.paraDict['dbName'], count=self.paraDict["ctbNum"], default_ctbname_prefix=self.paraDict["ctbPrefix"])
pThread2
.
join
()
tdLog
.
info
(
"wait result from consumer, then check it"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
if
not
(
totalConsumeRows
>=
self
.
expectrowcnt
/
2
and
totalConsumeRows
<=
self
.
expectrowcnt
):
tdLog
.
info
(
"act consume rows: %d, expect consume rows: between %d and %d"
%
(
totalConsumeRows
,
self
.
expectrowcnt
/
2
,
self
.
expectrowcnt
))
tdLog
.
exit
(
"tmq consume rows error!"
)
time
.
sleep
(
10
)
tdSql
.
query
(
"drop topic %s"
%
topicName1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
dbc119f1
...
...
@@ -189,7 +189,9 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot1.py
#python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
python3 ./test.py
-f
7-tmq/tmqDelete-1ctb.py
python3 ./test.py
-f
7-tmq/tmqDelete-multiCtb.py
python3 ./test.py
-f
7-tmq/tmqDropStb.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录