From 30c2a9c61927c66a61673d2f9996455ebae3c0ac Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Aug 2023 18:27:51 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamDispatch.c | 9 +++++---- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 32d6dc65d9..30c941d106 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -52,7 +52,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); +int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index fa24c01418..39f4e7fc7a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -268,7 +268,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } else { qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); - return streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + return doDispatchAllBlocks(pTask, pTask->msgInfo.pData); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8334ea1c88..7eef42e289 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -436,7 +436,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { +int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; int32_t numOfBlocks = taosArrayGetSize(pData->blocks); @@ -552,7 +552,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); @@ -593,12 +593,13 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } pTask->msgInfo.pData = pBlock; - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER || + pBlock->type == STREAM_INPUT__TRANS_STATE); int32_t retryCount = 0; while (1) { - int32_t code = streamDispatchAllBlocks(pTask, pBlock); + int32_t code = doDispatchAllBlocks(pTask, pBlock); if (code == TSDB_CODE_SUCCESS) { break; } -- GitLab