From 8a1b0ed05277e4e9157ee74cef03109afc004a2b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 11 May 2023 10:35:09 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tq/tq.c | 3 +++ source/dnode/vnode/src/tq/tqRead.c | 10 +++++----- source/dnode/vnode/src/tq/tqScan.c | 2 +- source/libs/executor/src/scanoperator.c | 4 ++-- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 0b355a2b0f..d9460f50d8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -256,7 +256,7 @@ int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); int32_t tqNextBlockInWal(STqReader* pReader); -bool tqNextBlockImpl(STqReader *pReader); +bool tqNextBlockImpl(STqReader *pReader, const char* idstr); int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a0f913ec10..4fb7d765d8 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -608,6 +608,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { return -1; } + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->taskLevel == TASK_LEVEL__AGG) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -621,6 +622,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { if (pTask->exec.pExecutor == NULL) { return -1; } + + qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } // sink diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9c89bd3f82..af3969cc29 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -419,15 +419,15 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i return 0; } -bool tqNextBlockImpl(STqReader* pReader) { +bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { if (pReader->msg.msgStr == NULL) { return false; } - int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); - while (pReader->nextBlk < blockSz) { - tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg.msgStr, pReader->msg.msgLen, - pReader->msg.ver, pReader->nextBlk); + int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); + while (pReader->nextBlk < numOfBlocks) { + tqDebug("tq reader next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, + pReader->msg.ver, pReader->nextBlk, numOfBlocks, idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 800bcc8b71..52f92cd229 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -205,7 +205,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pTqReader; tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); - while (tqNextBlockImpl(pReader)) { + while (tqNextBlockImpl(pReader, NULL)) { taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f7e730242a..0508d70661 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1626,7 +1626,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pRes); SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; - while (tqNextBlockImpl(pInfo->tqReader)) { + while (tqNextBlockImpl(pInfo->tqReader, NULL)) { int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; @@ -2077,7 +2077,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); - while (tqNextBlockImpl(pInfo->tqReader)) { + while (tqNextBlockImpl(pInfo->tqReader, pTaskInfo->id.str)) { int32_t code = tqRetrieveDataBlock(pInfo->tqReader, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; -- GitLab