From 8323ad86701cc80f0e00251eb9a2a7e36a264bdf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Apr 2023 17:29:08 +0800 Subject: [PATCH] enh(stream): add more check to stop stream asap. --- source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/libs/stream/src/streamExec.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 657dd376a1..9a9c750194 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -96,7 +96,7 @@ int32_t doCreateReqsByScanWal(SStreamMeta* pStreamMeta, STqOffsetStore* pOffsetS continue; } - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { + if ((pTask->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.taskStatus == TASK_STATUS__DROPPING)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9a6ff302ef..f52af66387 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -368,7 +368,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue)) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (pTask->status.taskStatus != TASK_STATUS__DROPPING)) { streamSchedExec(pTask); } } -- GitLab