未验证 提交 9c7b5308 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #22223 from taosdata/fix/td-25378

fix: fix memleak during create initial sources for block merge sort
......@@ -2779,7 +2779,7 @@ _error:
return NULL;
}
static SSDataBlock* getTableDataBlockImpl(void* param) {
static SSDataBlock* getBlockForTableMergeScan(void* param) {
STableMergeScanSortSourceParam* source = param;
SOperatorInfo* pOperator = source->pOperator;
STableMergeScanInfo* pInfo = pOperator->info;
......@@ -2797,6 +2797,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
code = pAPI->tsdReader.tsdNextDataBlock(reader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
qError("table merge scan fetch next data block error code: %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2805,8 +2806,9 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
}
if (isTaskKilled(pTaskInfo)) {
qInfo("table merge scan fetch next data block found task killed. %s", GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderReleaseDataBlock(reader);
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
break;
}
// process this data block based on the probabilities
......@@ -2819,6 +2821,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
code = loadDataBlock(pOperator, &pInfo->base, pBlock, &status);
// code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
qInfo("table merge scan load datablock code %d, %s", code, GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -2909,7 +2912,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
}
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
// one table has one data block
int32_t numOfTable = tableEndIdx - tableStartIdx + 1;
......
......@@ -1049,12 +1049,24 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
if (pBlk == NULL) {
break;
};
}
if (tsortIsClosed(pHandle)) {
tSimpleHashClear(mUidBlk);
for (int i = 0; i < taosArrayGetSize(aBlkSort); ++i) {
blockDataDestroy(taosArrayGetP(aBlkSort, i));
}
taosArrayClear(aBlkSort);
break;
}
}
tSimpleHashCleanup(mUidBlk);
taosArrayDestroy(aBlkSort);
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
if (!tsortIsClosed(pHandle)) {
taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
}
taosArrayDestroy(aExtSrc);
pHandle->type = SORT_SINGLESOURCE_SORT;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册