提交 f6ced36c 编写于 作者: H Haojun Liao

fix(query): fix memory leak.

上级 4e9146f0
...@@ -45,6 +45,7 @@ typedef struct SGroupResInfo { ...@@ -45,6 +45,7 @@ typedef struct SGroupResInfo {
int32_t index; int32_t index;
SArray* pRows; // SArray<SResKeyPos> SArray* pRows; // SArray<SResKeyPos>
char* pBuf; char* pBuf;
bool freeItem;
} SGroupResInfo; } SGroupResInfo;
typedef struct SResultRow { typedef struct SResultRow {
...@@ -127,7 +128,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo ...@@ -127,7 +128,7 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
int32_t initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SHashObj* pResultHash); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
bool hasRemainResults(SGroupResInfo* pGroupResInfo); bool hasRemainResults(SGroupResInfo* pGroupResInfo);
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
......
...@@ -89,9 +89,20 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -89,9 +89,20 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
return rowSize; return rowSize;
} }
static void freeEx(void* p) {
taosMemoryFree(*(void**)p);
}
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
taosMemoryFreeClear(pGroupResInfo->pBuf); taosMemoryFreeClear(pGroupResInfo->pBuf);
pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows); if (pGroupResInfo->freeItem) {
taosArrayDestroy(pGroupResInfo->pRows);
// taosArrayDestroyEx(pGroupResInfo->pRows, freeEx);
// pGroupResInfo->freeItem = false;
pGroupResInfo->pRows = NULL;
} else {
pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows);
}
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
} }
...@@ -162,54 +173,15 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in ...@@ -162,54 +173,15 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
} }
int32_t initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SHashObj* pResultHash) { void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
int32_t itemSize = sizeof(SResKeyPos) + sizeof(uint64_t);
int32_t bufLen = taosHashGetSize(pResultHash) * itemSize;
int32_t offset = 0;
void* pIter = NULL;
int32_t numOfRows = taosHashGetSize(pResultHash);
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
taosArrayClear(pGroupResInfo->pRows); taosArrayDestroyP(pGroupResInfo->pRows, taosMemoryFree);
} else {
pGroupResInfo->pRows = taosArrayInit(numOfRows, sizeof(void*));
} }
if (numOfRows == 0) { pGroupResInfo->freeItem = true;
pGroupResInfo->index = 0; pGroupResInfo->pRows = pArrayList;
return TSDB_CODE_SUCCESS;
}
if (pGroupResInfo->pBuf == NULL) {
pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
if (pGroupResInfo->pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
char* p = taosMemoryRealloc(pGroupResInfo->pBuf, bufLen);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pGroupResInfo->pBuf = p;
}
while ((pIter = taosHashIterate(pResultHash, pIter)) != NULL) {
SResKeyPos* p = (SResKeyPos*) (pGroupResInfo->pBuf + offset);
SResKeyPos* p1 = pIter;
qDebug("key:%"PRId64", gid:%"PRId64, *(uint64_t*)p1->key, p1->groupId);
memcpy(p, p1, itemSize);
taosArrayPush(pGroupResInfo->pRows, &p);
offset += itemSize;
}
taosSort(pGroupResInfo->pRows->pData, taosArrayGetSize(pGroupResInfo->pRows), sizeof(void*), resultrowComparAsc);
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
return TSDB_CODE_SUCCESS;
} }
bool hasRemainResults(SGroupResInfo* pGroupResInfo) { bool hasRemainResults(SGroupResInfo* pGroupResInfo) {
......
...@@ -843,15 +843,19 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) { ...@@ -843,15 +843,19 @@ static int32_t saveResult(SResultWindowInfo winInfo, SSHashObj* pStUpdated) {
} }
static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) { static int32_t saveWinResult(int64_t ts, int32_t pageId, int32_t offset, uint64_t groupId, SHashObj* pUpdatedMap) {
char buf[sizeof(SResKeyPos) + sizeof(uint64_t)] = {0}; SResKeyPos* newPos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
SResKeyPos* pResPos = (SResKeyPos*)buf; if (newPos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
*(int64_t*) pResPos->key = ts; }
pResPos->groupId = groupId;
pResPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
newPos->groupId = groupId;
newPos->pos = (SResultRowPosition){.pageId = pageId, .offset = offset};
*(int64_t*)newPos->key = ts;
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), pResPos, sizeof(SResKeyPos) + sizeof(uint64_t)); if (taosHashPut(pUpdatedMap, &key, sizeof(SWinKey), &newPos, sizeof(void*)) != TSDB_CODE_SUCCESS) {
taosMemoryFree(newPos);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2564,7 +2568,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2564,7 +2568,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
} }
// SArray* pUpdated = taosArrayInit(4, sizeof(SResKeyPos)); SArray* pUpdated = taosArrayInit(4, POINTER_BYTES);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
...@@ -2607,9 +2611,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2607,9 +2611,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pUpdatedMap);
// if (taosArrayGetSize(pUpdated) > 0) { if (taosArrayGetSize(pUpdated) > 0) {
// break; break;
// } }
continue; continue;
} else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) {
processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval); processPullOver(pBlock, pInfo->pPullDataMap, &pInfo->interval);
...@@ -2656,10 +2660,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2656,10 +2660,14 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} }
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs; pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
// todo void* pIte = NULL;
int32_t code = initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdatedMap); while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosHashCleanup(pUpdatedMap); taosHashCleanup(pUpdatedMap);
taosArraySort(pUpdated, resultrowComparAsc);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes); doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
...@@ -4748,6 +4756,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4748,6 +4756,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); SHashObj* pUpdatedMap = taosHashInit(1024, hashFn, false, HASH_NO_LOCK);
...@@ -4800,8 +4810,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4800,8 +4810,14 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap, closeStreamIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdatedMap,
pInfo->pDelWins, pOperator); pInfo->pDelWins, pOperator);
// todo void* pIte = NULL;
int32_t code = initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdatedMap); while ((pIte = taosHashIterate(pUpdatedMap, pIte)) != NULL) {
taosArrayPush(pUpdated, pIte);
}
taosArraySort(pUpdated, resultrowComparAsc);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
taosHashCleanup(pUpdatedMap); taosHashCleanup(pUpdatedMap);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册