diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a1e72015ffb99951b23ffecafadb8ff7ae1a7ff7..b8ae8fc7fd94ac298ddabccaa978fd8481ce8839 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -330,7 +330,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { // downstream task's input queue is blocked, stop immediately - if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED)) { + if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) || + streamTaskShouldStop(&pTask->status)) { if (batchSize > 1) { break; } else { @@ -461,7 +462,7 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed", pTask->id.idStr); + qDebug("s-task:%s exec completed, status:%d", pTask->id.idStr, pTask->status.taskStatus); if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {