From d72580dad768517cdf8435ed19253dcdb79875d3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Jul 2023 19:12:23 +0800 Subject: [PATCH] fix(stream): transfer the state for agg tasks. --- source/libs/stream/src/streamExec.c | 58 ++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7aa82ed5f6..e4dc28b76b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -292,12 +292,12 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.taskId); if (pStreamTask == NULL) { - // todo: destroy this task here + // todo: destroy the fill-history task here qError("s-task:%s failed to find related stream task:0x%x, it may have been destroyed or closed", pTask->id.idStr, pTask->streamTaskId.taskId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; @@ -338,34 +338,36 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } - // expand the query time window for stream scanner + // 1. expand the query time window for stream task of WAL scanner pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); - // transfer the ownership of executor state + // 2. transfer the ownership of executor state streamTaskReleaseState(pTask); streamTaskReloadState(pStreamTask); - // clear the link between fill-history task and stream task info + // 3. clear the link between fill-history task and stream task info pStreamTask->historyTaskId.taskId = 0; + + // 4. 
resume the state of stream task, after this function, the stream task will run immediately. But it can not be + // paused, since the pause allowed attribute is not set yet. streamTaskResumeFromHalt(pStreamTask); qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr); int32_t taskId = pTask->id.taskId; - // free it and remove it from disk meta-store + // 5. free it and remove fill-history task from disk meta-store streamMetaUnregisterTask(pMeta, taskId); - // save to disk + // 6. save to disk taosWLockLatch(&pMeta->lock); - streamMetaSaveTask(pMeta, pStreamTask); if (streamMetaCommit(pMeta) < 0) { // persist to disk } taosWUnLockLatch(&pMeta->lock); - // pause allowed + // 7. pause allowed. streamTaskEnablePause(pStreamTask); streamSchedExec(pStreamTask); @@ -373,6 +375,25 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { + int32_t code = TSDB_CODE_SUCCESS; + if (!pTask->status.transferState) { + return code; + } + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + streamTaskFillHistoryFinished(pTask); + streamTaskEndScanWAL(pTask); + } else { // do transfer task operator states. + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + } + + return code; +} + static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id) { int32_t retryTimes = 0; @@ -526,17 +547,16 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); - // 3. notify downstream tasks to transfer executor state after handle all history blocks. - pTask->status.transferState = true; - + // 1. 
notify all downstream tasks to transfer executor state after handle all history blocks. int32_t code = streamDispatchTransferStateMsg(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle error } - // the last execution of fill-history task, in order to transfer task operator states. - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle this + // 2. do transfer stream task operator states. + pTask->status.transferState = true; + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle error return code; } @@ -560,9 +580,11 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && pTask->status.transferState == true) { - streamTaskFillHistoryFinished(pTask); - streamTaskEndScanWAL(pTask); + if (pTask->status.transferState) { + code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), -- GitLab