diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 095d2f6d10dbe6faa0460f5532c569b543fb4ac0..c3d20103512ae0ffa34e7dda4157912f184fb181 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -219,6 +219,7 @@ int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); int32_t qStreamRestoreParam(qTaskInfo_t tinfo); bool qStreamRecoverScanFinished(qTaskInfo_t tinfo); void qStreamCloseTsdbReader(void* task); +void resetTaskInfo(qTaskInfo_t tinfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9fe0f4f8a7a521666678f88ab310e5ab3a3595c2..04d54a95aed736f9d299bac3d5ea33c457dd14d5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -104,6 +104,12 @@ static void clearStreamBlock(SOperatorInfo* pOperator) { } } +void resetTaskInfo(qTaskInfo_t tinfo) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + pTaskInfo->code = 0; + clearStreamBlock(pTaskInfo->pRoot); +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -618,7 +624,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { pTaskInfo->cost.start = taosGetTimestampUs(); } - if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) { + if (isTaskKilled(pTaskInfo)) { clearStreamBlock(pTaskInfo->pRoot); atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a7178af20dd6c63f544d263e68c9ae2ba617f8b6..2f3b75724115fe7a79a62bc6d073d6dd47068c1e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -946,6 +946,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* } FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { + qDebug("clear buff blocks:%d", (int32_t)taosArrayGetSize(pInfo->pBlockLists)); taosArrayClear(pInfo->pBlockLists); pInfo->validBlockIndex = 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9226d6ebb8dabc349c333470b0fc2d44396c2c0b..cb9774b584937bc0846a0e761cceda37036b83d4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,6 +20,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { int32_t code; void* exec = pTask->exec.executor; + while(atomic_load_8(&pTask->taskStatus) != TASK_STATUS__NORMAL) { + qError("stream task wait for the end of fill history"); + taosMsleep(2); + continue; + } // set input const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; @@ -58,6 +63,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SSDataBlock* output = NULL; uint64_t ts = 0; if ((code = qExecTask(exec, &output, &ts)) < 0) { + if (code == TSDB_CODE_QRY_IN_EXEC) { + resetTaskInfo(exec); + } /*ASSERT(false);*/ qError("unexpected stream execution, stream %" PRId64 " task: %d, since %s", pTask->streamId, pTask->taskId, terrstr()); @@ -121,8 +129,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { SSDataBlock* output = NULL; uint64_t ts = 0; if (qExecTask(exec, &output, &ts) < 0) { - taosArrayDestroy(pRes); - return -1; + continue; } if (output == NULL) { if (qStreamRecoverScanFinished(exec)) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 061b211ddfa7045e0121b01d28dc77a581a401d2..87058bf490d43f2cbbd1b18eb8411c034b40a9cd 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -168,7 +168,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { return qStreamRestoreParam(exec); } int32_t streamSetStatusNormal(SStreamTask* pTask) { - pTask->taskStatus = TASK_STATUS__NORMAL; + atomic_store_8(&pTask->taskStatus, TASK_STATUS__NORMAL); return 0; }