diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c7da80fdaf216896a6838d372cda4650990b7e90..b479931cd256681b213fa469cd22c58a220f44b4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -592,14 +592,21 @@ int32_t streamTryExec(SStreamTask* pTask) { if (pTask->status.transferState) { code = streamTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); return code; } - streamSchedExec(pTask); - } else { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + + // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by + // call this function (streamExecForAll) directly. + code = streamExecForAll(pTask); + if (code < 0) { + // do nothing + } } + + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),