未验证 提交 6914c4dd 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21804 from taosdata/fix/TD-24895

fix: merge tree free rc issue
...@@ -43,7 +43,7 @@ typedef struct SMultiwayMergeTreeInfo { ...@@ -43,7 +43,7 @@ typedef struct SMultiwayMergeTreeInfo {
int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param,
__merge_compare_fn_t compareFn); __merge_compare_fn_t compareFn);
void tMergeTreeDestroy(SMultiwayMergeTreeInfo *pTree); void tMergeTreeDestroy(SMultiwayMergeTreeInfo **pTree);
void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx); void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
......
...@@ -170,6 +170,10 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle); ...@@ -170,6 +170,10 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
*/ */
int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols); int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols);
bool tsortIsClosed(SSortHandle* pHandle);
void tsortSetClosed(SSortHandle* pHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -2895,6 +2895,11 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* ...@@ -2895,6 +2895,11 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
} }
} }
if (tsortIsClosed(pHandle)) {
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo); bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows, qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows); pInfo->limitInfo.numOfOutputRows);
......
...@@ -228,6 +228,11 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { ...@@ -228,6 +228,11 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) {
// multi-group case not handle here // multi-group case not handle here
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (1) { while (1) {
if (tsortIsClosed(pInfo->pSortHandle)) {
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
}
pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
pInfo->matchInfo.pList, pInfo); pInfo->matchInfo.pList, pInfo);
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -439,6 +444,11 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { ...@@ -439,6 +444,11 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = NULL; SSDataBlock* pBlock = NULL;
while (pInfo->pCurrSortHandle != NULL) { while (pInfo->pCurrSortHandle != NULL) {
if (tsortIsClosed(pInfo->pCurrSortHandle)) {
terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
}
// beginSortGroup would fetch all child blocks of pInfo->currGroupId; // beginSortGroup would fetch all child blocks of pInfo->currGroupId;
ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP); ASSERT(pInfo->childOpStatus != CHILD_OP_SAME_GROUP);
pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pBlock = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
......
...@@ -46,6 +46,7 @@ struct SSortHandle { ...@@ -46,6 +46,7 @@ struct SSortHandle {
SMsortComparParam cmpParam; SMsortComparParam cmpParam;
int32_t numOfCompletedSources; int32_t numOfCompletedSources;
bool opened; bool opened;
int8_t closed;
const char* idStr; const char* idStr;
bool inMemSort; bool inMemSort;
bool needAdjust; bool needAdjust;
...@@ -152,7 +153,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { ...@@ -152,7 +153,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
tsortClose(pSortHandle); tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) { if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(pSortHandle->pMergeTree); tMergeTreeDestroy(&pSortHandle->pMergeTree);
} }
destroyDiskbasedBuf(pSortHandle->pBuf); destroyDiskbasedBuf(pSortHandle->pBuf);
...@@ -581,6 +582,11 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -581,6 +582,11 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t)); SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
while (1) { while (1) {
if (tsortIsClosed(pHandle)) {
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
return code;
}
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows); SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
if (pDataBlock == NULL) { if (pDataBlock == NULL) {
break; break;
...@@ -609,7 +615,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { ...@@ -609,7 +615,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
} }
sortComparCleanup(&pHandle->cmpParam); sortComparCleanup(&pHandle->cmpParam);
tMergeTreeDestroy(pHandle->pMergeTree); tMergeTreeDestroy(&pHandle->pMergeTree);
pHandle->numOfCompletedSources = 0; pHandle->numOfCompletedSources = 0;
SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false); SSDataBlock* pBlock = createOneDataBlock(pHandle->pDataBlock, false);
...@@ -803,10 +809,19 @@ int32_t tsortOpen(SSortHandle* pHandle) { ...@@ -803,10 +809,19 @@ int32_t tsortOpen(SSortHandle* pHandle) {
} }
int32_t tsortClose(SSortHandle* pHandle) { int32_t tsortClose(SSortHandle* pHandle) {
// do nothing atomic_val_compare_exchange_8(&pHandle->closed, 0, 1);
taosMsleep(10);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
bool tsortIsClosed(SSortHandle* pHandle) {
return atomic_val_compare_exchange_8(&pHandle->closed, 1, 2);
}
void tsortSetClosed(SSortHandle* pHandle) {
atomic_store_8(&pHandle->closed, 2);
}
int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*), int32_t tsortSetFetchRawDataFp(SSortHandle* pHandle, _sort_fetch_block_fn_t fetchFp, void (*fp)(SSDataBlock*, void*),
void* param) { void* param) {
pHandle->fetchfp = fetchFp; pHandle->fetchfp = fetchFp;
...@@ -826,6 +841,9 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) { ...@@ -826,6 +841,9 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
} }
STupleHandle* tsortNextTuple(SSortHandle* pHandle) { STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) {
return NULL;
}
if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) {
return NULL; return NULL;
} }
......
...@@ -71,12 +71,12 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources, ...@@ -71,12 +71,12 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
return 0; return 0;
} }
void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree) { void tMergeTreeDestroy(SMultiwayMergeTreeInfo** pTree) {
if (pTree == NULL) { if (pTree == NULL || *pTree == NULL) {
return; return;
} }
taosMemoryFreeClear(pTree); taosMemoryFreeClear(*pTree);
} }
void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) { void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册