未验证 提交 17c7a8e7 编写于 作者: H Hui Li 提交者: GitHub

Merge pull request #15284 from taosdata/test3.0/lihui

test: add test case for tmq
......@@ -535,6 +535,18 @@ class TMQCom:
column_value_str = column_value_str.rstrip()[:-1]
insert_sql = f'insert into {dbname}.{tbname_prefix}{tblIdx+tbname_index_start_num} values ({column_value_str});'
tsql.execute(insert_sql)
def waitSubscriptionExit(self, tsql, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tsql.query("show subscriptions")
if tsql.getRows() == 0:
break
else:
time.sleep(2)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
def close(self):
self.cursor.close()
......
......@@ -18,8 +18,8 @@ class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 2
self.ctbNum = 100
self.rowsPerTbl = 10000
self.ctbNum = 1000
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
......@@ -38,8 +38,8 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
......@@ -83,8 +83,8 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
......@@ -117,13 +117,13 @@ class TDTestCase:
tdSql.execute(sqlString)
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:500,\
auto.commit.interval.ms:3000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
......@@ -147,10 +147,46 @@ class TDTestCase:
tdSql.query(queryString)
totalRowsFromQury = tdSql.getRows()
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQury))
if totalConsumeRows != totalRowsFromQury:
tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
if not (totalConsumeRows == totalRowsFromQury):
tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
# tmqCom.initConsumerTable()
# consumerId = 1
# expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
# topicList = topicFromStb1
# ifcheckdata = 0
# ifManualCommit = 0
# keyList = 'group.id:cgrp2,\
# enable.auto.commit:true,\
# auto.commit.interval.ms:3000,\
# auto.offset.reset:earliest'
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# tdLog.info("start consume processor")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
# totalConsumeRows = 0
# for i in range(expectRows):
# totalConsumeRows += resultList[i]
# tdSql.query(queryString)
# totalRowsFromQury = tdSql.getRows()
# tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
# if not (totalConsumeRows == totalRowsFromQury):
# tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromStb1)
tdLog.printNoPrefix("======== test case 1 end ...... ")
......@@ -168,8 +204,8 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 10000,
'ctbNum': 1000,
'rowsPerTbl': 1000,
'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
......@@ -201,7 +237,7 @@ class TDTestCase:
tdSql.execute(sqlString)
consumerId = 1
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + 100000
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 0
......@@ -220,8 +256,10 @@ class TDTestCase:
tdDnodes.start(1)
time.sleep(3)
tdLog.info("create some new child table and insert data ")
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
# tdLog.info("create some new child table and insert data ")
# paraDict["batchNum"] = 1000
# paraDict["ctbPrefix"] = 'newCtb'
# tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result")
expectRows = 1
......@@ -242,9 +280,9 @@ class TDTestCase:
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
# tdSql.prepare()
self.prepareTestEnv()
# self.tmqCase1()
self.tmqCase1()
self.tmqCase2()
def stop(self):
......
......@@ -25,18 +25,6 @@ class TDTestCase:
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def waitSubscriptionExit(self, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tdSql.query("show subscriptions")
if tdSql.getRows() == 0:
break
else:
time.sleep(1)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
# drop some ntbs
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
......@@ -115,7 +103,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
......@@ -208,7 +196,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
......
......@@ -24,19 +24,7 @@ class TDTestCase:
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def waitSubscriptionExit(self, max_wait_count=20):
wait_cnt = 0
while (wait_cnt < max_wait_count):
tdSql.query("show subscriptions")
if tdSql.getRows() == 0:
break
else:
time.sleep(2)
wait_cnt += 1
tdLog.info("wait subscriptions exit for %d s"%wait_cnt)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
......@@ -169,7 +157,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
......@@ -258,7 +246,7 @@ class TDTestCase:
tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....")
self.waitSubscriptionExit()
tmqCom.waitSubscriptionExit(tdSql)
tdSql.query("drop topic %s"%topicFromDb)
tdLog.info("success dorp topic: %s"%topicFromDb)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册