提交 a1b9dcf5 编写于 作者: dengyihao's avatar dengyihao

fix mem leak

上级 f20b28bc
...@@ -34,7 +34,6 @@ typedef struct SStateWindowInfo { ...@@ -34,7 +34,6 @@ typedef struct SStateWindowInfo {
SStateKeys* pStateKey; SStateKeys* pStateKey;
} SStateWindowInfo; } SStateWindowInfo;
typedef struct SSessionAggOperatorInfo { typedef struct SSessionAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
...@@ -1615,7 +1614,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt ...@@ -1615,7 +1614,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt
} }
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.parentType = type;
pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup;
if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark);
} }
...@@ -2504,7 +2503,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2504,7 +2503,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
qDebug("===stream===clear semi operator"); qDebug("===stream===clear semi operator");
} else { } else {
if (pInfo->twAggSup.maxTs > 0 && pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { if (pInfo->twAggSup.maxTs > 0 &&
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
streamStateCommit(pInfo->pState); streamStateCommit(pInfo->pState);
streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs;
...@@ -2533,7 +2533,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2533,7 +2533,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
while (1) { while (1) {
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
if (pInfo->pUpdated != NULL) { if (pInfo->pUpdated != NULL) {
pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated);
} }
...@@ -2857,7 +2856,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin ...@@ -2857,7 +2856,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState; pScanInfo->pState = pAggSup->pState;
if ( (!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo ) { if ((!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
} }
pScanInfo->twAggSup = *pTwSup; pScanInfo->twAggSup = *pTwSup;
...@@ -3061,6 +3060,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* ...@@ -3061,6 +3060,7 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj*
pNextWin->sessionWin = pCurWin->sessionWin; pNextWin->sessionWin = pCurWin->sessionWin;
int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size); int32_t code = streamStateSessionGetKVByCur(pCur, &pNextWin->sessionWin, &pNextWin->pOutputBuf, &size);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pNextWin->pOutputBuf);
SET_SESSION_WIN_INVALID(*pNextWin); SET_SESSION_WIN_INVALID(*pNextWin);
} }
return pCur; return pCur;
...@@ -3080,6 +3080,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC ...@@ -3080,6 +3080,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
SResultWindowInfo winInfo = {0}; SResultWindowInfo winInfo = {0};
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo); SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, pStUpdated, pCurWin, &winInfo);
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) { if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap)) {
taosMemoryFree(winInfo.pOutputBuf);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
break; break;
} }
...@@ -3095,6 +3096,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC ...@@ -3095,6 +3096,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin); removeSessionResult(pStUpdated, pAggSup->pResultRows, winInfo.sessionWin);
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
taosMemoryFree(winInfo.pOutputBuf);
} }
} }
...@@ -4723,7 +4725,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4723,7 +4725,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (pInfo->twAggSup.maxTs > 0 && pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { if (pInfo->twAggSup.maxTs > 0 &&
pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) {
streamStateCommit(pInfo->pState); streamStateCommit(pInfo->pState);
streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
......
...@@ -1152,6 +1152,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo ...@@ -1152,6 +1152,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
memcpy(*pVal, tmp, *pVLen); memcpy(*pVal, tmp, *pVLen);
} }
} }
taosMemoryFree(tmp);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
// impl later // impl later
return code; return code;
...@@ -1296,7 +1297,11 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey* ...@@ -1296,7 +1297,11 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamStateCur* pCur, SSessionKey*
if (len < 0) { if (len < 0) {
return -1; return -1;
} }
if (pVal != NULL) *pVal = (char*)val; if (pVal != NULL) {
*pVal = (char*)val;
} else {
taosMemoryFree(val);
}
if (pVLen != NULL) *pVLen = len; if (pVLen != NULL) *pVLen = len;
if (pKTmp->opNum != pCur->number) { if (pKTmp->opNum != pCur->number) {
...@@ -1535,13 +1540,16 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe ...@@ -1535,13 +1540,16 @@ int32_t streamStateSessionAddIfNotExist_rocksdb(SStreamState* pState, SSessionKe
if (code == 0) { if (code == 0) {
if (sessionRangeKeyCmpr(&searchKey, key) == 0) { if (sessionRangeKeyCmpr(&searchKey, key) == 0) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
taosMemoryFreeClear(*pVal);
streamStateSessionDel_rocksdb(pState, key); streamStateSessionDel_rocksdb(pState, key);
goto _end; goto _end;
} }
taosMemoryFreeClear(*pVal);
streamStateCurNext_rocksdb(pState, pCur); streamStateCurNext_rocksdb(pState, pCur);
} else { } else {
*key = originKey; *key = originKey;
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
taosMemoryFreeClear(*pVal);
pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key); pCur = streamStateSessionSeekKeyNext_rocksdb(pState, key);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册