diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 23732a6f9a627b192a739db94dd068fecbc9fc26..2f96482643fda0e1a2498eaaf561b15c83415766 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -96,6 +96,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } +static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { + void* pPage = getBufPage(pBuf, pos->pageId); + setBufPageDirty(pPage, true); +} + void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0f1272c9642936f7e82be7ffa26295ea1bdd09b1..4204988eb0fbe4596ba27a9975a5f0322c43a89a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResultRow(pResult, tableGroupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } } @@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResultRow(pResult, tableGroupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } ekey = ascScan ? nextWin.ekey : nextWin.skey; @@ -2542,6 +2544,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr } if (find && pUpdated) { saveResultRow(pCurResult, pWinRes->groupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur); } } } @@ -2662,6 +2665,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { saveResultRow(pResult, tableGroupId, pUpdated); + setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,