From 1894608b808d5c67bc69f9f0fa174ba4fb4668f2 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 27 Jul 2023 09:59:58 +0800 Subject: [PATCH] optimize session operator reload state --- source/libs/executor/src/timewindowoperator.c | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0a46def23d..375c40b015 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3192,11 +3192,12 @@ SStreamStateCur* getNextSessionWinInfo(SStreamAggSupporter* pAggSup, SSHashObj* return pCur; } -static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, +static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + int32_t winNum = 0; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SResultRow* pCurResult = NULL; @@ -3230,7 +3231,9 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC doDeleteSessionWindow(pAggSup, &winInfo.sessionWin); pAPI->stateStore.streamStateFreeCur(pCur); taosMemoryFree(winInfo.pOutputBuf); + winNum++; } + return winNum; } int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { @@ -3731,9 +3734,11 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { for (int32_t i = 0; i < num; i++) { SResultWindowInfo winInfo = {0}; setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); - compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); - saveSessionOutputBuf(pAggSup, &winInfo); - saveResult(winInfo, pInfo->pStUpdated); + int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); + if (winNum > 0) { + saveSessionOutputBuf(pAggSup, &winInfo); + saveResult(winInfo, pInfo->pStUpdated); + } } taosMemoryFree(pBuf); -- GitLab