From 403617976c500b77066519e752e6dffbff7b8e48 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 1 Jun 2023 22:45:03 +0800 Subject: [PATCH] fix(query): fix error in fill. --- source/libs/executor/src/filloperator.c | 40 ++++++++++++------------- 1 file changed, 19 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index e257879a04..75bfe7b671 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -61,24 +61,24 @@ typedef struct SFillOperatorInfo { SExprSupp noFillExprSupp; } SFillOperatorInfo; -static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock); +static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order); static void destroyFillOperatorInfo(void* param); static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag); static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, - SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { + SResultInfo* pResultInfo, int32_t order) { pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; SSDataBlock* pResBlock = pInfo->pFinalRes; - int32_t order = TSDB_ORDER_ASC; +// int32_t order = TSDB_ORDER_ASC; int32_t scanFlag = MAIN_SCAN; - getTableScanInfo(pOperator, &order, &scanFlag, false); +// getTableScanInfo(pOperator, &order, &scanFlag, false); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); blockDataCleanup(pInfo->pRes); doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag); - revisedFillStartKey(pInfo, pInfo->existNewGroupBlock); + revisedFillStartKey(pInfo, pInfo->existNewGroupBlock, order); int64_t ts = (order == TSDB_ORDER_ASC)? pInfo->existNewGroupBlock->info.window.ekey:pInfo->existNewGroupBlock->info.window.skey; taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts); @@ -93,7 +93,7 @@ static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOp } static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo, - SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) { + SResultInfo* pResultInfo, int32_t order) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows; taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows); @@ -103,7 +103,7 @@ static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOpera // handle the cached new group data block if (pInfo->existNewGroupBlock) { - doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); + doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order); } } @@ -123,9 +123,7 @@ void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int } // todo refactor: decide the start key according to the query time range. -static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock) { - int32_t order = pInfo->pFillInfo->order; - +static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t order) { if (order == TSDB_ORDER_ASC) { int64_t skey = pBlock->info.window.skey; if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the @@ -137,7 +135,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock) { while(1) { int64_t prev = taosTimeAdd(t, -pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (prev < pInfo->pFillInfo->start) { + if (prev <= pInfo->pFillInfo->start) { t = prev; break; } @@ -158,7 +156,7 @@ static void revisedFillStartKey(SFillOperatorInfo* pInfo, SSDataBlock* pBlock) { while(1) { int64_t prev = taosTimeAdd(t, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); - if (prev < pInfo->pFillInfo->start) { + if (prev >= pInfo->pFillInfo->start) { t = prev; break; } @@ -184,12 +182,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { int32_t scanFlag = MAIN_SCAN; getTableScanInfo(pOperator, &order, &scanFlag, false); - doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); - if (pResBlock->info.rows > 0) { - pResBlock->info.id.groupId = pInfo->curGroupId; - return pResBlock; - } - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; // the scan order may be different from the output result order for agg interval operator. @@ -197,6 +189,12 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder; } + doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order); + if (pResBlock->info.rows > 0) { + pResBlock->info.id.groupId = pInfo->curGroupId; + return pResBlock; + } + while (1) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); if (pBlock == NULL) { @@ -218,7 +216,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) { if (pInfo->curGroupId == 0) { - revisedFillStartKey(pInfo, pBlock); + revisedFillStartKey(pInfo, pBlock, order); } pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block @@ -249,7 +247,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { return pResBlock; } - doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo); + doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order); if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; @@ -257,7 +255,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { } else if (pInfo->existNewGroupBlock) { // try next group blockDataCleanup(pResBlock); - doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo); + doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order); if (pResBlock->info.rows > pResultInfo->threshold) { pResBlock->info.id.groupId = pInfo->curGroupId; return pResBlock; -- GitLab