未验证 提交 7820ca2a 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #17705 from taosdata/test3.0/lihui

test:modify test cases of tmq for poll delay
......@@ -87,7 +87,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
......@@ -173,7 +173,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
......
......@@ -40,9 +40,9 @@ class TDTestCase:
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 10,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
......@@ -87,7 +87,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
......@@ -101,6 +101,7 @@ class TDTestCase:
topicFromStb = 'topic_stb'
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as stable %s.%s" %(topicFromStb, paraDict['dbName'], paraDict['stbName'])
#sqlString = "create topic %s as %s" %(topicFromStb, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
......@@ -111,7 +112,7 @@ class TDTestCase:
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.commit.interval.ms:200,\
auto.offset.reset:latest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
......@@ -119,7 +120,15 @@ class TDTestCase:
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim('cdb',1)
tdLog.info("create some new child table and insert data for latest mode")
paraDict["batchNum"] = 100
paraDict["ctbPrefix"] = 'newCtb'
paraDict["ctbNum"] = 10
paraDict["rowsPerTbl"] = 10
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("================= restart dnode ===========================")
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
......@@ -132,8 +141,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsFromQury = tdSql.getRows()
totalRowsFromQury = paraDict["ctbNum"] * paraDict["rowsPerTbl"]
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
if (totalConsumeRows < totalRowsFromQury):
......@@ -161,7 +169,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
......@@ -185,19 +193,25 @@ class TDTestCase:
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.commit.interval.ms:200,\
auto.offset.reset:latest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim('cdb',1)
tdLog.info("create some new child table and insert data for latest mode")
paraDict["batchNum"] = 100
paraDict["ctbPrefix"] = 'newCtb'
paraDict["ctbNum"] = 10
paraDict["rowsPerTbl"] = 10
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("================= restart dnode ===========================")
tdDnodes.stoptaosd(1)
tdDnodes.starttaosd(1)
# time.sleep(3)
tdLog.info(" restart taosd end and wait to check consume result")
expectRows = 1
......@@ -206,8 +220,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsFromQury = tdSql.getRows()
totalRowsFromQury = paraDict["ctbNum"] * paraDict["rowsPerTbl"]
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
if (totalConsumeRows < totalRowsFromQury):
......
......@@ -11,6 +11,8 @@ from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
sys.path.append("./7-tmq")
from tmqCommon import *
class actionType(Enum):
CREATE_DATABASE = 0
......@@ -193,32 +195,41 @@ class TDTestCase:
and restart a consumption process to complete a consumption
'''
tdLog.printNoPrefix("======== test case 1: ")
self.initConsumerTable()
# create and start thread
parameterDict = {'cfg': '', \
'actionType': 0, \
'dbName': 'db3', \
'dropFlag': 1, \
'vgroups': 4, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 10, \
'rowsPerTbl': 20000, \
'batchNum': 100, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
parameterDict['cfg'] = cfgPath
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
tmqCom.initConsumerTable()
#self.initConsumerTable()
# create and start thread
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 20000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 30,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['cfg'] = cfgPath
self.create_database(tdSql, paraDict["dbName"])
self.create_stable(tdSql, paraDict["dbName"], paraDict["stbName"])
self.create_ctables(tdSql, paraDict["dbName"], paraDict["stbName"], paraDict["ctbNum"])
self.insert_data(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName']))
consumerId = 0
# expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
expectrowcnt = 90000000000
......@@ -229,41 +240,45 @@ class TDTestCase:
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
#self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 9000000 # Forever loop
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
#self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
time.sleep(3)
#time.sleep(3)
tmqCom.getStartConsumeNotifyFromTmqsim()
tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================")
tdDnodes.stop(1)
# time.sleep(5)
time.sleep(5)
dataPath = buildPath + "/../sim/dnode1/data/*"
shellCmd = 'rm -rf ' + dataPath
tdLog.info(shellCmd)
os.system(shellCmd)
#tdDnodes.start(1)
tdDnodes.starttaosd(1)
time.sleep(2)
time.sleep(5)
######### redo to consume
self.initConsumerTable()
self.create_database(tdSql, parameterDict["dbName"])
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
self.create_database(tdSql, paraDict["dbName"])
self.create_stable(tdSql, paraDict["dbName"], paraDict["stbName"])
self.create_ctables(tdSql, paraDict["dbName"], paraDict["stbName"], paraDict["ctbNum"])
self.insert_data(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1'
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
......@@ -271,13 +286,17 @@ class TDTestCase:
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
#self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
pollDelay = 20
showMsg = 1
showRow = 1
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
paraDict['pollDelay'] = 20
#self.startTmqSimProcess(buildPath,cfgPath,pollDelay,paraDict["dbName"],showMsg, showRow)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
expectRows = 1
resultList = self.selectConsumeResult(expectRows)
......
......@@ -41,7 +41,7 @@ class TDTestCase:
'rowsPerTbl': 4000,
'batchNum': 15,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'pollDelay': 30,
'showMsg': 1,
'showRow': 1}
......@@ -124,7 +124,7 @@ class TDTestCase:
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
tmqCom.getStartConsumeNotifyFromTmqsim();
tmqCom.getStartConsumeNotifyFromTmqsim()
#time.sleep(5)
tdLog.info("check show consumers")
tdSql.query("show consumers")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册