From 180abf2229a0a1abad415f8d7fe80e4dfe1ab0b1 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 19 May 2023 18:02:15 +0800 Subject: [PATCH] set agg task status --- source/dnode/vnode/src/tq/tq.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 45b36868e2..0a3173b3cb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1224,6 +1224,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); } else { + if (streamTaskShouldPause(&pTask->status)) { + atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE); + } tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); } @@ -1301,19 +1304,19 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended. pTask->chkInfo.currentVer = sversion; walReaderSeekVer(pTask->exec.pWalReader, sversion); - tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } else { // from the previous paused version and go on - tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion); + tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId, + pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { tqStartStreamTasks(pTq); } else { streamSchedExec(pTask); } + streamMetaReleaseTask(pTq->pStreamMeta, pTask); } return 0; -- GitLab