From 7084c765d04a2f1a1e9fdbced02db61a77f47120 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 12 Aug 2022 14:45:06 +0800 Subject: [PATCH] fix(query): set correct fill column index. --- source/libs/executor/src/executorimpl.c | 71 ++++++++++++++++++------- source/libs/executor/src/tfill.c | 19 ------- 2 files changed, 51 insertions(+), 39 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fff0600fe4..e1732cb2aa 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3191,25 +3191,33 @@ int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDa } } -static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, +static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); +static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo *pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; + SSDataBlock* pResBlock = pInfo->pFinalRes; + + int32_t order = TSDB_ORDER_ASC; + int32_t scanFlag = MAIN_SCAN; + getTableScanInfo(pOperator, &order, &scanFlag); int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey; taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); - taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); - taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); + doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag); + + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey); + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes); - int32_t numOfResultRows = pResultInfo->capacity - pInfo->pRes->info.rows; - taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pRes, numOfResultRows); + int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows; + taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows); pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId; pInfo->existNewGroupBlock = NULL; } -static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, +static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; @@ -3220,7 +3228,34 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf // handle the cached new group data block if (pInfo->existNewGroupBlock) { - doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo); + doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); + } +} + +static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) { + SFillOperatorInfo* pInfo = pOperator->info; + SExprSupp* pSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = pInfo->pFinalRes; + + setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); + projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); + pInfo->pRes->info.groupId = pBlock->info.groupId; + + SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol); + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); + colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); + + for(int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) { + SFillColInfo* pCol = &pInfo->pFillInfo->pFillCol[i + pInfo->numOfExpr]; + ASSERT(pCol->notFillCol); + + SExprInfo* pExpr = pCol->pExpr; + int32_t srcSlotId = pExpr->base.pParam[0].pCol->slotId; + int32_t dstSlotId = pExpr->base.resSchema.slotId; + + SColumnInfoData* pDst1 = taosArrayGet(pInfo->pRes->pDataBlock, dstSlotId); + SColumnInfoData* pSrc1 = taosArrayGet(pBlock->pDataBlock, srcSlotId); + colDataAssign(pDst1, pSrc1, pInfo->pRes->info.rows, &pResBlock->info); } } @@ -3236,8 +3271,9 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; + getTableScanInfo(pOperator, &order, &scanFlag); - doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); + doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > 0) { pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; @@ -3255,16 +3291,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey); } else { blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); - SExprSupp* pSup = &pOperator->exprSupp; - - getTableScanInfo(pOperator, &order, &scanFlag); - setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false); - projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL); - pInfo->pRes->info.groupId = pBlock->info.groupId; - - SColumnInfoData* pDst = taosArrayGet(pInfo->pRes->pDataBlock, pInfo->primaryTsCol); - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, pInfo->primarySrcSlotId); - colDataAssign(pDst, pSrc, pInfo->pRes->info.rows, &pResBlock->info); + doApplyScalarCalculation(pOperator, pBlock, order, scanFlag); if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) { pInfo->curGroupId = pInfo->pRes->info.groupId; // the first data block @@ -3293,14 +3320,18 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { return pResBlock; } - doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, pTaskInfo); + doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group assert(pBlock != NULL); - doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, pTaskInfo); + + blockDataCleanup(pResBlock); + blockDataCleanup(pInfo->pRes); + + doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold) { pResBlock->info.groupId = pInfo->curGroupId; return pResBlock; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index f9897c4253..44340a5b5e 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -34,28 +34,9 @@ ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1)))) #define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId) -#define GET_SRC_SLOT_ID(_p) ((_p)->pExpr->base.pParam[0].pCol->slotId) static void doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey); -static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { -#if 0 - for (int32_t j = 0; j < pFillInfo->numOfCols; ++j) { - SFillColInfo* pCol = &pFillInfo->pFillCol[j]; - if (TSDB_COL_IS_NORMAL_COL(pCol->flag) || TSDB_COL_IS_UD_COL(pCol->flag)) { - continue; - } - - SResSchema* pSchema = &pCol->pExpr->base.resSchema; - char* val1 = elePtrAt(data[j], pSchema->bytes, genRows); - - assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); - SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; - assignVal(val1, pTag->tagVal, pSchema->bytes, pSchema->type); - } -#endif -} - static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { SFillColInfo* pCol = &pFillInfo->pFillCol[i]; -- GitLab