diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d3d7db3341f2e0de76a79513621d33eadc9b3ef8..2cbf5b1837e4e20562b0f4a02654286452939232 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -442,6 +442,8 @@ typedef struct SIntervalAggOperatorInfo { SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; + + SNode *pCondition; } SIntervalAggOperatorInfo; typedef struct SStreamFinalIntervalOperatorInfo { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fdef432d9596552a61b210120c1ea384ed269731..4683d662608b0be761f0de42f7dfa19a9fe19d5a 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1208,10 +1208,17 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { } blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - - if (pBlock->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); + while (1) { + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + doFilter(pInfo->pCondition, pBlock); + bool hasRemain = hasDataInGroupInfo(&pInfo->groupResInfo); + if (!hasRemain) { + doSetOperatorCompleted(pOperator); + break; + } + if (pBlock->info.rows > 0) { + break; + } } size_t rows = pBlock->info.rows; @@ -1662,6 +1669,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; pInfo->ignoreExpiredData = pPhyNode->window.igExpired; + pInfo->pCondition = pPhyNode->window.node.pConditions; if (pPhyNode->window.pExprs != NULL) { int32_t numOfScalar = 0;