Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e4b73870
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e4b73870
编写于
7月 22, 2022
作者:
P
plum-lihui
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
test: add test cases into ci
上级
90348e52
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
225 addition
and
14 deletion
+225
-14
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+21
-7
tests/system-test/7-tmq/tmqDropNtb.py
tests/system-test/7-tmq/tmqDropNtb.py
+2
-2
tests/system-test/7-tmq/tmqDropStbCtb.py
tests/system-test/7-tmq/tmqDropStbCtb.py
+9
-5
tests/system-test/7-tmq/tmqUpdateWithConsume.py
tests/system-test/7-tmq/tmqUpdateWithConsume.py
+190
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+3
-0
未找到文件。
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
e4b73870
...
...
@@ -536,16 +536,30 @@ class TMQCom:
insert_sql
=
f
'insert into
{
dbname
}
.
{
tbname_prefix
}{
tblIdx
+
tbname_index_start_num
}
values (
{
column_value_str
}
);'
tsql
.
execute
(
insert_sql
)
def
waitSubscriptionExit
(
self
,
tsql
,
max_wait_count
=
20
):
def
waitSubscriptionExit
(
self
,
tsql
,
topicName
):
wait_cnt
=
0
while
(
wait_cnt
<
max_wait_count
):
while
True
:
exit_flag
=
1
tsql
.
query
(
"show subscriptions"
)
if
tsql
.
getRows
()
==
0
:
break
else
:
time
.
sleep
(
2
)
wait_cnt
+=
1
rows
=
tsql
.
getRows
()
for
idx
in
range
(
rows
):
if
tsql
.
getData
(
idx
,
0
)
!=
topicName
:
continue
if
tsql
.
getData
(
idx
,
3
)
==
None
:
continue
else
:
time
.
sleep
(
0.5
)
wait_cnt
+=
1
exit_flag
=
0
break
if
exit_flag
==
1
:
break
tsql
.
query
(
"show subscriptions"
)
tdLog
.
info
(
"show subscriptions:"
)
tdLog
.
info
(
tsql
.
queryResult
)
tdLog
.
info
(
"wait subscriptions exit for %d s"
%
wait_cnt
)
def
close
(
self
):
...
...
tests/system-test/7-tmq/tmqDropNtb.py
浏览文件 @
e4b73870
...
...
@@ -103,7 +103,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
@@ -196,7 +196,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
tests/system-test/7-tmq/tmqDropStbCtb.py
浏览文件 @
e4b73870
...
...
@@ -51,7 +51,7 @@ class TDTestCase:
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
#
tmqCom.initConsumerTable()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
...
...
@@ -98,6 +98,8 @@ class TDTestCase:
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
# again create one new stb1
paraDict
[
"stbName"
]
=
'stb1'
paraDict
[
'ctbPrefix'
]
=
'ctb1n_'
...
...
@@ -157,7 +159,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
@@ -191,6 +193,8 @@ class TDTestCase:
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
# again create one new stb1
paraDict
[
"stbName"
]
=
'stb2'
paraDict
[
'ctbPrefix'
]
=
'ctb2n_'
...
...
@@ -209,9 +213,9 @@ class TDTestCase:
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
0
consumerId
=
2
elif
self
.
snapshot
==
1
:
consumerId
=
1
consumerId
=
3
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromDb
...
...
@@ -246,7 +250,7 @@ class TDTestCase:
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
...
...
tests/system-test/7-tmq/tmqUpdateWithConsume.py
0 → 100644
浏览文件 @
e4b73870
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
# update to half tables
paraDict
[
'rowsPerTbl'
]
=
int
(
self
.
rowsPerTbl
/
2
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"create topics from stb1"
)
topicFromStb1
=
'topic_stb1'
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicFromStb1
,
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# paraDict['ctbNum'] = self.ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
consumerId
=
0
if
self
.
snapshot
==
0
:
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
1
+
1
/
2
+
1
))
elif
self
.
snapshot
==
1
:
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
(
2
))
topicList
=
topicFromStb1
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
# update to half data
paraDict
[
"startTs"
]
=
paraDict
[
"startTs"
]
+
int
(
self
.
rowsPerTbl
/
2
)
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
totalRowsInserted
=
tdSql
.
getRows
()
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d, act insert rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
self
.
snapshot
==
0
:
if
totalConsumeRows
!=
expectrowcnt
:
tdLog
.
exit
(
"tmq consume rows error!"
)
elif
self
.
snapshot
==
1
:
if
not
(
totalConsumeRows
<
expectrowcnt
and
totalConsumeRows
>=
totalRowsInserted
):
tdLog
.
exit
(
"tmq consume rows error!"
)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
ctbNum
=
1
self
.
snapshot
=
0
self
.
prepareTestEnv
()
self
.
tmqCase1
()
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
prepareTestEnv
()
self
.
snapshot
=
1
self
.
tmqCase1
()
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
ctbNum
=
100
self
.
snapshot
=
0
self
.
prepareTestEnv
()
self
.
tmqCase1
()
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
prepareTestEnv
()
self
.
snapshot
=
1
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
e4b73870
...
...
@@ -209,11 +209,14 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdateWithConsume.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/tmqDelete-1ctb.py
python3 ./test.py
-f
7-tmq/tmqDelete-multiCtb.py
python3 ./test.py
-f
7-tmq/tmqDropStb.py
python3 ./test.py
-f
7-tmq/tmqDropStbCtb.py
python3 ./test.py
-f
7-tmq/tmqDropNtb.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py
# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录