From 0cd871010f7f6d38319e422c2b96249c6768043b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 7 Mar 2023 13:34:17 +0800 Subject: [PATCH] fix:fix fill history bug --- include/libs/executor/executor.h | 1 + source/libs/executor/src/executor.c | 8 +++++++- source/libs/executor/src/scanoperator.c | 1 + source/libs/stream/src/streamExec.c | 11 +++++++++-- source/libs/stream/src/streamRecover.c | 2 +- 5 files changed, 19 insertions(+), 4 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 095d2f6d10..c3d2010351 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -219,6 +219,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); void qStreamCloseTsdbReader(void* task); +void resetTaskInfo(qTaskInfo_t tinfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9fe0f4f8a7..04d54a95ae 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -104,6 +104,12 @@ static void clearStreamBlock(SOperatorInfo* pOperator) { } } +void resetTaskInfo(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + pTaskInfo->code = 0; + clearStreamBlock(pTaskInfo->pRoot); +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -618,7 +624,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { pTaskInfo->cost.start = taosGetTimestampUs(); } - if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) { + if (isTaskKilled(pTaskInfo)) { clearStreamBlock(pTaskInfo->pRoot); atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a7178af20d..2f3b757241 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -946,6 +946,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* } FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { + qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists)); taosArrayClear(pInfo->pBlockLists); pInfo->validBlockIndex = 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9226d6ebb8..cb9774b584 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,6 +20,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code; void* exec = pTask->exec.executor; + while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + qError("stream task wait for the end of fill history"); + taosMsleep(2); + continue; + } // set input const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; @@ -58,6 +63,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SSDataBlock* output = NULL; uint64_t ts = 0; if ((code = qExecTask(exec, &output, &ts)) < 0) { + if (code == TSDB_CODE_QRY_IN_EXEC) { + resetTaskInfo(exec); + } /*ASSERT(false);*/ qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, terrstr()); @@ -121,8 +129,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { SSDataBlock* output = NULL; uint64_t ts = 0; if (qExecTask(exec, &output, &ts) < 0) { - taosArrayDestroy(pRes); - return -1; + continue; } if (output == NULL) { if (qStreamRecoverScanFinished(exec)) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 061b211ddf..87058bf490 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { return qStreamRestoreParam(exec); } int32_t streamSetStatusNormal(SStreamTask* pTask) { - pTask->taskStatus = TASK_STATUS__NORMAL; + atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); return 0; } -- GitLab