From a74c830125f19553d438dfe5e853074938cd8d1e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 Aug 2022 19:34:15 +0800 Subject: [PATCH] fix(query): support scalar function in fill operator. --- source/libs/executor/inc/executorimpl.h | 5 ++++- source/libs/executor/inc/tfill.h | 1 + source/libs/executor/src/executorimpl.c | 19 +++++++++++-------- source/libs/executor/src/tfill.c | 24 +++++++++++++++--------- tests/script/tsim/parser/fill_us.sim | 1 + 5 files changed, 32 insertions(+), 18 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b62ff2bef1..1207ea83cc 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -627,12 +627,15 @@ typedef struct SFillOperatorInfo { int64_t totalInputRows; void** p; SSDataBlock* existNewGroupBlock; - bool multigroupResult; STimeWindow win; SNode* pCondition; SArray* pColMatchColInfo; int32_t primaryTsCol; uint64_t curGroupId; // current handled group id + SExprInfo* pExprInfo; + int32_t numOfExpr; + SExprInfo* pNotFillExprInfo; + int32_t numOfNotFillExpr; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 2d39cd8eb1..9f3a95aca8 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -44,6 +44,7 @@ typedef struct SFillInfo { TSKEY end; // endKey for fill TSKEY currentKey; // current active timestamp, the value may be changed during the fill procedure. int32_t tsSlotId; // primary time stamp slot id + int32_t srcTsSlotId; // timestamp column id in the source data block. int32_t order; // order [TSDB_ORDER_ASC|TSDB_ORDER_DESC] int32_t type; // fill type int32_t numOfRows; // number of rows in the input data block diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 213638c73f..2fc113c7d6 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3244,7 +3244,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); if (pBlock == NULL) { if (pInfo->totalInputRows == 0) { - pOperator->status = OP_EXEC_DONE; + doSetOperatorCompleted(pOperator); return NULL; } @@ -3252,6 +3252,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { } else { blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); + SExprSupp* pSup = &pOperator->exprSupp; + projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); + if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) { pInfo->curGroupId = pBlock->info.groupId; // the first data block @@ -3629,10 +3632,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - int32_t num = 0, num1 = 0; SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &num); - SExprInfo* pCopyColumnExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &num1); + SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); + SExprInfo* pCopyColumnExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr); SInterval* pInterval = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType @@ -3645,6 +3647,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SResultInfo* pResultInfo = &pOperator->resultInfo; initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pResBlock, pOperator->resultInfo.capacity); + initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr); pInfo->primaryTsCol = ((SColumnNode*)pPhyFillNode->pWStartTs)->slotId; @@ -3652,9 +3655,9 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); - int32_t code = initFillInfo(pInfo, pExprInfo, num, pCopyColumnExprInfo, num1, - (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, - pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); + int32_t code = + initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pCopyColumnExprInfo, pInfo->numOfNotFillExpr, (SNodeListNode*)pPhyFillNode->pValues, + pPhyFillNode->timeRange, pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -3667,7 +3670,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pOperator->status = OP_NOT_OPENED; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_FILL; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = num; + pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index c1bcf12cb2..4a885fb2ce 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -258,7 +258,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) { pFillInfo->numOfCurrent = 0; - SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->tsSlotId); + SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); bool ascFill = FILL_IS_ASC_FILL(pFillInfo); @@ -349,10 +349,6 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t } if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) { - /* the raw data block is exhausted, next value does not exists */ - // if (pFillInfo->index >= pFillInfo->numOfRows) { - // taosMemoryFreeClear(*next); - // } pFillInfo->numOfTotal += pFillInfo->numOfCurrent; return pFillInfo->numOfCurrent; } @@ -413,7 +409,17 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t } pFillInfo->order = order; - pFillInfo->tsSlotId = primaryTsSlotId; + pFillInfo->srcTsSlotId = primaryTsSlotId; + + for(int32_t i = 0; i < numOfNotFillCols; ++i) { + SFillColInfo* p = &pCol[i + numOfFillCols]; + int32_t srcSlotId = GET_SRC_SLOT_ID(p); + if (srcSlotId == primaryTsSlotId) { + pFillInfo->tsSlotId = i + numOfFillCols; + break; + } + } + taosResetFillInfo(pFillInfo, skey); switch (fillType) { @@ -531,7 +537,7 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo) { } int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) { - SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0); + SColumnInfoData* pCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId); int64_t* tsList = (int64_t*)pCol->pData; int32_t numOfRows = taosNumOfRemainRows(pFillInfo); @@ -619,9 +625,9 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); } - if (pExprInfo->base.numOfParams > 0) { +// if (pExprInfo->base.numOfParams > 0) { // pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query - } +// } } for(int32_t i = 0; i < numOfNotFillExpr; ++i) { diff --git a/tests/script/tsim/parser/fill_us.sim b/tests/script/tsim/parser/fill_us.sim index 82d282642e..d42d604ad3 100644 --- a/tests/script/tsim/parser/fill_us.sim +++ b/tests/script/tsim/parser/fill_us.sim @@ -1010,6 +1010,7 @@ if $data31 != 9.000000000 then return -1 endi if $data41 != 12.500000000 then + print expect 12.500000000, actual: $data41 return -1 endi if $data51 != 16.000000000 then -- GitLab