From a1b9dcf595de88b882a103a0ec359a9ff5cbfdbf Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 5 May 2023 02:20:11 +0000 Subject: [PATCH] fix mem leak --- source/libs/executor/src/timewindowoperator.c | 15 +++++++++------ source/libs/stream/src/streamBackendRocksdb.c | 10 +++++++++- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 700ca92881..56ce0ac0c0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -34,7 +34,6 @@ typedef struct SStateWindowInfo { SStateKeys* pStateKey; } SStateWindowInfo; - typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -1615,7 +1614,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; - pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; + pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); } @@ -2504,7 +2503,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); } else { - if (pInfo->twAggSup.maxTs > 0 && pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { + if (pInfo->twAggSup.maxTs > 0 && + pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; @@ -2533,7 +2533,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { while (1) { if (isTaskKilled(pTaskInfo)) { - if (pInfo->pUpdated != NULL) { pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); } @@ -2857,7 +2856,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; - if ( (!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo ) { + if ((!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); } pScanInfo->twAggSup = *pTwSup; @@ -3061,6 +3060,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* pNextWin->sessionWin = pCurWin->sessionWin; int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pNextWin->pOutputBuf); SET_SESSION_WIN_INVALID(*pNextWin); } return pCur; @@ -3080,6 +3080,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC SResultWindowInfo winInfo = {0}; SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo); if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) { + taosMemoryFree(winInfo.pOutputBuf); streamStateFreeCur(pCur); break; } @@ -3095,6 +3096,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin); doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); streamStateFreeCur(pCur); + taosMemoryFree(winInfo.pOutputBuf); } } @@ -4723,7 +4725,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } setOperatorCompleted(pOperator); - if (pInfo->twAggSup.maxTs > 0 && pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { + if (pInfo->twAggSup.maxTs > 0 && + pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index ee8fbf7785..d8ad769741 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1152,6 +1152,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo memcpy(*pVal, tmp, *pVLen); } } + taosMemoryFree(tmp); streamStateFreeCur(pCur); // impl later return code; @@ -1296,7 +1297,11 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* if (len < 0) { return -1; } - if (pVal != NULL) *pVal = (char*)val; + if (pVal != NULL) { + *pVal = (char*)val; + } else { + taosMemoryFree(val); + } if (pVLen != NULL) *pVLen = len; if (pKTmp->opNum != pCur->number) { @@ -1535,13 +1540,16 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe if (code == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) { memcpy(tmp, *pVal, valSize); + taosMemoryFreeClear(*pVal); streamStateSessionDel_rocksdb(pState, key); goto _end; } + taosMemoryFreeClear(*pVal); streamStateCurNext_rocksdb(pState, pCur); } else { *key = originKey; streamStateFreeCur(pCur); + taosMemoryFreeClear(*pVal); pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); } -- GitLab