diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 92e64f075fc5b649d147c230a8ad7feb8704006a..8482ba8a7866f396b94246adc7a96d0cd710bc9b 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -170,6 +170,7 @@ typedef enum EStreamType { STREAM_PULL_DATA, STREAM_PULL_OVER, STREAM_FILL_OVER, + STREAM_CHECKPOINT, STREAM_CREATE_CHILD_TABLE, STREAM_TRANS_STATE, } EStreamType; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 30c941d1063e4f7698f37b985438728e59583694..b0fed5dde1efe0f5bb2931b08d5c0b0bc6becffb 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -52,7 +52,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b6a03153d37f3ce36a4c22790a37961afe2c6f1a..41ed784d1644b9b171541af7ed86a9bff9805153 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -440,9 +440,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { +static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; - int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0); @@ -450,7 +449,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { SStreamDispatchReq req = {0}; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; - code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, ); + code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -491,7 +490,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { for (int32_t i = 0; i < vgSz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId); + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); if (code != TSDB_CODE_SUCCESS) { goto FAIL_SHUFFLE_DISPATCH; } @@ -501,8 +500,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); // TODO: do not use broadcast - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - + if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) { for (int32_t j = 0; j < vgSz; j++) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; @@ -522,14 +520,14 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { } } - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId, - numOfBlocks, vgSz); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, - pReqs[i].blockNum, pVgInfo->vgId); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, + pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); if (code < 0) { @@ -540,7 +538,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { code = 0; - FAIL_SHUFFLE_DISPATCH: + FAIL_SHUFFLE_DISPATCH: for (int32_t i = 0; i < vgSz; i++) { taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroy(pReqs[i].dataLen);