提交 ecd39d33 编写于 作者: P plum-lihui

test: add tmq case

上级 856912de
......@@ -20,7 +20,7 @@ from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 32
self.ctbNum = 15000
self.ctbNum = 100
self.rowsPerTbl = 1000
self.snapshot = 1
self.replicaVar = 3
......@@ -91,7 +91,7 @@ class TDTestCase:
'rowsPerTbl': 10000,
'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 10,
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 1}
......@@ -101,8 +101,7 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl
paraDict['snapshot'] = self.snapshot
topicNameList = ['topic1']
expectRowsList = []
topicNameList = ['topic1', 'topic2']
tmqCom.initConsumerTable()
tdLog.info("create topics from stb with filter")
......@@ -112,14 +111,18 @@ class TDTestCase:
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 4
topicList = topicNameList[0] + ',' + topicNameList[0] + ',' + topicNameList[1]
ifcheckdata = 0
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
......@@ -135,18 +138,16 @@ class TDTestCase:
# continue to insert new rows
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
pInsertThread.join()
expectRowsList[0] = expectRowsList[0] * 2
pInsertThread.join()
expectRows = 2
resultList = tmqCom.selectConsumeResult(expectRows)
actConsumeTotalRows = resultList[0] + resultList[1]
tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectRowsList[0]))
tdLog.info("act consume rows: %d, expect consume rows: %d"%(actConsumeTotalRows, expectrowcnt))
if (expectRowsList[0] <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectRowsList[0])) or ((resultList[1] == 0) and (resultList[0] >= expectRowsList[0])):
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
if not ((expectrowcnt <= actConsumeTotalRows) or ((resultList[0] == 0) and (resultList[1] >= expectrowcnt)) or ((resultList[1] == 0) and (resultList[0] >= expectrowcnt))):
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, actConsumeTotalRows))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册