From 75e6fb0f16627525725f1a55038cb2f4137c98fd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Aug 2023 18:47:01 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 ++- source/libs/stream/src/stream.c | 11 ++++++++++- source/libs/stream/src/streamDispatch.c | 2 +- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8dc0684976..3542788b4b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -403,7 +403,7 @@ typedef struct { int64_t streamId; int32_t type; int32_t taskId; - int32_t dataSrcVgId; + int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; int32_t upstreamNodeId; @@ -582,6 +582,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d73bb1562e..caf20a499c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -145,7 +145,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int8_t status = 0; - SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->srcVgId); if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; @@ -235,6 +235,8 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return 0; } + + static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { int8_t status = 0; @@ -272,6 +274,13 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR return TSDB_CODE_SUCCESS; } +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = false; + } +} + int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7eef42e289..a162f0e770 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -36,7 +36,7 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; -- GitLab