From a81cc9aebfead3f455e2b5c251ab59ae0934b8dd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 13 Aug 2023 17:31:09 +0800 Subject: [PATCH] fix(stream): fix the error when no agg tasks exist. --- source/libs/stream/src/streamExec.c | 40 ++++++++++++++++++----------- 1 file changed, 25 insertions(+), 15 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 269334f54d..fa3f149a43 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -509,24 +509,34 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } } - // transfer the ownership of executor state - if (level == TASK_LEVEL__SOURCE) { - qDebug("s-task:%s add transfer-state block into outputQ", id); - } else { - qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id); - ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - } - // dispatch the tran-state block to downstream task immediately int32_t type = pTask->outputInfo.type; - if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) && - (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH)) { - pBlock->srcVgId = pTask->pMeta->vgId; - code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); - if (code == 0) { - streamDispatchStreamBlock(pTask); + + // transfer the ownership of executor state + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (level == TASK_LEVEL__SOURCE) { + qDebug("s-task:%s add transfer-state block into outputQ", id); } else { - streamFreeQitem((SStreamQueueItem*)pBlock); + qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id); + ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); + } + + if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { + pBlock->srcVgId = pTask->pMeta->vgId; + code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + if (code == 0) { + streamDispatchStreamBlock(pTask); + } else { + streamFreeQitem((SStreamQueueItem*)pBlock); + } + } + } else { // non-dispatch task, do task state transfer directly + qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); + + if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } } -- GitLab