diff --git a/include/util/tlosertree.h b/include/util/tlosertree.h index 51906443f5ab874a2e7b16a11304ed6890f90437..b3aa37a537fda3aab2241b263b085624d0b6464a 100644 --- a/include/util/tlosertree.h +++ b/include/util/tlosertree.h @@ -43,7 +43,7 @@ typedef struct SMultiwayMergeTreeInfo { int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); -void tMergeTreeDestroy(SMultiwayMergeTreeInfo *pTree); +void tMergeTreeDestroy(SMultiwayMergeTreeInfo **pTree); void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index d51a24bb43253e09d6ab21e6a1785d6ef6b5e5ff..930b52b95406df8c2df1da83b6693ea056cdb8a9 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -170,6 +170,10 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle); */ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols); + +bool tsortIsClosed(SSortHandle* pHandle); +void tsortSetClosed(SSortHandle* pHandle); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 1b23ed1c1696d945affcff755f1e8883c90ec2bc..117593a9e56d41b1275c1694f651231d03e62c75 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2895,6 +2895,11 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* } } + if (tsortIsClosed(pHandle)) { + terrno = TSDB_CODE_TSC_QUERY_CANCELLED; + T_LONG_JMP(pOperator->pTaskInfo->env, terrno); + } + bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, pInfo->limitInfo.numOfOutputRows); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 0357828732f5d14576d85c1a74a610d22ed8c787..8d99161441873ad3b8edc4eb3168fae44cb6d6b7 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -228,6 +228,11 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { // multi-group case not handle here SSDataBlock* pBlock = NULL; while (1) { + if (tsortIsClosed(pInfo->pSortHandle)) { + terrno = TSDB_CODE_TSC_QUERY_CANCELLED; + T_LONG_JMP(pOperator->pTaskInfo->env, terrno); + } + pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->matchInfo.pList, pInfo); if (pBlock == NULL) { @@ -439,6 +444,11 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { SSDataBlock* pBlock = NULL; while (pInfo->pCurrSortHandle != NULL) { + if (tsortIsClosed(pInfo->pCurrSortHandle)) { + terrno = TSDB_CODE_TSC_QUERY_CANCELLED; + T_LONG_JMP(pOperator->pTaskInfo->env, terrno); + } + // beginSortGroup would fetch all child blocks of pInfo->currGroupId; ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 3033441aadf5b94c8cfb8f3ce29ac0a09b1b9907..56d74f2b52f6b5f90348a1686f00116c8c5f5024 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -46,6 +46,7 @@ struct SSortHandle { SMsortComparParam cmpParam; int32_t numOfCompletedSources; bool opened; + int8_t closed; const char* idStr; bool inMemSort; bool needAdjust; @@ -152,7 +153,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { tsortClose(pSortHandle); if (pSortHandle->pMergeTree != NULL) { - tMergeTreeDestroy(pSortHandle->pMergeTree); + tMergeTreeDestroy(&pSortHandle->pMergeTree); } destroyDiskbasedBuf(pSortHandle->pBuf); @@ -581,6 +582,11 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); while (1) { + if (tsortIsClosed(pHandle)) { + code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED; + return code; + } + SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); if (pDataBlock == NULL) { break; @@ -609,7 +615,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } sortComparCleanup(&pHandle->cmpParam); - tMergeTreeDestroy(pHandle->pMergeTree); + tMergeTreeDestroy(&pHandle->pMergeTree); pHandle->numOfCompletedSources = 0; SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); @@ -803,10 +809,19 @@ int32_t tsortOpen(SSortHandle* pHandle) { } int32_t tsortClose(SSortHandle* pHandle) { - // do nothing + atomic_val_compare_exchange_8(&pHandle->closed, 0, 1); + taosMsleep(10); return TSDB_CODE_SUCCESS; } +bool tsortIsClosed(SSortHandle* pHandle) { + return atomic_val_compare_exchange_8(&pHandle->closed, 1, 2); +} + +void tsortSetClosed(SSortHandle* pHandle) { + atomic_store_8(&pHandle->closed, 2); +} + int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), void* param) { pHandle->fetchfp = fetchFp; @@ -826,6 +841,9 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) { } STupleHandle* tsortNextTuple(SSortHandle* pHandle) { + if (tsortIsClosed(pHandle)) { + return NULL; + } if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { return NULL; } diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index c476baa7908172458cb5ffba718cd55fca62772d..f85ab0ecad01e8deb6d99315469e2133b5fc8282 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -71,12 +71,12 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, return 0; } -void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree) { - if (pTree == NULL) { +void tMergeTreeDestroy(SMultiwayMergeTreeInfo** pTree) { + if (pTree == NULL || *pTree == NULL) { return; } - taosMemoryFreeClear(pTree); + taosMemoryFreeClear(*pTree); } void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {