diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index be79054c1b1b5aa40cd566cc1a50553660ec7062..0b55eb4a457fbad294478e5b490fb1600903333c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -881,6 +881,7 @@ SExprInfo* createExpr(SNodeList* pNodeList, int32_t* numOfExprs); void copyResultrowToDataBlock(SExprInfo* pExprInfo, int32_t numOfExprs, SResultRow* pRow, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const int32_t* rowEntryOffset, SExecTaskInfo* pTaskInfo); void doUpdateNumOfRows(SqlFunctionCtx* pCtx, SResultRow* pRow, int32_t numOfExprs, const int32_t* rowEntryOffset); +void doClearBufferedBlocks(SStreamScanInfo* pInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c2bf001c86d4af8fffccd544c2b4de70798f870a..d8e7c25bbb31667ef8392cedabf8fb2ffee110da 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -93,6 +93,17 @@ static int32_t doSetStreamOpOpen(SOperatorInfo* pOperator, char* id) { return 0; } +static void clearStreamBlock(SOperatorInfo* pOperator) { + if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { + if (pOperator->numOfDownstream == 1) { + return clearStreamBlock(pOperator->pDownstream[0]); + } + } else { + SStreamScanInfo* pInfo = pOperator->info; + doClearBufferedBlocks(pInfo); + } +} + static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { @@ -607,7 +618,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) { pTaskInfo->cost.start = taosGetTimestampUs(); } - if (isTaskKilled(pTaskInfo)) { + if (isTaskKilled(pTaskInfo) && pTaskInfo->code != TSDB_CODE_QRY_IN_EXEC) { + clearStreamBlock(pTaskInfo->pRoot); atomic_store_64(&pTaskInfo->owner, 0); qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7961d5518a560b59d568cc6122479202aac9aa21..38822a4565b805a6189395bd614f39389ecc2296 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -945,7 +945,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* return pOperator; } -static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { +FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); pInfo->validBlockIndex = 0; }