diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f40a6c9338b165fac1099a6687f7430713931d2f..8dc06849763ab822975553b4723da72412372c8a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -251,6 +251,7 @@ typedef struct SStreamChildEpInfo { int32_t nodeId; int32_t childId; int32_t taskId; + int8_t dataAllowed; SEpSet epSet; } SStreamChildEpInfo; @@ -400,6 +401,7 @@ typedef struct { typedef struct { int64_t streamId; + int32_t type; int32_t taskId; int32_t dataSrcVgId; int32_t upstreamTaskId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 39f4e7fc7afc33bf085ec555cd2a97216448958b..79759ff012029bffa732646a7b31fd6ed91c26b6 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -239,21 +239,61 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - // todo add the input queue buffer limitation - streamTaskEnqueueBlocks(pTask, pReq, pRsp); - tDeleteStreamDispatchReq(pReq); + int32_t status = 0; - if (exec) { - if (streamTryExec(pTask) < 0) { - return -1; - } + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + ASSERT(pInfo != NULL); + + if (!pInfo->dataAllowed) { + qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; } else { - streamSchedExec(pTask); + // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); + } + + status = streamTaskAppendInputBlocks(pTask, pReq); } + { + // do send response with the input status + int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); + if (code != TSDB_CODE_SUCCESS) { + // todo handle failure + return code; + } + + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); + } + + tDeleteStreamDispatchReq(pReq); + streamSchedExec(pTask); + return 0; } +//int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { +// qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, +// pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); +// +// // todo add the input queue buffer limitation +// streamTaskEnqueueBlocks(pTask, pReq, pRsp); +// tDeleteStreamDispatchReq(pReq); +// +// if (exec) { +// if (streamTryExec(pTask) < 0) { +// return -1; +// } +// } else { +// streamSchedExec(pTask); +// } +// +// return 0; +//} + // todo record the idle time for dispatch data int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { if (code != TSDB_CODE_SUCCESS) {