diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 79759ff012029bffa732646a7b31fd6ed91c26b6..9548c3c3279ab106bfce43037a4ec3a7d51d5af9 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -235,6 +235,24 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return 0; } +static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { + int8_t status = 0; + + SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); + if (pBlock == NULL) { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + pTask->id.idStr); + } else { + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); + // input queue is full, upstream is blocked now + status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; + } + + return status; +} + int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);