diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index c7d13d0039e613fb116b454e56daa01c23c96d7b..65840349bacd0f25d4eb99833c5c118ca2b523e0 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -114,7 +114,7 @@ class TDTestCase: def tmqCase1(self, cfgPath, buildPath): - tdLog.printNoPrefix("======== test case 1: ") + tdLog.printNoPrefix("======== test case 1: Produce while consume") tdLog.info("step 1: create database, stb, ctb and insert data") # create and start thread parameterDict = {'cfg': '', \ @@ -122,8 +122,8 @@ class TDTestCase: 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10, \ - 'batchNum': 200, \ + 'rowsPerTbl': 1000, \ + 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) @@ -219,7 +219,6 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") - def tmqCase2(self, cfgPath, buildPath): tdLog.printNoPrefix("======== test case 2: add child table with consuming ") # create and start thread @@ -340,6 +339,128 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 2 end ...... ") + def tmqCase3(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 3: tow topics, each contains a stable, \ + but at the beginning, no ctables in the stable of one topic,\ + after starting consumer, create ctables ") + # create and start thread + parameterDict = {'cfg': '', \ + 'dbName': 'db2', \ + 'vgroups': 1, \ + 'stbName': 'stb', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + prepareEnvThread.start() + + # wait db ready + while 1: + tdSql.query("show databases") + if tdSql.getRows() == 4: + print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),) + break + else: + time.sleep(1) + + tdSql.query("use %s"%parameterDict['dbName']) + # wait stb ready + while 1: + tdSql.query("show %s.stables"%parameterDict['dbName']) + if tdSql.getRows() == 1: + break + else: + time.sleep(1) + + tdLog.info("create topics from super table") + topicFromStb = 'topic_stb_column2' + topicFromCtb = 'topic_ctb_column2' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + + time.sleep(1) + tdSql.query("show topics") + topic1 = tdSql.getData(0 , 0) + topic2 = tdSql.getData(1 , 0) + tdLog.info("show topics: %s, %s"%(topic1, topic2)) + if topic1 != topicFromStb and topic1 != topicFromCtb: + tdLog.exit("topic error1") + if topic2 != topicFromStb and topic2 != topicFromCtb: + tdLog.exit("topic error2") + + tdLog.info("create consume info table and consume result table") + cdbName = parameterDict["dbName"] + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + rowsOfNewCtb = 1000 + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb + topicList = topicFromStb + ifcheckdata = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + sql = "insert into consumeinfo values " + sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata) + tdSql.query(sql) + + tdLog.info("check stb if there are data") + while 1: + tdSql.query("select count(*) from %s"%parameterDict["stbName"]) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + countOfStb = tdSql.getData(0, 0) + if countOfStb != 0: + tdLog.info("count from stb: %d"%countOfStb) + break + else: + time.sleep(1) + + tdLog.info("start consume processor") + pollDelay = 5 + showMsg = 1 + showRow = 1 + + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + # create new child table and insert data + newCtbName = 'newctb' + tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) + startTs = parameterDict["startTs"] + for j in range(rowsOfNewCtb): + sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j) + tdSql.execute(sql) + tdLog.debug("insert data into new child table ............ [OK]") + + # wait for data ready + prepareEnvThread.join() + + tdLog.info("insert process end, and start to check consume result") + while 1: + tdSql.query("select * from consumeresult") + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == 1: + break + else: + time.sleep(5) + + tdSql.checkData(0 , 1, consumerId) + tdSql.checkData(0 , 3, expectrowcnt) + + tdSql.query("drop topic %s"%topicFromStb) + tdSql.query("drop topic %s"%topicFromCtb) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + def run(self): tdSql.prepare() @@ -352,7 +473,7 @@ class TDTestCase: tdLog.info("cfgPath: %s" % cfgPath) self.tmqCase1(cfgPath, buildPath) - #self.tmqCase2(cfgPath, buildPath) + self.tmqCase2(cfgPath, buildPath) #self.tmqCase3(cfgPath, buildPath) def stop(self): diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 75d461c6964af4c8bfe60f2133cca8c6cbe1335a..33ddd23d8c407995fffc9ba07278dc6f2de28332 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -98,12 +98,24 @@ static void printHelp() { } void initLogFile() { - // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); - char file[256]; - sprintf(file, "%s/../log/tmqlog.txt", configDir); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + time_t now; + struct tm curTime; + char filename[256]; + + now = taosTime(NULL); + taosLocalTime(&now, &curTime); + sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", + configDir, + curTime.tm_year+1900, + curTime.tm_mon+1, + curTime.tm_mday, + curTime.tm_hour, + curTime.tm_min, + curTime.tm_sec); + //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (NULL == pFile) { - fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); + fprintf(stderr, "Failed to open %s for save result\n", filename); exit(-1); } g_fp = pFile;