diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 060e17c11f994ec1ce01628cc6936b145ef2fe84..151558a33aa6531a7188392914459965f8f4ded1 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -428,7 +428,7 @@ class TMQCom: pThread.start() return pThread - def checkFileContent(self, consumerId, queryString): + def checkFileContent(self, consumerId, queryString, skipRowsOfCons=0): buildPath = tdCom.getBuildPath() cfgPath = tdCom.getClientCfgPath() dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId) @@ -445,6 +445,10 @@ class TMQCom: # skip first line for it is schema queryFile.readline() + # skip offset for consumer + for i in range(0,skipRowsOfCons): + consumeFile.readline() + lines = 0 while True: dst = queryFile.readline() diff --git a/tests/system-test/7-tmq/tmqDelete-1ctb.py b/tests/system-test/7-tmq/tmqDelete-1ctb.py new file mode 100644 index 0000000000000000000000000000000000000000..c5f7510a07a9fbed11f9e5c50691137ca04f38c5 --- /dev/null +++ b/tests/system-test/7-tmq/tmqDelete-1ctb.py @@ -0,0 +1,275 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def __init__(self): + self.snapshot = 0 + self.vgroups = 4 + self.ctbNum = 1 + self.rowsPerTbl = 10000 + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1, + 'rowsPerTbl': 100000, + 'batchNum': 1200, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx", + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # tdLog.info("restart taosd to ensure that the data falls into the disk") + # tdSql.query("flush database %s"%(paraDict['dbName'])) + return + + def delData(self,tsql,dbName,ctbPrefix,ctbNum,startTs=0,endTs=0,ctbStartIdx=0): + tdLog.debug("start to del data ............") + for i in range(ctbNum): + sql = "delete from %s.%s%d where _c0 >= %d and _c0 <= %d "%(dbName,ctbPrefix,i+ctbStartIdx,startTs,endTs) + tsql.execute(sql) + + tdLog.debug("del data ............ [OK]") + return + + def tmqCase1(self): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1, + 'rowsPerTbl': 100000, + 'batchNum': 3000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'endTs': 0, + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + # del some data + rowsOfDelete = int(paraDict["rowsPerTbl"] / 4) + paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 + self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], + startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + consumerId = 0 + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"]) + topicList = topicFromStb1 + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, expect consume rows: %d, act insert rows: %d"%(totalConsumeRows, expectrowcnt, totalRowsInserted)) + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + tmqCom.checkFileContent(consumerId, queryString, rowsOfDelete) + + tdSql.query("drop topic %s"%topicFromStb1) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 1, + 'rowsPerTbl': 10000, + 'batchNum': 5000, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 5, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['snapshot'] = self.snapshot + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdLog.info("restart taosd to ensure that the data falls into the disk") + tdSql.query("flush database %s"%(paraDict['dbName'])) + + # update to 1/4 rows and insert 3/4 new rows + paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl * 3 / 4) + # paraDict['rowsPerTbl'] = self.rowsPerTbl + tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + # del some data + rowsOfDelete = int(self.rowsPerTbl / 4 ) + paraDict["endTs"] = paraDict["startTs"] + rowsOfDelete - 1 + self.delData(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"], + startTs=paraDict["startTs"], endTs=paraDict["endTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tmqCom.initConsumerTable() + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicFromStb1, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + consumerId = 1 + + if self.snapshot == 0: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 + 1/4 + 3/4)) + elif self.snapshot == 1: + expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * (1 - 1/4 + 1/4 + 3/4 - 1/4)) + + topicList = topicFromStb1 + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:1000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsInserted = tdSql.getRows() + + tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt)) + + if totalConsumeRows != expectrowcnt: + tdLog.exit("tmq consume rows error!") + + # tmqCom.checkFileContent(consumerId, queryString) + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 2 end ...... ") + + def run(self): + tdSql.prepare() + self.prepareTestEnv() + tdLog.printNoPrefix("=============================================") + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.tmqCase1() + self.tmqCase2() + + self.prepareTestEnv() + tdLog.printNoPrefix("====================================================================") + tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") + self.snapshot = 1 + self.tmqCase1() + self.tmqCase2() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqUpdate-1ctb.py b/tests/system-test/7-tmq/tmqUpdate-1ctb.py index 8513092be92b058a8d5c6b203351c8ec12c96a47..de11f0f9ab6c9c4a318163a3f041cce60af3e4d0 100644 --- a/tests/system-test/7-tmq/tmqUpdate-1ctb.py +++ b/tests/system-test/7-tmq/tmqUpdate-1ctb.py @@ -235,18 +235,18 @@ class TDTestCase: def run(self): tdSql.prepare() - # self.prepareTestEnv() - # tdLog.printNoPrefix("=============================================") - # tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") - # self.tmqCase1() - # self.tmqCase2() + self.prepareTestEnv() + tdLog.printNoPrefix("=============================================") + tdLog.printNoPrefix("======== snapshot is 0: only consume from wal") + self.tmqCase1() + self.tmqCase2() self.prepareTestEnv() tdLog.printNoPrefix("====================================================================") tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal") self.snapshot = 1 self.tmqCase1() - # self.tmqCase2() + self.tmqCase2() def stop(self):