diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index df7955771d799b35030bc745df7f220e6920ed43..68b8dd72019df1ed623170249272e3cda3c5ad1a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1306,7 +1306,7 @@ int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) { int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndPauseStreamTask(pTrans, pTask) < 0) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndPauseStreamTask(pTrans, pTask) < 0) { return -1; } } @@ -1430,7 +1430,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && mndResumeStreamTask(pTrans, pTask, igUntreated) < 0) { return -1; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5f9aec4db0c51e0e15d9cc96c6965d81255b8cb3..f4d8522f317ee4c0247edfcce134039d39674d8b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -35,7 +35,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL) { + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, atomic_load_8(&pTask->status.taskStatus)); taosMsleep(2); @@ -408,7 +408,7 @@ int32_t streamTryExec(SStreamTask* pTask) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed", pTask->id.idStr); - if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status))) { + if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } }