From c79b7223721702d16185b8602df52fda2f38a355 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 9 Jan 2023 10:12:24 +0800 Subject: [PATCH] remove state commit --- source/libs/executor/src/timewindowoperator.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d78e9c4edf..449c52d77f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -119,8 +119,8 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL pRowSup->groupId = groupId; } -FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, - int32_t pos, int32_t order, int64_t* pData) { +FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int32_t pos, + int32_t order, int64_t* pData) { int32_t forwardRows = 0; if (order == TSDB_ORDER_ASC) { @@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num if (NULL == pr) { T_LONG_JMP(pTaskInfo->env, terrno); } - + ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId); if (pr->closed) { @@ -1318,11 +1318,11 @@ static void setInverFunction(SqlFunctionCtx* pCtx, int32_t num, EStreamType type } static void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprSupp* pSup, int32_t numOfOutput) { - SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); + SResultRow* pResult = getResultRowByPos(pResultBuf, p1, false); if (NULL == pResult) { return; } - + SqlFunctionCtx* pCtx = pSup->pCtx; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultEntryInfo(pResult, i, pSup->rowEntryInfoOffset); @@ -2534,7 +2534,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); - streamStateCommit(pTaskInfo->streamInfo.pState); + // streamStateCommit(pTaskInfo->streamInfo.pState); } return NULL; } else { @@ -4734,7 +4734,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, &pInfo->delKey); setOperatorCompleted(pOperator); - streamStateCommit(pTaskInfo->streamInfo.pState); + // streamStateCommit(pTaskInfo->streamInfo.pState); return NULL; } -- GitLab