diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0a46def23de6853e5a11fe3e2e208ad9241edac0..375c40b0152e8f66c8439d668408e42e119d1093 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3192,11 +3192,12 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* return pCur; } -static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, +static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + int32_t winNum = 0; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SResultRow* pCurResult = NULL; @@ -3230,7 +3231,9 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); pAPI->stateStore.streamStateFreeCur(pCur); taosMemoryFree(winInfo.pOutputBuf); + winNum++; } + return winNum; } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { @@ -3731,9 +3734,11 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); - compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); - saveSessionOutputBuf(pAggSup, &winInfo); - saveResult(winInfo, pInfo->pStUpdated); + int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); + if (winNum > 0) { + saveSessionOutputBuf(pAggSup, &winInfo); + saveResult(winInfo, pInfo->pStUpdated); + } } taosMemoryFree(pBuf);