From e7df9b8fa0e029c7c3ed1ef8fdc7ba0d997cb697 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 16 May 2023 14:12:48 +0800 Subject: [PATCH] enh: optimize copy group result --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 5 ++++ source/libs/executor/src/executorInt.c | 31 +++++++++++---------- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 0152e5d0b1..bbd77c0828 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -120,6 +120,11 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { } int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { + if (NULL == pMgmt->pMnode) { + const STraceId *trace = &pMsg->info.traceId; + dGError("msg:%p, stop to pre-process in mnode since mnode is NULL, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); + return -1; + } pMsg->info.node = pMgmt->pMnode; if (mndPreProcessQueryMsg(pMsg) != 0) { const STraceId *trace = &pMsg->info.traceId; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 62ab2d9df2..fc69ad1add 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, int32_t threshold); + SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -776,7 +776,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos } int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo, int32_t threshold) { + SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) { SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; @@ -803,20 +803,23 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS continue; } - if (pBlock->info.id.groupId == 0) { - pBlock->info.id.groupId = pPos->groupId; - } else { - // current value belongs to different group, it can't be packed into one datablock - if (pBlock->info.id.groupId != pPos->groupId) { - releaseBufPage(pBuf, page); - break; + if (!ignoreGroup) { + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = pPos->groupId; + } else { + // current value belongs to different group, it can't be packed into one datablock + if (pBlock->info.id.groupId != pPos->groupId) { + releaseBufPage(pBuf, page); + break; + } } } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { - blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows); + uint32_t newSize = pBlock->info.rows + pRow->numOfRows + (numOfRows - i) > 1 ? 1 : 0; + blockDataEnsureCapacity(pBlock, newSize); qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s", - (pRow->numOfRows + pBlock->info.rows), pBlock->info.capacity, GET_TASKID(pTaskInfo)); + newSize, pBlock->info.capacity, GET_TASKID(pTaskInfo)); // todo set the pOperator->resultInfo size } @@ -853,7 +856,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr // clear the existed group id pBlock->info.id.groupId = 0; ASSERT(!pbInfo->mergeResultBlock); - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false); void* tbname = NULL; if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { @@ -880,10 +883,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // clear the existed group id pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false); } else { while (hasRemainResults(pGroupResInfo)) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, true); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } -- GitLab