From f6ced36c87ac00a4d18373057089cc3343b2febb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 15 Feb 2023 01:28:50 +0800 Subject: [PATCH] fix(query): fix memory leak. --- source/libs/executor/inc/executil.h | 3 +- source/libs/executor/src/executil.c | 60 +++++-------------- source/libs/executor/src/timewindowoperator.c | 48 ++++++++++----- 3 files changed, 50 insertions(+), 61 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index e6fbcc242f..f99c7de93d 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -45,6 +45,7 @@ typedef struct SGroupResInfo { int32_t index; SArray* pRows; // SArray char* pBuf; + bool freeItem; } SGroupResInfo; typedef struct SResultRow { @@ -127,7 +128,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); -int32_t initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SHashObj* pResultHash); +void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 11b176ad29..ec2c819cf3 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -89,9 +89,20 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { return rowSize; } +static void freeEx(void* p) { + taosMemoryFree(*(void**)p); +} + void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { taosMemoryFreeClear(pGroupResInfo->pBuf); - pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); + if (pGroupResInfo->freeItem) { + taosArrayDestroy(pGroupResInfo->pRows); +// taosArrayDestroyEx(pGroupResInfo->pRows, freeEx); +// pGroupResInfo->freeItem = false; + pGroupResInfo->pRows = NULL; + } else { + pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); + } pGroupResInfo->index = 0; } @@ -162,54 +173,15 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -int32_t initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SHashObj* pResultHash) { - int32_t itemSize = sizeof(SResKeyPos) + sizeof(uint64_t); - int32_t bufLen = taosHashGetSize(pResultHash) * itemSize; - int32_t offset = 0; - void* pIter = NULL; - - int32_t numOfRows = taosHashGetSize(pResultHash); +void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { if (pGroupResInfo->pRows != NULL) { - taosArrayClear(pGroupResInfo->pRows); - } else { - pGroupResInfo->pRows = taosArrayInit(numOfRows, sizeof(void*)); + taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree); } - if (numOfRows == 0) { - pGroupResInfo->index = 0; - return TSDB_CODE_SUCCESS; - } - - if (pGroupResInfo->pBuf == NULL) { - pGroupResInfo->pBuf = taosMemoryMalloc(bufLen); - if (pGroupResInfo->pBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - } else { - char* p = taosMemoryRealloc(pGroupResInfo->pBuf, bufLen); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pGroupResInfo->pBuf = p; - } - - while ((pIter = taosHashIterate(pResultHash, pIter)) != NULL) { - SResKeyPos* p = (SResKeyPos*) (pGroupResInfo->pBuf + offset); - SResKeyPos* p1 = pIter; - - qDebug("key:%"PRId64", gid:%"PRId64, *(uint64_t*)p1->key, p1->groupId); - - memcpy(p, p1, itemSize); - taosArrayPush(pGroupResInfo->pRows, &p); - offset += itemSize; - } - - taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), sizeof(void*), resultrowComparAsc); + pGroupResInfo->freeItem = true; + pGroupResInfo->pRows = pArrayList; pGroupResInfo->index = 0; ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); - - return TSDB_CODE_SUCCESS; } bool hasRemainResults(SGroupResInfo* pGroupResInfo) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4ca1593b09..0472b90338 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -843,15 +843,19 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { } static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { - char buf[sizeof(SResKeyPos) + sizeof(uint64_t)] = {0}; - SResKeyPos* pResPos = (SResKeyPos*)buf; - - *(int64_t*) pResPos->key = ts; - pResPos->groupId = groupId; - pResPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); + if (newPos == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + newPos->groupId = groupId; + newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset}; + *(int64_t*)newPos->key = ts; SWinKey key = {.ts = ts, .groupId = groupId}; - taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), pResPos, sizeof(SResKeyPos) + sizeof(uint64_t)); + if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) { + taosMemoryFree(newPos); + } + return TSDB_CODE_SUCCESS; } @@ -2564,7 +2568,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } } -// SArray* pUpdated = taosArrayInit(4, sizeof(SResKeyPos)); + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); @@ -2607,9 +2611,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap); -// if (taosArrayGetSize(pUpdated) > 0) { -// break; -// } + if (taosArrayGetSize(pUpdated) > 0) { + break; + } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval); @@ -2656,10 +2660,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; - // todo - int32_t code = initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdatedMap); - + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } taosHashCleanup(pUpdatedMap); + taosArraySort(pUpdated, resultrowComparAsc); + + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); @@ -4748,6 +4756,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); @@ -4800,8 +4810,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, pInfo->pDelWins, pOperator); - // todo - int32_t code = initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdatedMap); + void* pIte = NULL; + while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) { + taosArrayPush(pUpdated, pIte); + } + taosArraySort(pUpdated, resultrowComparAsc); + + initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); + blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); taosHashCleanup(pUpdatedMap); -- GitLab