Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ad317c2a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ad317c2a
编写于
6月 24, 2022
作者:
H
Hui Li
提交者:
GitHub
6月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14209 from taosdata/test3.0/lihui
test: refine some cases
上级
c3aabbdd
bd13d0b7
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
238 addition
and
23 deletion
+238
-23
tests/system-test/6-cluster/5dnode3mnodeStop.py
tests/system-test/6-cluster/5dnode3mnodeStop.py
+1
-0
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+41
-16
tests/system-test/7-tmq/tmqConsumerGroup.py
tests/system-test/7-tmq/tmqConsumerGroup.py
+170
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+25
-7
未找到文件。
tests/system-test/6-cluster/5dnode3mnodeStop.py
浏览文件 @
ad317c2a
...
...
@@ -227,6 +227,7 @@ class TDTestCase:
# fisr add three mnodes;
tdSql
.
execute
(
"create mnode on dnode 2"
)
time
.
sleep
(
10
)
tdSql
.
execute
(
"create mnode on dnode 3"
)
# fisrt check statut ready
...
...
tests/system-test/7-tmq/tmqCommon.py
浏览文件 @
ad317c2a
...
...
@@ -11,7 +11,9 @@
# -*- coding: utf-8 -*-
from
asyncore
import
loop
from
collections
import
defaultdict
import
subprocess
import
random
import
string
import
threading
...
...
@@ -75,7 +77,7 @@ class TMQCom:
return
resultList
def
startTmqSimProcess
(
self
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
):
def
startTmqSimProcess
(
self
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
,
alias
=
0
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
if
valgrind
==
1
:
...
...
@@ -88,29 +90,52 @@ class TMQCom:
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
"> nul 2>&1 &"
else
:
shellCmd
=
'nohup '
+
buildPath
+
'/build/bin/tmq_sim -c '
+
cfgPath
processorName
=
buildPath
+
'/build/bin/tmq_sim'
if
alias
!=
0
:
processorNameNew
=
buildPath
+
'/build/bin/tmq_sim_new'
shellCmd
=
'cp %s %s'
%
(
processorName
,
processorNameNew
)
os
.
system
(
shellCmd
)
processorName
=
processorNameNew
shellCmd
=
'nohup '
+
processorName
+
' -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
"> /dev/null 2>&1 &"
tdLog
.
info
(
shellCmd
)
os
.
system
(
shellCmd
)
def
getStartConsumeNotifyFromTmqsim
(
self
,
cdbName
=
'cdb'
):
while
1
:
def
stopTmqSimProcess
(
self
,
processorName
):
psCmd
=
"ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"
%
(
processorName
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -INT %s > /dev/null 2>&1"
%
processID
os
.
system
(
killCmd
)
time
.
sleep
(
0.2
)
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
)
tdLog
.
debug
(
"%s is stopped by kill -INT"
%
(
processorName
))
def
getStartConsumeNotifyFromTmqsim
(
self
,
cdbName
=
'cdb'
,
rows
=
1
):
loopFlag
=
1
while
loopFlag
:
tdSql
.
query
(
"select * from %s.notifyinfo"
%
cdbName
)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if
(
tdSql
.
getRows
()
==
1
)
and
(
tdSql
.
getData
(
0
,
1
)
==
0
):
actRows
=
tdSql
.
getRows
()
if
(
actRows
>=
rows
):
for
i
in
range
(
actRows
):
if
tdSql
.
getData
(
i
,
1
)
==
0
:
loopFlag
=
0
break
else
:
time
.
sleep
(
0.1
)
return
def
getStartCommitNotifyFromTmqsim
(
self
,
cdbName
=
'cdb'
):
while
1
:
def
getStartCommitNotifyFromTmqsim
(
self
,
cdbName
=
'cdb'
,
rows
=
2
):
loopFlag
=
1
while
loopFlag
:
tdSql
.
query
(
"select * from %s.notifyinfo"
%
cdbName
)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if
tdSql
.
getRows
()
==
2
:
print
(
tdSql
.
getData
(
0
,
1
),
tdSql
.
getData
(
1
,
1
))
if
tdSql
.
getData
(
1
,
1
)
==
1
:
actRows
=
tdSql
.
getRows
()
if
(
actRows
>=
rows
):
for
i
in
range
(
actRows
):
if
tdSql
.
getData
(
i
,
1
)
==
1
:
loopFlag
=
0
break
time
.
sleep
(
0.1
)
return
...
...
tests/system-test/7-tmq/tmqConsumerGroup.py
0 → 100644
浏览文件 @
ad317c2a
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
checkFileContent
(
self
,
consumerId
,
queryString
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
dstFile
=
'%s/../log/dstrows_%d.txt'
%
(
cfgPath
,
consumerId
)
cmdStr
=
'%s/build/bin/taos -c %s -s "%s >> %s"'
%
(
buildPath
,
cfgPath
,
queryString
,
dstFile
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
consumeRowsFile
=
'%s/../log/consumerid_%d.txt'
%
(
cfgPath
,
consumerId
)
tdLog
.
info
(
"rows file: %s, %s"
%
(
consumeRowsFile
,
dstFile
))
consumeFile
=
open
(
consumeRowsFile
,
mode
=
'r'
)
queryFile
=
open
(
dstFile
,
mode
=
'r'
)
# skip first line for it is schema
queryFile
.
readline
()
while
True
:
dst
=
queryFile
.
readline
()
src
=
consumeFile
.
readline
()
if
dst
:
if
dst
!=
src
:
tdLog
.
exit
(
"consumerId %d consume rows is not match the rows by direct query"
%
consumerId
)
else
:
break
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db1'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
2
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
20
,
'showMsg'
:
1
,
'showRow'
:
1
}
topicNameList
=
[
'topic1'
,
'topic2'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
tdCom
.
create_ctable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
tag_elm_list
=
paraDict
[
'tagSchema'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
'ctbPrefix'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_2
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
tdLog
.
info
(
"create topics from stb with filter"
)
# queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString
=
"select ts, log(c1), ceil(pow(c1,3)) from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
# create one stb2
paraDict
[
"stbName"
]
=
'stb2'
paraDict
[
"ctbPrefix"
]
=
'ctbx'
paraDict
[
"rowsPerTbl"
]
=
5000
tdLog
.
info
(
"create stb2"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb2"
)
tdCom
.
create_ctable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
tag_elm_list
=
paraDict
[
'tagSchema'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
'ctbPrefix'
])
# queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
queryString
=
"select ts, sin(c1), abs(pow(c1,3)) from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
1
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
"%s,%s"
%
(
topicNameList
[
0
],
topicNameList
[
1
])
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 1"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
tdLog
.
info
(
"start consume processor 2"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
],
'cdb'
,
0
,
1
)
tdLog
.
info
(
"async insert data"
)
pThread
=
tmqCom
.
asyncInsertData
(
paraDict
)
tdLog
.
info
(
"wait consumer commit notify"
)
tmqCom
.
getStartCommitNotifyFromTmqsim
(
rows
=
4
)
tdLog
.
info
(
"pkill one consume processor"
)
tmqCom
.
stopTmqSimProcess
(
'tmq_sim_new'
)
pThread
.
join
()
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
2
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actTotalRows
=
0
for
i
in
range
(
len
(
resultList
)):
actTotalRows
+=
resultList
[
i
]
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
expectTotalRows
=
0
for
i
in
range
(
len
(
expectRowsList
)):
expectTotalRows
+=
expectRowsList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
actTotalRows
,
expectTotalRows
))
if
expectTotalRows
<=
resultList
[
0
]:
tdLog
.
info
(
"act consume rows: %d should >= expect consume rows: %d"
%
(
actTotalRows
,
expectTotalRows
))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
# time.sleep(10)
# for i in range(len(topicNameList)):
# tdSql.query("drop topic %s"%topicNameList[i])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
ad317c2a
...
...
@@ -138,3 +138,4 @@ python3 ./test.py -f 7-tmq/stbFilter.py
python3 ./test.py
-f
7-tmq/tmqCheckData.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py
-f
7-tmq/tmqConsumerGroup.py
tests/test/c/tmqSim.c
浏览文件 @
ad317c2a
...
...
@@ -93,8 +93,6 @@ static SConfInfo g_stConfInfo;
TdFilePtr
g_fp
=
NULL
;
static
int
running
=
1
;
int8_t
useSnapshot
=
0
;
// char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL;
...
...
@@ -126,11 +124,23 @@ char* getCurrentTimeString(char* timeString) {
return
timeString
;
}
static
void
tmqStop
(
int
signum
,
void
*
info
,
void
*
ctx
)
{
running
=
0
;
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmqStop() receive stop signal[%d]
\n
"
,
getCurrentTimeString
(
tmpString
),
signum
);
}
static
void
tmqSetSignalHandle
()
{
taosSetSignal
(
SIGINT
,
tmqStop
);
}
void
initLogFile
()
{
char
filename
[
256
];
char
tmpString
[
128
];
sprintf
(
filename
,
"%s/../log/tmqlog_%s.txt"
,
configDir
,
getCurrentTimeString
(
tmpString
));
pid_t
process_id
=
getpid
();
sprintf
(
filename
,
"%s/../log/tmqlog-%d-%s.txt"
,
configDir
,
process_id
,
getCurrentTimeString
(
tmpString
));
#ifdef WINDOWS
for
(
int
i
=
2
;
i
<
sizeof
(
filename
);
i
++
)
{
if
(
filename
[
i
]
==
':'
)
filename
[
i
]
=
'-'
;
...
...
@@ -205,7 +215,7 @@ void parseArgument(int32_t argc, char* argv[]) {
}
else
if
(
strcmp
(
argv
[
i
],
"-y"
)
==
0
)
{
g_stConfInfo
.
consumeDelay
=
atol
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-e"
)
==
0
)
{
useSnapshot
=
(
int8_t
)
atol
(
argv
[
++
i
]);
g_stConfInfo
.
useSnapshot
=
atol
(
argv
[
++
i
]);
}
else
{
pError
(
"%s unknow para: %s %s"
,
GREEN
,
argv
[
++
i
],
NC
);
exit
(
-
1
);
...
...
@@ -519,7 +529,9 @@ static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
g_once_commit_flag
=
1
;
notifyMainScript
((
SThreadInfo
*
)
param
,
(
int32_t
)
NOTIFY_CMD_START_COMMIT
);
}
taosFprintfFile
(
g_fp
,
"tmq_commit_cb_print() be called
\n
"
);
char
tmpString
[
128
];
taosFprintfFile
(
g_fp
,
"%s tmq_commit_cb_print() be called
\n
"
,
getCurrentTimeString
(
tmpString
));
}
void
build_consumer
(
SThreadInfo
*
pInfo
)
{
...
...
@@ -552,7 +564,7 @@ void build_consumer(SThreadInfo* pInfo) {
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
// tmq_conf_set(conf, "auto.offset.reset", "latest");
//
if
(
useSnapshot
)
{
if
(
g_stConfInfo
.
useSnapshot
)
{
tmq_conf_set
(
conf
,
"experiment.use.snapshot"
,
"true"
);
}
...
...
@@ -651,6 +663,10 @@ void loop_consume(SThreadInfo* pInfo) {
}
}
if
(
0
==
running
)
{
taosFprintfFile
(
g_fp
,
"receive stop signal and not continue consume
\n
"
);
}
pInfo
->
consumeMsgCnt
=
totalMsgs
;
pInfo
->
consumeRowCnt
=
totalRows
;
...
...
@@ -680,7 +696,7 @@ void* consumeThreadFunc(void* param) {
int32_t
err
=
tmq_subscribe
(
pInfo
->
tmq
,
pInfo
->
topicList
);
if
(
err
!=
0
)
{
pError
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_subscribe()! reason: %s
\n
"
,
tmq_err2str
(
err
));
taosFprintfFile
(
g_fp
,
"tmq_subscribe()
fail
! reason: %s
\n
"
,
tmq_err2str
(
err
));
assert
(
0
);
return
NULL
;
}
...
...
@@ -829,6 +845,8 @@ int main(int32_t argc, char* argv[]) {
getConsumeInfo
();
saveConfigToLogFile
();
tmqSetSignalHandle
();
TdThreadAttr
thattr
;
taosThreadAttrInit
(
&
thattr
);
taosThreadAttrSetDetachState
(
&
thattr
,
PTHREAD_CREATE_JOINABLE
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录