From 4f814db5d5ded7a0a207dcc9ef1b00cc15a91ab6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 6 Jul 2023 18:34:01 +0800 Subject: [PATCH] fix(stream): fix error during transferring executor state, while a task is not in normal status. --- source/libs/stream/src/streamExec.c | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bcb479e71e..d0d63215e6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -351,9 +351,13 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); - qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); - - // todo handle stream task is dropped here + if (pStreamTask == NULL) { + qError("s-task:%s failed to find related stream task:0x%x, it may have been destoryed or closed", + pTask->id.idStr, pTask->streamTaskId.taskId); + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } else { + qDebug("s-task:%s scan history task end, update stream task:%s info, transfer exec state", pTask->id.idStr, pStreamTask->id.idStr); + } ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -377,7 +381,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { // update the scan data range for source task. qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64 - ", status:%s, sched-status:%d", + ", status:%s, sched-status:%d", pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus); } else { @@ -473,6 +477,9 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return 0; + } } break; @@ -564,7 +571,7 @@ int32_t streamTryExec(SStreamTask* pTask) { if (schedStatus == TASK_SCHED_STATUS__WAITING) { int32_t code = streamExecForAll(pTask); - if (code < 0) { + if (code < 0) { // todo this status shoudl be removed atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; } -- GitLab