未验证 提交 bb8840d4 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #21749 from taosdata/szhou/fix-ts3543

fix: table merge scan oom with muliti-level merge sort
...@@ -55,7 +55,6 @@ typedef struct STableMergeScanSortSourceParam { ...@@ -55,7 +55,6 @@ typedef struct STableMergeScanSortSourceParam {
int32_t readerIdx; int32_t readerIdx;
uint64_t uid; uint64_t uid;
SSDataBlock* inputBlock; SSDataBlock* inputBlock;
bool multiReader;
STsdbReader* dataReader; STsdbReader* dataReader;
} STableMergeScanSortSourceParam; } STableMergeScanSortSourceParam;
...@@ -2658,8 +2657,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2658,8 +2657,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex); void* p = tableListGetInfo(pInfo->base.pTableListInfo, readIdx + pInfo->tableStartIndex);
SReadHandle* pHandle = &pInfo->base.readHandle; SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader) {
if (NULL == source->dataReader || !source->multiReader) {
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL); code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, (void**)&source->dataReader, GET_TASKID(pTaskInfo), false, NULL);
if (code != 0) { if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
...@@ -2723,19 +2721,14 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { ...@@ -2723,19 +2721,14 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
qTrace("tsdb/read-table-data: %p, close reader", reader); qTrace("tsdb/read-table-data: %p, close reader", reader);
if (!source->multiReader) {
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader);
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL; pInfo->base.dataReader = NULL;
return pBlock; return pBlock;
} }
if (!source->multiReader) { pAPI->tsdReader.tsdReaderClose(source->dataReader);
pAPI->tsdReader.tsdReaderClose(pInfo->base.dataReader); source->dataReader = NULL;
source->dataReader = NULL;
}
pInfo->base.dataReader = NULL; pInfo->base.dataReader = NULL;
return NULL; return NULL;
} }
...@@ -2791,7 +2784,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2791,7 +2784,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
// todo the total available buffer should be determined by total capacity of buffer of this task. // todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result // the additional one is reserved for merge result
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); // pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
pInfo->sortBufSize = pInfo->bufPageSize * (256 + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str); pInfo->pSortInputBlock, pTaskInfo->id.str);
...@@ -2806,7 +2800,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { ...@@ -2806,7 +2800,6 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanSortSourceParam param = {0}; STableMergeScanSortSourceParam param = {0};
param.readerIdx = i; param.readerIdx = i;
param.pOperator = pOperator; param.pOperator = pOperator;
param.multiReader = (numOfTable <= MULTI_READER_MAX_TABLE_NUM) ? true : false;
param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); param.inputBlock = createOneDataBlock(pInfo->pResBlock, false);
blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(param.inputBlock, pOperator->resultInfo.capacity);
......
...@@ -101,7 +101,11 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { ...@@ -101,7 +101,11 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
SSortSource* pSource = cmpParam->pSources[i]; SSortSource* pSource = cmpParam->pSources[i];
blockDataDestroy(pSource->src.pBlock); blockDataDestroy(pSource->src.pBlock);
if (pSource->pageIdList) {
taosArrayDestroy(pSource->pageIdList);
}
taosMemoryFreeClear(pSource); taosMemoryFreeClear(pSource);
cmpParam->pSources[i] = NULL;
} }
cmpParam->numOfSources = 0; cmpParam->numOfSources = 0;
...@@ -123,9 +127,11 @@ void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *f ...@@ -123,9 +127,11 @@ void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *f
// release pageIdList // release pageIdList
if ((*pSource)->pageIdList) { if ((*pSource)->pageIdList) {
taosArrayDestroy((*pSource)->pageIdList); taosArrayDestroy((*pSource)->pageIdList);
(*pSource)->pageIdList = NULL;
} }
if ((*pSource)->param && !(*pSource)->onlyRef) { if ((*pSource)->param && !(*pSource)->onlyRef) {
taosMemoryFree((*pSource)->param); taosMemoryFree((*pSource)->param);
(*pSource)->param = NULL;
} }
if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) { if (!(*pSource)->onlyRef && (*pSource)->src.pBlock) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册