From 4e9146f061a6671bf169d6c794600c342a3ddd12 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Feb 2023 22:46:31 +0800 Subject: [PATCH] fix(query): fix memory leak. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/src/executil.c | 46 ++++++++++++++++-- source/libs/executor/src/timewindowoperator.c | 47 +++++++------------ tests/system-test/2-query/unique.py | 2 +- 4 files changed, 62 insertions(+), 35 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 523957b54d..e6fbcc242f 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -127,7 +127,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); -void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); +int32_t initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SHashObj* pResultHash); bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index da4d8317a8..11b176ad29 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -162,14 +162,54 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { +int32_t initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SHashObj* pResultHash) { + int32_t itemSize = sizeof(SResKeyPos) + sizeof(uint64_t); + int32_t bufLen = taosHashGetSize(pResultHash) * itemSize; + int32_t offset = 0; + void* pIter = NULL; + + int32_t numOfRows = taosHashGetSize(pResultHash); if (pGroupResInfo->pRows != NULL) { - taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree); + taosArrayClear(pGroupResInfo->pRows); + } else { + pGroupResInfo->pRows = taosArrayInit(numOfRows, sizeof(void*)); + } + + if (numOfRows == 0) { + pGroupResInfo->index = 0; + return TSDB_CODE_SUCCESS; + } + + if (pGroupResInfo->pBuf == NULL) { + pGroupResInfo->pBuf = taosMemoryMalloc(bufLen); + if (pGroupResInfo->pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } else { + char* p = taosMemoryRealloc(pGroupResInfo->pBuf, bufLen); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pGroupResInfo->pBuf = p; } - pGroupResInfo->pRows = pArrayList; + while ((pIter = taosHashIterate(pResultHash, pIter)) != NULL) { + SResKeyPos* p = (SResKeyPos*) (pGroupResInfo->pBuf + offset); + SResKeyPos* p1 = pIter; + + qDebug("key:%"PRId64", gid:%"PRId64, *(uint64_t*)p1->key, p1->groupId); + + memcpy(p, p1, itemSize); + taosArrayPush(pGroupResInfo->pRows, &p); + offset += itemSize; + } + + taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), sizeof(void*), resultrowComparAsc); pGroupResInfo->index = 0; ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); + + return TSDB_CODE_SUCCESS; } bool hasRemainResults(SGroupResInfo* pGroupResInfo) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 1fe1b9081b..4ca1593b09 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -843,19 +843,15 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { } static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { - SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); - if (newPos == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + char buf[sizeof(SResKeyPos) + sizeof(uint64_t)] = {0}; + SResKeyPos* pResPos = (SResKeyPos*)buf; - newPos->groupId = groupId; - newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; - *(int64_t*)newPos->key = ts; - SWinKey key = {.ts = ts, .groupId = groupId}; - if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { - taosMemoryFree(newPos); - } + *(int64_t*) pResPos->key = ts; + pResPos->groupId = groupId; + pResPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + SWinKey key = {.ts = ts, .groupId = groupId}; + taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), pResPos, sizeof(SResKeyPos) + sizeof(uint64_t)); return TSDB_CODE_SUCCESS; } @@ -2568,7 +2564,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } } - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); +// SArray* pUpdated = taosArrayInit(4, sizeof(SResKeyPos)); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); while (1) { @@ -2610,9 +2607,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap); - if (taosArrayGetSize(pUpdated) > 0) { - break; - } +// if (taosArrayGetSize(pUpdated) > 0) { +// break; +// } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval); @@ -2659,14 +2656,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; - void* pIte = NULL; - while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - taosArrayPush(pUpdated, pIte); - } - taosHashCleanup(pUpdatedMap); - taosArraySort(pUpdated, resultrowComparAsc); + // todo + int32_t code = initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdatedMap); - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + taosHashCleanup(pUpdatedMap); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); @@ -4755,7 +4748,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); @@ -4808,13 +4800,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, pInfo->pDelWins, pOperator); - void* pIte = NULL; - while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { - taosArrayPush(pUpdated, pIte); - } - taosArraySort(pUpdated, resultrowComparAsc); - - initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + // todo + int32_t code = initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdatedMap); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); taosHashCleanup(pUpdatedMap); diff --git a/tests/system-test/2-query/unique.py b/tests/system-test/2-query/unique.py index 6af9b130ef..9b5da50e1f 100644 --- a/tests/system-test/2-query/unique.py +++ b/tests/system-test/2-query/unique.py @@ -433,7 +433,7 @@ class TDTestCase: tdSql.checkRows(11) tdSql.checkData(1,0,0) tdSql.checkData(10,0,9) - tdSql.query(f"select unique(t1) from (select _rowts , t1 , tbname from {dbname}.stb1 )") + tdSql.query(f"select unique(t1) v from (select _rowts , t1 , tbname from {dbname}.stb1 ) order by v desc") tdSql.checkRows(2) tdSql.checkData(0,0,4) tdSql.checkData(1,0,1) -- GitLab