From 757ac9b1d9749be7a2d52de06bcc67e4b7af8419 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 May 2023 14:20:21 +0800 Subject: [PATCH] fix:add test cases --- tests/system-test/7-tmq/stbFilterWhere.py | 72 ++++++++++++++++++++++- 1 file changed, 70 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/stbFilterWhere.py b/tests/system-test/7-tmq/stbFilterWhere.py index baf042d71c..8d8d046cef 100644 --- a/tests/system-test/7-tmq/stbFilterWhere.py +++ b/tests/system-test/7-tmq/stbFilterWhere.py @@ -56,7 +56,7 @@ class TDTestCase: startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) return - def tmqCaseError(self, topicName, condition): + def tmqCase_columnError(self, topicName, condition): tdLog.printNoPrefix("======== test case error: ") paraDict = {'dbName': 'dbt', 'dropFlag': 1, @@ -140,14 +140,82 @@ class TDTestCase: tdLog.printNoPrefix("======== test case end ...... ") + def tmqCase_addNewTable_dropTag(self, topicName, condition): + tdLog.printNoPrefix("======== test case1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 100, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 2, + 'showMsg': 1, + 'showRow': 1} + + expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from stb with tag filter") + topicString = "create topic %s as stable %s.%s where %s" %(topicName, paraDict['dbName'], paraDict['stbName'], condition) + tdLog.info("create topic sql: %s"%topicString) + tdSql.execute(topicString) + + queryString = "select * from %s.%s where %s" %(paraDict['dbName'], paraDict['stbName'], condition) + tdSql.query(queryString) + expectRowsList.append(tdSql.getRows() + 1) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicName + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow']) + + #add new table with one data + tdLog.info("start insert data") + insertString = "insert into %s.tmp using %s.%s tags(1, 1, 1, 't4', 't5') values(now, 1, 1, 1, 'c4', 'c5', now)" %(paraDict['dbName'], paraDict['dbName'], paraDict['stbName']) + tdSql.execute(insertString) + + #test drop tag + tdSql.error("alter stable %s.%s drop tag t1" %(paraDict['dbName'], paraDict['stbName'])) + tdSql.execute("alter stable %s.%s drop tag t2" %(paraDict['dbName'], paraDict['stbName'])) + tdSql.execute("alter stable %s.%s drop column c2" %(paraDict['dbName'], paraDict['stbName'])) + + tdLog.info("wait the consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + if expectRowsList[0] != resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0])) + tdLog.exit("0 tmq consume rows error!") + + tdLog.printNoPrefix("======== test case1 end ...... ") + def run(self): tdSql.prepare() self.prepareTestEnv() - self.tmqCaseError("t1", "c1 = 4 and t1 = 3") + self.tmqCase_columnError("t1", "c1 = 4 and t1 = 3") self.tmqCase("t2", "2 > 1") self.tmqCase("t3", "t4 = 'beijing'") self.tmqCase("t4", "t4 > t3") self.tmqCase("t5", "t3 = t4") + self.tmqCase_addNewTable_dropTag("t6", "t1 = 1") def stop(self): tdSql.close() -- GitLab