diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py index de475803fe3297cc56143baaa3639a2e9e8d2553..b30f99b542d06999c41112ad05c316d12073935d 100644 --- a/tests/system-test/7-tmq/subscribeDb4.py +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -14,19 +14,24 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: - paraDict = {'dbName': 'db12', - 'dropFlag':1, - 'vgroups': 4, - 'precision': 'ms', - 'stbName': 'stb0', - 'ctbNum': 10, + paraDict = {'dbName': 'db12', + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': 'stb0', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':16, 'count':1}, {'type': 'timestamp','count':1}], + 'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, 'rowsPerTbl': 10000, - 'batchNum': 10, - 'startTs': 0, # 1640966400000 ----> 2022-01-01 00:00:00.000 - 'event':'', - 'columnDict': {'int':2}, - 'tagDict': {'int':1} - } + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 20, + 'showMsg': 1, + 'showRow': 1} cdbName = 'cdb' # some parameter to consumer processor @@ -57,17 +62,19 @@ class TDTestCase: tmqCom.initConsumerTable(self.cdbName) - tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) + tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"]) self.paraDict["stbName"] = 'stb1' - tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) - tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) - tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"]) + tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"]) + tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) + tdLog.info("22222222222222222") self.paraDict["stbName"] = 'stb2' - tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) - tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) - tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + self.paraDict["ctbPrefix"] = 'newctb' + tdCom.create_stable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],column_elm_list=self.paraDict["colSchema"],tag_elm_list=self.paraDict["tagSchema"],count=1, default_stbname_prefix=self.paraDict["stbName"]) + tdCom.create_ctable(tdSql,dbname=self.paraDict["dbName"],stbname=self.paraDict["stbName"],tag_elm_list=self.paraDict['tagSchema'],count=self.paraDict["ctbNum"],default_ctbname_prefix=self.paraDict["ctbPrefix"]) + tmqCom.insert_data_2(tdSql,self.paraDict["dbName"],self.paraDict["ctbPrefix"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"],self.paraDict["startTs"],self.paraDict["ctbStartIdx"]) tdLog.info("create topics from db") topicName1 = 'topic_%s'%(self.paraDict['dbName']) @@ -97,7 +104,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) tdLog.exit("tmq consume rows error!") - time.sleep(15) + time.sleep(10) tdSql.query("drop topic %s"%topicName1) tdLog.printNoPrefix("======== test case 12 end ...... ") diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 9254f57c40342761e144053e751b3412efaf6b0a..b545340153477ed8cab9d18e4e0be4bd14c427de 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -76,6 +76,22 @@ class TMQCom: resultList.append(tdSql.getData(i , 3)) return resultList + + def selectConsumeMsgResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 2)) + + return resultList def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0,alias=0): buildPath = tdCom.getBuildPath() diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index f6dfe95f453f2e38acafa185cc74cca8cea1dac5..44a3afcf73f7290a9bdf32aa667aea5d947597cb 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -136,6 +136,7 @@ python3 ./test.py -f 7-tmq/subscribeDb0.py python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb3.py +#python3 ./test.py -f 7-tmq/subscribeDb4.py python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 81fa72d15adfb8bfb6c7fc96acc3c1c5a6dcbc40..d21b1faba4fe7b6772563e6ec86d63c02c9e5de0 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -635,6 +635,9 @@ void loop_consume(SThreadInfo* pInfo) { } } + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); while (running) { TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); @@ -646,7 +649,15 @@ void loop_consume(SThreadInfo* pInfo) { taos_free_result(tmqMsg); totalMsgs++; - + + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 10 * 1000) { + taosFprintfFile(g_fp, + "consumer id %d has currently poll total msgs: %" PRId64 "\n", + pInfo->consumerId, totalMsgs); + lastPrintTime = currentPrintTime; + } + if (0 == once_flag) { once_flag = 1; notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); @@ -663,7 +674,7 @@ void loop_consume(SThreadInfo* pInfo) { break; } } - + if (0 == running) { taosFprintfFile(g_fp, "receive stop signal and not continue consume\n"); } @@ -676,8 +687,6 @@ void loop_consume(SThreadInfo* pInfo) { } void* consumeThreadFunc(void* param) { - int32_t totalMsgs = 0; - SThreadInfo* pInfo = (SThreadInfo*)param; pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -859,12 +868,27 @@ int main(int32_t argc, char* argv[]) { (void*)(&(g_stConfInfo.stThreads[i]))); } + int64_t start = taosGetTimestampUs(); + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadClear(&g_stConfInfo.stThreads[i].thread); } - // printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); + int64_t end = taosGetTimestampUs(); + + int64_t totalMsgs = 0; + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt; + } + + int64_t t = end - start; + if (0 == t) t = 1; + + double tInMs = (double)t / 1000000.0; + taosFprintfFile(g_fp, + "Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n", + tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs)); taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosCloseFile(&g_fp);