diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8dc06849763ab822975553b4723da72412372c8a..3542788b4b429f7a3c545e1325aae35b49bf6580 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -403,7 +403,7 @@ typedef struct { int64_t streamId; int32_t type; int32_t taskId; - int32_t dataSrcVgId; + int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; int32_t upstreamNodeId; @@ -582,6 +582,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d73bb1562ec8074d776e7e3608ca9b02682838ea..caf20a499cf4ea621c56302573d709236e46d3f5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -145,7 +145,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int8_t status = 0; - SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->srcVgId); if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; @@ -235,6 +235,8 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return 0; } + + static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { int8_t status = 0; @@ -272,6 +274,13 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR return TSDB_CODE_SUCCESS; } +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = false; + } +} + int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7eef42e289ea8037cb1141d76f2295447df08639..a162f0e7703f4ed9edecebe1f946b1232482556e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -36,7 +36,7 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;