From 8890fa578185d7605c9348847f6b34b6c7190472 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Aug 2023 15:13:49 +0800 Subject: [PATCH] fix(stream): execute the stream task directly, instead of executing it in a asynchronized way. --- source/libs/stream/src/streamExec.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c7da80fdaf..b479931cd2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -592,14 +592,21 @@ int32_t streamTryExec(SStreamTask* pTask) { if (pTask->status.transferState) { code = streamTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); return code; } - streamSchedExec(pTask); - } else { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), - pTask->status.schedStatus); + + // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by + // call this function (streamExecForAll) directly. + code = streamExecForAll(pTask); + if (code < 0) { + // do nothing + } } + + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, + streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), -- GitLab