From 01c94a775b6779e501454cddef459cc958c12a62 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 28 Sep 2022 14:17:26 +0800 Subject: [PATCH] feat(tmq): push optimization --- source/client/src/clientTmq.c | 18 +++++++++--------- source/dnode/mnode/impl/src/mndConsumer.c | 1 + source/dnode/mnode/impl/src/mndTopic.c | 2 ++ source/dnode/vnode/src/tq/tq.c | 8 +++++++- source/dnode/vnode/src/tq/tqPush.c | 12 ++++++------ source/libs/executor/src/projectoperator.c | 10 +++++++--- source/libs/executor/src/scanoperator.c | 2 ++ 7 files changed, 34 insertions(+), 19 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c9c02a77e1..e0f48a4534 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -515,7 +515,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg; topic = pMetaRspObj->topic; vgId = pMetaRspObj->vgId; - } else if(TD_RES_TMQ_METADATA(msg)) { + } else if (TD_RES_TMQ_METADATA(msg)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg; topic = pRspObj->topic; vgId = pRspObj->vgId; @@ -715,7 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) { int32_t epoch = tmq->epoch; SMqHbReq* pReq = taosMemoryMalloc(sizeof(SMqHbReq)); if (pReq == NULL) goto OVER; - pReq->consumerId = consumerId; + pReq->consumerId = htobe64(consumerId); pReq->epoch = epoch; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -1661,9 +1661,9 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { // build rsp void* pRsp = NULL; - if(pollRspWrapper->taosxRsp.createTableNum == 0){ + if (pollRspWrapper->taosxRsp.createTableNum == 0) { pRsp = tmqBuildRspFromWrapper(pollRspWrapper); - }else{ + } else { pRsp = tmqBuildTaosxRspFromWrapper(pollRspWrapper); } taosFreeQitem(pollRspWrapper); @@ -1850,12 +1850,12 @@ const char* tmq_get_table_name(TAOS_RES* res) { return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); } else if (TD_RES_TMQ_METADATA(res)) { SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)res; - if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || - pRspObj->resIter >= pRspObj->rsp.blockNum) { - return NULL; - } - return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 || + pRspObj->resIter >= pRspObj->rsp.blockNum) { + return NULL; } + return (const char*)taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter); + } return NULL; } diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index abc23e3d95..3dfc10e554 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -272,6 +272,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { + mError("consumer %ld not exist", consumerId); terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST; return -1; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index b9647a28fb..7308dc375e 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -379,6 +379,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.ast = strdup(pCreate->ast); topicObj.astLen = strlen(pCreate->ast) + 1; + qDebugL("ast %s", topicObj.ast); + SNode *pAst = NULL; if (nodesStringToNode(pCreate->ast, &pAst) != 0) { taosMemoryFree(topicObj.ast); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 21136405cb..490a313cd4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -65,6 +65,11 @@ static void destroySTqHandle(void* data) { } } +static void tqPushEntryFree(void* data) { + void* p = *(void**)data; + taosMemoryFree(p); +} + STQ* tqOpen(const char* path, SVnode* pVnode) { STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); if (pTq == NULL) { @@ -80,7 +85,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosInitRWLatch(&pTq->pushLock); pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pTq->pPushMgr, taosMemoryFree); + taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); @@ -548,6 +553,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pPushEntry != NULL) { pPushEntry->pHandle = pHandle; pPushEntry->pInfo = pMsg->info; + dataRsp.withTbName = 0; memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); pPushEntry->rspHead.consumerId = consumerId; pPushEntry->rspHead.epoch = reqEpoch; diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c42cfeb7b8..13ed36e2bc 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -245,7 +245,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - if (qExecTask(task, NULL, &ts) < 0) { + if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } @@ -256,21 +256,19 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); pRsp->blockNum++; } + if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver); // remove from hash size_t kLen; - void* key = taosHashGetKey(pPushEntry, &kLen); + void* key = taosHashGetKey(pIter, &kLen); void* keyCopy = taosMemoryMalloc(kLen); memcpy(keyCopy, key, kLen); taosArrayPush(cachedKeys, &keyCopy); taosArrayPush(cachedKeyLens, &kLen); - if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { - ASSERT(0); - } tqPushDataRsp(pTq, pPushEntry); } } @@ -278,7 +276,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) { void* key = taosArrayGetP(cachedKeys, i); size_t kLen = *(size_t*)taosArrayGet(cachedKeyLens, i); - taosHashRemove(pTq->pPushMgr, key, kLen); + if (taosHashRemove(pTq->pPushMgr, key, kLen) != 0) { + ASSERT(0); + } } taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); taosArrayDestroy(cachedKeyLens); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 2f12a0d19b..b7c5b5634e 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -53,7 +53,7 @@ static void destroyIndefinitOperatorInfo(void* param) { SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -184,7 +184,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); - //TODO: optimize it later when partition by + limit + // TODO: optimize it later when partition by + limit if ((pLimitInfo->slimit.limit == -1 && pLimitInfo->currentGroupId == 0) || (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups)) { doSetOperatorCompleted(pOperator); @@ -206,6 +206,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { blockDataCleanup(pFinalRes); SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + if (pTaskInfo->streamInfo.pReq) { + pOperator->status = OP_OPENED; + } + if (pOperator->status == OP_EXEC_DONE) { if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pOperator->status = OP_OPENED; @@ -254,7 +258,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } setInfoForNewGroup(pBlock, pLimitInfo, pOperator); - if (pOperator->status == OP_EXEC_DONE) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && pOperator->status == OP_EXEC_DONE) { break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fa05608ced..46280295b9 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1443,6 +1443,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) { qError("submit msg messed up when initing stream submit block %p", pSubmit); pInfo->tqReader->pMsg = NULL; + pTaskInfo->streamInfo.pReq = NULL; ASSERT(0); } } @@ -1468,6 +1469,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pInfo->tqReader->pMsg = NULL; pTaskInfo->streamInfo.pReq = NULL; + return NULL; } if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { -- GitLab