From d1ca2ae2a0bf2cc07b30e836a22c87e47ec57252 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 26 Jun 2023 17:08:52 +0800 Subject: [PATCH] stream single session op end window function --- source/libs/executor/src/timewindowoperator.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e0abcd96c5..78d1e97554 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3182,7 +3182,7 @@ int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pW } static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated, - SSHashObj* pStDeleted, bool hasEndTs) { + SSHashObj* pStDeleted, bool hasEndTs, bool addGap) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; int32_t numOfOutput = pOperator->exprSupp.numOfExprs; @@ -3221,7 +3221,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData } int64_t winDelta = 0; - if (IS_FINAL_OP(pInfo)) { + if (addGap) { winDelta = pAggSup->gap; } code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, @@ -3547,7 +3547,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo)); + doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo), true); if (IS_FINAL_OP(pInfo)) { int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -3562,7 +3562,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true); + doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true, false); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); @@ -3767,7 +3767,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); - doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false); + doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false, false); maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } -- GitLab