From 17c6143471b88c32d34f91a7fdb16815c6ff3e78 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Aug 2023 15:08:11 +0800 Subject: [PATCH] fix(stream): check more status when handling the state transfer. --- source/dnode/vnode/src/tq/tq.c | 16 +++++++--------- source/libs/executor/src/executor.c | 2 +- source/libs/stream/src/streamExec.c | 6 ++++-- source/libs/stream/src/streamMeta.c | 3 ++- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d433042ad..41e9268452 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1277,7 +1277,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); streamTaskEndScanWAL(pTask); - streamMetaReleaseTask(pMeta, pTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 @@ -1303,13 +1302,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamSetStatusNormal(pTask); } - // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. - // 5. resume the related stream task. - streamMetaReleaseTask(pMeta, pTask); - streamMetaReleaseTask(pMeta, pStreamTask); - tqStartStreamTasks(pTq); } + + streamMetaReleaseTask(pMeta, pTask); + streamMetaReleaseTask(pMeta, pStreamTask); } else { // todo update the chkInfo version for current task. 
// this task has an associated history stream task, so we need to scan wal from the end version of @@ -1515,7 +1512,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (pTask != NULL) { // even in halt status, the data in inputQ must be processed int8_t st = pTask->status.taskStatus; - if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY/* || st == TASK_STATUS__SCAN_HISTORY_WAL*/) { + if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); @@ -1528,8 +1525,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqStartStreamTasks(pTq); return 0; - } else { - tqError("vgId:%d failed to found s-task, taskId:%d", vgId, taskId); + } else { // NOTE: pTask->status.schedStatus is not updated since it is not handled by the run exec. + // todo add one function to handle this + tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId); return -1; } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 05767db286..7832834cee 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1046,7 +1046,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; - qDebug("%s set remove scan-history filter window:%" PRId64 "-%" PRId64 ", new window:%" PRId64 "-%" PRId64, + qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); pWindow->skey = INT64_MIN; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 62296efe84..1fd2f7edf4 100644 --- 
a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -545,8 +545,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { return 0; } +// the task may be set to dropping/stopping while it is still in the task queue; in that case the sched-status can not +// be updated by the tryExec function, so the schedStatus will always remain TASK_SCHED_STATUS__WAITING. bool streamTaskIsIdle(const SStreamTask* pTask) { - return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE); + return (pTask->status.schedStatus == TASK_SCHED_STATUS__INACTIVE || pTask->status.taskStatus == TASK_STATUS__STOP || + pTask->status.taskStatus == TASK_STATUS__DROPPING); } int32_t streamTaskEndScanWAL(SStreamTask* pTask) { @@ -584,7 +587,6 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } - // todo the task should be commit here // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 7886091401..80b690e20d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -330,7 +330,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int32_t taskId) { } taosWUnLockLatch(&pMeta->lock); - qDebug("s-task:0x%x set task status:%s", taskId, streamGetTaskStatusStr(TASK_STATUS__DROPPING)); + qDebug("s-task:0x%x set task status:%s and start to unregister it", taskId, + streamGetTaskStatusStr(TASK_STATUS__DROPPING)); while (1) { taosRLockLatch(&pMeta->lock); -- GitLab