From aa610b27a48432426e49dec259eafa896c46780b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 9 Jun 2023 18:51:10 +0800 Subject: [PATCH] feat:add parameters for consumer & add offset rows for subscription --- source/client/src/clientTmq.c | 12 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 42 ++- source/dnode/mnode/impl/src/mndSubscribe.c | 202 +++++------ .../system-test/7-tmq/checkOffsetRowParams.py | 313 ++++++++++++++++++ tests/system-test/7-tmq/tmqCommon.py | 2 +- 6 files changed, 454 insertions(+), 118 deletions(-) create mode 100644 tests/system-test/7-tmq/checkOffsetRowParams.py diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 2e3a634577..c15f6d55bc 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -809,6 +809,7 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->vgId = pVg->vgId; offRows->rows = pVg->numOfRows; offRows->offset = pVg->offsetInfo.committedOffset; + tscDebug("report row:%lldd, offset:%" PRId64, offRows->rows, offRows->offset.version); } } @@ -1695,7 +1696,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, return pRspObj; } -SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { +SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) { SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj)); pRspObj->resType = RES_TYPE__TMQ_METADATA; tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); @@ -1710,6 +1711,13 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) { setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); } + // extract the rows in this data packet + for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pRspObj->rsp.blockData, i); + int64_t rows = htobe64(pRetrieve->numOfRows); + pVg->numOfRows += rows; + (*numOfRows) += rows; + } return pRspObj; } @@ -2007,7 +2015,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.createTableNum == 0) { pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); } else { - pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); + pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows); } tmq->totalRows += numOfRows; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index ca0ee9e058..f74a593b00 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -595,6 +595,7 @@ typedef struct { int64_t stbUid; SHashObj* consumerHash; // consumerId -> SMqConsumerEp SArray* unassignedVgs; // SArray + SArray* offsetRows; char dbName[TSDB_DB_FNAME_LEN]; } SMqSubscribeObj; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index b7ad56d5d8..b6fb96882d 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -515,7 +515,6 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { SMqConsumerEp newEp = { .consumerId = pConsumerEp->consumerId, .vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), - .offsetRows = NULL, }; taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); } @@ -535,6 +534,7 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { } taosHashCleanup(pSub->consumerHash); taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); + taosArrayDestroy(pSub->offsetRows); } int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { @@ -561,6 +561,23 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { if (cnt != sz) return -1; tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp); tlen += taosEncodeString(buf, pSub->dbName); + + int32_t szVgs = taosArrayGetSize(pSub->offsetRows); + tlen += taosEncodeFixedI32(buf, szVgs); + for (int32_t j= 0; j < szVgs; ++j) { + OffsetRows *offRows = taosArrayGet(pSub->offsetRows, j); + tlen += taosEncodeFixedI32(buf, offRows->vgId); + tlen += taosEncodeFixedI64(buf, offRows->rows); + tlen += taosEncodeFixedI8(buf, offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + tlen += taosEncodeFixedI64(buf, offRows->offset.uid); + tlen += taosEncodeFixedI64(buf, offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + tlen += taosEncodeFixedI64(buf, offRows->offset.version); + } else { + // do nothing + } + } return tlen; } @@ -585,6 +602,29 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) { buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp)); buf = taosDecodeStringTo(buf, pSub->dbName); + + if (sver > 1){ + int32_t szVgs = 0; + buf = taosDecodeFixedI32(buf, &szVgs); + if(szVgs > 0){ + pSub->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows)); + if (NULL == pSub->offsetRows) return NULL; + for (int32_t j= 0; j < szVgs; ++j) { + OffsetRows* offRows = taosArrayReserve(pSub->offsetRows, 1); + buf = taosDecodeFixedI32(buf, &offRows->vgId); + buf = taosDecodeFixedI64(buf, &offRows->rows); + buf = taosDecodeFixedI8(buf, &offRows->offset.type); + if (offRows->offset.type == TMQ_OFFSET__SNAPSHOT_DATA || offRows->offset.type == TMQ_OFFSET__SNAPSHOT_META) { + buf = taosDecodeFixedI64(buf, &offRows->offset.uid); + buf = taosDecodeFixedI64(buf, &offRows->offset.ts); + } else if (offRows->offset.type == TMQ_OFFSET__LOG) { + buf = taosDecodeFixedI64(buf, &offRows->offset.version); + } else { + // do nothing + } + } + } + } return (void *)buf; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index acef882c20..cadd1f53f0 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -449,8 +449,26 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR SMqRebOutputVg* pRebOutput = (SMqRebOutputVg *)pRemovedIter; taosArrayPush(pOutput->rebVgs, pRebOutput); - if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed, put all vg into unassigned - taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); + if(taosHashGetSize(pOutput->pSub->consumerHash) == 0){ // if all consumer is removed + taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp); // put all vg into unassigned + SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pInput->pRebInfo->key); // put all offset rows + if(pSub){ + taosRLockLatch(&pSub->lock); + if(pOutput->pSub->offsetRows == NULL){ + pOutput->pSub->offsetRows = taosArrayInit(4, sizeof(OffsetRows)); + }else{ + taosArrayClear(pOutput->pSub->offsetRows); + } + pIter = NULL; + while(1){ + pIter = taosHashIterate(pSub->consumerHash, pIter); + if (pIter == NULL) break; + SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; + taosArrayAddAll(pOutput->pSub->offsetRows, pConsumerEp->offsetRows); + } + taosRUnLockLatch(&pSub->lock); + mndReleaseSubscribe(pMnode, pSub); + } } } @@ -890,6 +908,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc pOldSub->unassignedVgs = pNewSub->unassignedVgs; pNewSub->unassignedVgs = tmp1; + SArray *tmp2 = pOldSub->offsetRows; + pOldSub->offsetRows = pNewSub->offsetRows; + pNewSub->offsetRows = tmp2; + taosWUnLockLatch(&pOldSub->lock); return 0; } @@ -1028,6 +1050,61 @@ END: return code; } +static int32_t buildResult(SSDataBlock *pBlock, int32_t* numOfRows, int64_t consumerId, const char* topic, const char* cgroup, SArray* vgs, SArray *offsetRows){ + int32_t sz = taosArrayGetSize(vgs); + for (int32_t j = 0; j < sz; j++) { + SMqVgEp *pVgEp = taosArrayGetP(vgs, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, *numOfRows, (const char *)topic, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, *numOfRows, (const char *)cgroup, false); + + // vg id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, *numOfRows, (const char *)&pVgEp->vgId, false); + + // consumer id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, *numOfRows, (const char *)&consumerId, consumerId == -1); + + mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), + consumerId, varDataVal(cgroup), pVgEp->vgId); + + // offset + OffsetRows *data = NULL; + for(int i = 0; i < taosArrayGetSize(offsetRows); i++){ + OffsetRows *tmp = taosArrayGet(offsetRows, i); + if(tmp->vgId != pVgEp->vgId){ + continue; + } + data = tmp; + } + if(data){ + // vg id + char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; + tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset); + varDataSetLen(buf, strlen(varDataVal(buf))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, *numOfRows, (const char *)buf, false); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, *numOfRows, (const char *)&data->rows, false); + }else{ + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, *numOfRows); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetNULL(pColInfo, *numOfRows); + mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId); + } + (*numOfRows)++; + } + return 0; +} + int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1048,6 +1125,13 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock blockDataEnsureCapacity(pBlock, numOfRows + pSub->vgNum); } + // topic and cgroup + char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; + mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); + varDataSetLen(topic, strlen(varDataVal(topic))); + varDataSetLen(cgroup, strlen(varDataVal(cgroup))); + SMqConsumerEp *pConsumerEp = NULL; void *pIter = NULL; while (1) { @@ -1055,121 +1139,11 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock if (pIter == NULL) break; pConsumerEp = (SMqConsumerEp *)pIter; - int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - for (int32_t j = 0; j < sz; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - - SColumnInfoData *pColInfo; - int32_t cols = 0; - - // topic and cgroup - char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); - varDataSetLen(topic, strlen(varDataVal(topic))); - varDataSetLen(cgroup, strlen(varDataVal(cgroup))); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)topic, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); - - // vg id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); - - // consumer id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false); - - mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic), - pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId); - - // offset - OffsetRows *data = NULL; - for(int i = 0; i < taosArrayGetSize(pConsumerEp->offsetRows); i++){ - OffsetRows *tmp = taosArrayGet(pConsumerEp->offsetRows, i); - if(data->vgId != pVgEp->vgId){ - continue; - } - data = tmp; - } - if(data){ - // vg id - char buf[TSDB_OFFSET_LEN + VARSTR_HEADER_SIZE] = {0}; - tFormatOffset(varDataVal(buf), TSDB_OFFSET_LEN, &data->offset); - varDataSetLen(buf, strlen(varDataVal(buf))); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)buf, false); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&data->rows, false); - }else{ - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetNULL(pColInfo, numOfRows); - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetNULL(pColInfo, numOfRows); - mError("mnd show subscriptions: do not find vgId:%d in offsetRows", pVgEp->vgId); - } -//#if 0 -// // subscribe time -// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); -// -// // rebalance time -// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); -// colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); -//#endif - - numOfRows++; - } + buildResult(pBlock, &numOfRows, pConsumerEp->consumerId, topic, cgroup, pConsumerEp->vgs, pConsumerEp->offsetRows); } // do not show for cleared subscription - int32_t sz = taosArrayGetSize(pSub->unassignedVgs); - for (int32_t i = 0; i < sz; i++) { - SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); - - SColumnInfoData *pColInfo; - int32_t cols = 0; - - // topic and cgroup - char topic[TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; - char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; - mndSplitSubscribeKey(pSub->key, varDataVal(topic), varDataVal(cgroup), false); - varDataSetLen(topic, strlen(varDataVal(topic))); - varDataSetLen(cgroup, strlen(varDataVal(cgroup))); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)topic, false); - - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false); - - // vg id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pVgEp->vgId, false); - - // consumer id - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, NULL, true); - - mDebug("mnd show subscriptions(unassigned): topic %s, cgroup %s vgid %d", varDataVal(topic), varDataVal(cgroup), - pVgEp->vgId); - - // offset -#if 0 - // subscribe time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->subscribeTime, false); - - // rebalance time - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataSetVal(pColInfo, numOfRows, (const char *)&pSub->rebalanceTime, pConsumer->rebalanceTime == 0); -#endif - - numOfRows++; - } + buildResult(pBlock, &numOfRows, -1, topic, cgroup, pSub->unassignedVgs, pSub->offsetRows); pBlock->info.rows = numOfRows; diff --git a/tests/system-test/7-tmq/checkOffsetRowParams.py b/tests/system-test/7-tmq/checkOffsetRowParams.py new file mode 100644 index 0000000000..8e5ff63f60 --- /dev/null +++ b/tests/system-test/7-tmq/checkOffsetRowParams.py @@ -0,0 +1,313 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class actionType(Enum): + CREATE_DATABASE = 0 + CREATE_STABLE = 1 + CREATE_CTABLE = 2 + INSERT_DATA = 3 + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1 wal_retention_period 3600"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName): + tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + rowsOfSql = 0 + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + rowsOfSql += 1 + if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + rowsOfSql = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + + if parameterDict["actionType"] == actionType.CREATE_DATABASE: + self.create_database(tsql, parameterDict["dbName"]) + elif parameterDict["actionType"] == actionType.CREATE_STABLE: + self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"]) + elif parameterDict["actionType"] == actionType.CREATE_CTABLE: + self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + elif parameterDict["actionType"] == actionType.INSERT_DATA: + self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"], \ + parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + else: + tdLog.exit("not support's action: ", parameterDict["actionType"]) + + return + + def tmqCase1(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 1: ") + + self.initConsumerTable() + + # create and start thread + parameterDict = {'cfg': '', \ + 'actionType': 0, \ + 'dbName': 'db1', \ + 'dropFlag': 1, \ + 'vgroups': 4, \ + 'replica': 1, \ + 'stbName': 'stb1', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 10000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + + self.create_database(tdSql, parameterDict["dbName"]) + self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:true,\ + auto.commit.interval.ms:2000,\ + auto.offset.reset:earliest' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + pollDelay = 10 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + time.sleep(2) + tdLog.info("start show subscriptions 1") + while(1): + tdSql.query("show subscriptions") + if (tdSql.getRows() == 0): + tdLog.info("sleep") + time.sleep(1) + elif (tdSql.queryResult[0][4] != None): + tdSql.checkData(0, 4, "offset(reset to earlieast)") + tdSql.checkData(0, 5, 0) + break + + tdLog.info("start insert data") + self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + self.insert_data(tdSql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"]) + + time.sleep(2) + tdLog.info("start show subscriptions 2") + tdSql.query("show subscriptions") + tdSql.checkRows(4) + print(tdSql.queryResult) + # tdSql.checkData(0, 4, 'offset(log) ver:103') + tdSql.checkData(0, 5, 10000) + # tdSql.checkData(1, 4, 'offset(log) ver:103') + tdSql.checkData(1, 5, 10000) + # tdSql.checkData(2, 4, 'offset(log) ver:303') + tdSql.checkData(2, 5, 50000) + # tdSql.checkData(3, 4, 'offset(log) ver:239') + tdSql.checkData(3, 5, 30000) + + tdLog.info("insert process end, and start to check consume result") + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + + time.sleep(2) + tdLog.info("start show subscriptions 3") + tdSql.query("show subscriptions") + tdSql.checkRows(4) + print(tdSql.queryResult) + tdSql.checkData(0, 3, None) + # tdSql.checkData(0, 4, 'offset(log) ver:103') + tdSql.checkData(0, 5, 10000) + # tdSql.checkData(1, 4, 'offset(log) ver:103') + tdSql.checkData(1, 5, 10000) + # tdSql.checkData(2, 4, 'offset(log) ver:303') + tdSql.checkData(2, 5, 50000) + # tdSql.checkData(3, 4, 'offset(log) ver:239') + tdSql.checkData(3, 5, 30000) + + tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + tdSql.prepare() + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase1(cfgPath, buildPath) + # self.tmqCase2(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py index 6b633fa193..97f38f01b6 100644 --- a/tests/system-test/7-tmq/tmqCommon.py +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -171,7 +171,7 @@ class TMQCom: if dropFlag == 1: tsql.execute("drop database if exists %s"%(dbName)) - tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica)) + tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica)) tdLog.debug("complete to create database %s"%(dbName)) return -- GitLab