From cddc989808793b4b62ff6797f510d685e72cada0 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 7 Jul 2022 14:30:08 +0800 Subject: [PATCH] feat: add filter to fill operator --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/executorimpl.c | 42 +++++++++++++++++++++---- 2 files changed, 37 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8994c9d192..514620fdf7 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -528,6 +528,7 @@ typedef struct SFillOperatorInfo { SSDataBlock* existNewGroupBlock; bool multigroupResult; STimeWindow win; + SNode* pCondition; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3bf6c9fa0a..8742b5830b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3364,18 +3364,13 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf } } -static SSDataBlock* doFill(SOperatorInfo* pOperator) { +static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { SFillOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SResultInfo* pResultInfo = &pOperator->resultInfo; SSDataBlock* pResBlock = pInfo->pRes; - blockDataCleanup(pResBlock); - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - // todo handle different group data interpolation bool n = false; bool* newgroup = &n; @@ -3440,6 +3435,40 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { } } +static SSDataBlock* doFill(SOperatorInfo* pOperator) { + SFillOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SResultInfo* pResultInfo = &pOperator->resultInfo; + SSDataBlock* pResBlock = pInfo->pRes; + + blockDataCleanup(pResBlock); + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + while (true) { + SSDataBlock* fillResult = doFillImpl(pOperator); + if (fillResult != NULL) { + doFilter(pInfo->pCondition, fillResult); + } + + if (fillResult == NULL) { + doSetOperatorCompleted(pOperator); + break; + } + + if (fillResult->info.rows > 0) { + break; + } + } + + size_t rows = pResBlock->info.rows; + pOperator->resultInfo.totalRows += rows; + + return (rows == 0)? NULL:pResBlock; +} + static void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) { for (int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExprInfo = &pExpr[i]; @@ -3958,6 +3987,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pInfo->pRes = pResBlock; pInfo->multigroupResult = multigroupResult; + pInfo->pCondition = pPhyFillNode->node.pConditions; pOperator->name = "FillOperator"; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; -- GitLab