提交 dfe4ed92 编写于 作者: D dapan1121

fix: fix disk page direty issue

上级 3cc22ef1
...@@ -96,6 +96,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo ...@@ -96,6 +96,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return pRow; return pRow;
} }
static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
void* pPage = getBufPage(pBuf, pos->pageId);
setBufPageDirty(pPage, true);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
......
...@@ -905,6 +905,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -905,6 +905,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
} }
...@@ -961,6 +962,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -961,6 +962,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
...@@ -2507,6 +2509,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2507,6 +2509,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
} }
if (find && pUpdated) { if (find && pUpdated) {
saveResultRow(pCurResult, pWinRes->groupId, pUpdated); saveResultRow(pCurResult, pWinRes->groupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur);
} }
} }
} }
...@@ -2627,6 +2630,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2627,6 +2630,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册