未验证 提交 65d90a66 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #15371 from taosdata/fix/TD-16880

fix: fix stream result error issue
...@@ -96,6 +96,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo ...@@ -96,6 +96,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return pRow; return pRow;
} }
static FORCE_INLINE void setResultBufPageDirty(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
void* pPage = getBufPage(pBuf, pos->pageId);
setBufPageDirty(pPage, true);
}
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
......
...@@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
} }
...@@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM && pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
ekey = ascScan ? nextWin.ekey : nextWin.skey; ekey = ascScan ? nextWin.ekey : nextWin.skey;
...@@ -2542,6 +2544,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr ...@@ -2542,6 +2544,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
} }
if (find && pUpdated) { if (find && pUpdated) {
saveResultRow(pCurResult, pWinRes->groupId, pUpdated); saveResultRow(pCurResult, pWinRes->groupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo.cur);
} }
} }
} }
...@@ -2662,6 +2665,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc ...@@ -2662,6 +2665,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
} }
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) {
saveResultRow(pResult, tableGroupId, pUpdated); saveResultRow(pResult, tableGroupId, pUpdated);
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册