diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index 78b3cd2f40b839dddd39d12bf0b0d6fc1ba82555..79837480d79ad265e8e6ce307a09828a3dc5e1d7 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -120,6 +120,8 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); +void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts); +bool taosFillNotStarted(const SFillInfo* pFillInfo); SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr, int32_t numOfNotFillCols, const struct SNodeListNode* val); bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 839c845bac73817e916f30e2ffa61e9ad4a3b227..3d1df6482ec7d8c32557a90d049ab4a2f380c8b3 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -139,6 +139,12 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { } SOperatorInfo* pDownstream = pOperator->pDownstream[0]; + + // the scan order may be different from the output result order for agg interval operator. + if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL) { + order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder; + } + while (1) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); if (pBlock == NULL) { @@ -162,11 +168,24 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block pInfo->totalInputRows += pInfo->pRes->info.rows; - if (order == pInfo->pFillInfo->order) { + if (order == TSDB_ORDER_ASC) { + int64_t skey = pBlock->info.window.skey; + if (skey < pInfo->pFillInfo->start) { // the start key may be smaller than the + ASSERT( taosFillNotStarted(pInfo->pFillInfo)); + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey); + } + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey); } else { + int64_t ekey = pBlock->info.window.ekey; + if (ekey > pInfo->pFillInfo->start) { + ASSERT( taosFillNotStarted(pInfo->pFillInfo)); + taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, ekey); + } + taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey); } + taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes); } else if (pInfo->curGroupId != pBlock->info.id.groupId) { // the new group data block pInfo->existNewGroupBlock = pBlock; @@ -256,11 +275,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); - STimeWindow w = {0}; - int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; - - getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC); - pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, + int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; + pInfo->pFillInfo = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); if (order == TSDB_ORDER_ASC) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e4d5d2c268bbc76e79b72384139dcd42f2b510e9..7cc50a70ab799184de5b350f72526ee44ac23295 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -517,11 +517,16 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) return; } + // the endKey is now the aligned time window value. truncate time window isn't correct. pFillInfo->end = endKey; - if (!FILL_IS_ASC_FILL(pFillInfo)) { - pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval); - pFillInfo->end = taosTimeAdd(pFillInfo->end, pFillInfo->interval.interval, pFillInfo->interval.intervalUnit,pFillInfo->interval.precision); + +#if 0 + if (pFillInfo->order == TSDB_ORDER_ASC) { + ASSERT(pFillInfo->start <= pFillInfo->end); + } else { + ASSERT(pFillInfo->start >= pFillInfo->end); } +#endif pFillInfo->index = 0; pFillInfo->numOfRows = numOfRows; @@ -531,6 +536,13 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) pFillInfo->pSrcBlock = (SSDataBlock*)pInput; } +void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts) { + pFillInfo->start = ts; + pFillInfo->currentKey = ts; +} + +bool taosFillNotStarted(const SFillInfo* pFillInfo) {return pFillInfo->start == pFillInfo->currentKey;} + bool taosFillHasMoreResults(SFillInfo* pFillInfo) { int32_t remain = taosNumOfRemainRows(pFillInfo); if (remain > 0) { @@ -565,6 +577,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma (ekey1 > pFillInfo->currentKey && !FILL_IS_ASC_FILL(pFillInfo))) { return 0; } + numOfRes = taosTimeCountInterval(ekey1, pFillInfo->currentKey, pFillInfo->interval.sliding, pFillInfo->interval.slidingUnit, pFillInfo->interval.precision); numOfRes += 1; diff --git a/tests/script/tsim/query/partitionby.sim b/tests/script/tsim/query/partitionby.sim index da7dfc00908c7a20c3a8831c724187dc92746313..76d4f87908590839cdec1e8689404be974d4e86d 100644 --- a/tests/script/tsim/query/partitionby.sim +++ b/tests/script/tsim/query/partitionby.sim @@ -121,7 +121,7 @@ sql select count(*) from (select ts from $mt1 where ts is not null partition by if $rows != 1 then return -1 endi -if $data00 != 2 then +if $data00 != 4 then return -1 endi @@ -129,7 +129,7 @@ sql select count(*) from (select ts from $mt1 where ts is not null partition by if $rows != 1 then return -1 endi -if $data00 != 8 then +if $data00 != 16 then return -1 endi