From 0ff775c46f99d5a6c13f52444c8cfa5bf1184c55 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 29 Jul 2022 14:33:50 +0800 Subject: [PATCH] fix(query): keep order info in fill operator. --- source/libs/executor/inc/executorimpl.h | 7 +- source/libs/executor/inc/tfill.h | 3 +- source/libs/executor/src/executorimpl.c | 9 +-- source/libs/executor/src/joinoperator.c | 8 +- source/libs/executor/src/tfill.c | 3 +- source/libs/executor/src/timewindowoperator.c | 75 +++++++++---------- 6 files changed, 51 insertions(+), 54 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 54c98fa948..37818dd91d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -523,8 +523,8 @@ typedef struct SIntervalAggOperatorInfo { STimeWindow win; // query time range bool timeWindowInterpo; // interpolation needed or not SArray* pInterpCols; // interpolation columns - int32_t order; // current SSDataBlock scan order int32_t resultTsOrder; // result timestamp order + int32_t inputOrder; // input data ts order EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] STimeWindowAggSupp twAggSup; bool invertible; @@ -534,8 +534,7 @@ typedef struct SIntervalAggOperatorInfo { SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; - - SNode *pCondition; + SNode* pCondition; } SIntervalAggOperatorInfo; typedef struct SMergeAlignedIntervalAggOperatorInfo { @@ -805,7 +804,7 @@ typedef struct STagFilterOperatorInfo { typedef struct SJoinOperatorInfo { SSDataBlock *pRes; int32_t joinType; - int32_t inputTsOrder; + int32_t inputOrder; SSDataBlock *pLeft; int32_t leftPos; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index db04c5212c..b604794dad 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -76,10 +76,9 @@ bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId, - const char* id); + int32_t order, const char* id); void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); -void taosFillSetDataOrderInfo(SFillInfo* pFillInfo, int32_t order); int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity); int64_t getFillInfoStart(struct SFillInfo *pFillInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index e2727cfb4a..7359a4ea90 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3234,8 +3234,6 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { return pResBlock; } - taosFillSetDataOrderInfo(pInfo->pFillInfo, TSDB_ORDER_ASC); - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); @@ -3588,14 +3586,14 @@ void doDestroyExchangeOperatorInfo(void* param) { } static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, - STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { + STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); w = getFirstQualifiedTimeWindow(win.skey, &w, pInterval, TSDB_ORDER_ASC); pInfo->pFillInfo = - taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, id); + taosCreateFillInfo(w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, pInfo->primaryTsCol, order, id); pInfo->win = win; pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); @@ -3625,6 +3623,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval : &((SIntervalAggOperatorInfo*)downstream->info)->interval; + int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; @@ -3636,7 +3635,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* &numOfOutputCols, COL_MATCH_FROM_SLOT_ID); int32_t code = initFillInfo(pInfo, pExprInfo, num, (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, - pResultInfo->capacity, pTaskInfo->id.str, pInterval, type); + pResultInfo->capacity, pTaskInfo->id.str, pInterval, type, order); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 8902804fab..17b81cdb82 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -77,11 +77,11 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pInfo->pCondAfterMerge = NULL; } - pInfo->inputTsOrder = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; if (pJoinNode->inputTsOrder == ORDER_ASC) { - pInfo->inputTsOrder = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; } else if (pJoinNode->inputTsOrder == ORDER_DESC) { - pInfo->inputTsOrder = TSDB_ORDER_DESC; + pInfo->inputOrder = TSDB_ORDER_DESC; } pOperator->fpSet = @@ -312,7 +312,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes) int32_t nrows = pRes->info.rows; - bool asc = (pJoinInfo->inputTsOrder == TSDB_ORDER_ASC) ? true : false; + bool asc = (pJoinInfo->inputOrder == TSDB_ORDER_ASC) ? true : false; while (1) { int64_t leftTs = 0; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index be5631cd85..c5d68676d2 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -435,7 +435,7 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, - int32_t primaryTsSlotId, const char* id) { + int32_t primaryTsSlotId, int32_t order, const char* id) { if (fillType == TSDB_FILL_NONE) { return NULL; } @@ -446,6 +446,7 @@ struct SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfTags, int32_t capa return NULL; } + pFillInfo->order = order; pFillInfo->tsSlotId = primaryTsSlotId; taosResetFillInfo(pFillInfo, skey); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 4023009489..5e64d14c4c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -362,7 +362,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, int32_t pos, SSDataBlock* pBlock, const TSKEY* tsCols, STimeWindow* win, SExprSupp* pSup) { - bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); + bool ascQuery = (pInfo->inputOrder == TSDB_ORDER_ASC); TSKEY curTs = tsCols[pos]; @@ -392,7 +392,7 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo* pInfo, SExprSupp* pSup, int32_t endRowIndex, SArray* pDataBlock, const TSKEY* tsCols, TSKEY blockEkey, STimeWindow* win) { - int32_t order = pInfo->order; + int32_t order = pInfo->inputOrder; TSKEY actualEndKey = tsCols[endRowIndex]; TSKEY key = (order == TSDB_ORDER_ASC) ? win->ekey : win->skey; @@ -550,7 +550,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB if (!done) { int32_t endRowIndex = startPos + forwardRows - 1; - TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; + TSKEY endKey = (pInfo->inputOrder == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; bool interp = setTimeWindowInterpolationEndTs(pInfo, pSup, endRowIndex, pBlock->pDataBlock, tsCols, endKey, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); @@ -639,7 +639,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows, - numOfExprs, pInfo->order); + numOfExprs, pInfo->inputOrder); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { closeResultRow(pr); @@ -924,11 +924,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t numOfOutput = pSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, pInfo); uint64_t tableGroupId = pBlock->info.groupId; - bool ascScan = (pInfo->order == TSDB_ORDER_ASC); + bool ascScan = (pInfo->inputOrder == TSDB_ORDER_ASC); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->order); + STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder); int32_t ret = TSDB_CODE_SUCCESS; if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { @@ -946,7 +946,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); ASSERT(forwardRows > 0); // prev time window not interpolation yet. @@ -969,7 +969,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->order); + pBlock->info.rows, numOfOutput, pInfo->inputOrder); } doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -977,14 +977,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); + startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->inputOrder); if (startPos < 0) { break; } if (pInfo->ignoreExpiredData && isCloseWindow(&nextWin, &pInfo->twAggSup)) { ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); continue; } @@ -1002,14 +1002,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->inputOrder); // window start(end) key interpolation doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, pInfo->order); + pBlock->info.rows, numOfOutput, pInfo->inputOrder); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1082,7 +1082,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &pInfo->order, &scanFlag); + getTableScanInfo(pOperator, &pInfo->inputOrder, &scanFlag); if (pInfo->scalarSupp.pExprInfo != NULL) { SExprSupp* pExprSup = &pInfo->scalarSupp; @@ -1090,7 +1090,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, scanFlag, true); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true); blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); @@ -1549,7 +1549,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - pInfo->order = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; SExprSupp* pSup = &pOperator->exprSupp; if (pOperator->status == OP_EXEC_DONE) { @@ -1609,7 +1609,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the // caller. Note that all the time window are not close till now. // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->order, MAIN_SCAN, true); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, MAIN_SCAN, true); if (pInfo->invertible) { setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type); } @@ -1789,8 +1789,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->win = pTaskInfo->window; - pInfo->order = TSDB_ORDER_ASC; - pInfo->resultTsOrder = TSDB_ORDER_ASC; + pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; @@ -1878,7 +1878,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr goto _error; } - pInfo->order = TSDB_ORDER_ASC; + pInfo->inputOrder = TSDB_ORDER_ASC; pInfo->interval = *pInterval; pInfo->execModel = OPTR_EXEC_MODEL_STREAM; pInfo->win = pTaskInfo->window; @@ -4563,7 +4563,7 @@ static int32_t outputMergeAlignedIntervalResult(SOperatorInfo* pOperatorInfo, ui SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExprSupp* pSup = &pOperatorInfo->exprSupp; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &wstartTs, TSDB_KEYSIZE, tableGroupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, @@ -4617,7 +4617,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } else { updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); @@ -4636,7 +4636,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, currTs); } @@ -4681,8 +4681,8 @@ static SSDataBlock* doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); - setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->order, scanFlag, true); + getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); + setInputDataBlock(pOperator, pSup->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); doFilter(miaInfo->pCondition, pRes, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { @@ -4723,7 +4723,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, miaInfo->pCondition = pCondition; iaInfo->win = pTaskInfo->window; - iaInfo->order = TSDB_ORDER_ASC; + iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = *pInterval; iaInfo->execModel = pTaskInfo->execModel; iaInfo->primaryTsIndex = primaryTsSlotId; @@ -4805,7 +4805,7 @@ static int32_t finalizeWindowResult(SOperatorInfo* pOperatorInfo, uint64_t table SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &win->skey, TSDB_KEYSIZE, tableGroupId); @@ -4823,7 +4823,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); SExprSupp* pExprSup = &pOperatorInfo->exprSupp; SGroupTimeWindow groupTimeWindow = {.groupId = tableGroupId, .window = *newWin}; @@ -4859,12 +4859,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* int32_t numOfOutput = pExprSup->numOfExprs; int64_t* tsCols = extractTsCol(pBlock, iaInfo); uint64_t tableGroupId = pBlock->info.groupId; - bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + bool ascScan = (iaInfo->inputOrder == TSDB_ORDER_ASC); TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; STimeWindow win = - getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->order); + getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder); int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, @@ -4875,7 +4875,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* TSKEY ekey = ascScan ? win.ekey : win.skey; int32_t forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder); ASSERT(forwardRows > 0); // prev time window not interpolation yet. @@ -4896,7 +4896,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, - pBlock->info.rows, numOfOutput, iaInfo->order); + pBlock->info.rows, numOfOutput, iaInfo->inputOrder); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&win) is closed @@ -4905,7 +4905,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); + startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder); if (startPos < 0) { break; } @@ -4920,14 +4920,14 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ekey = ascScan ? nextWin.ekey : nextWin.skey; forwardRows = - getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->inputOrder); // window start(end) key interpolation doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, - tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder); doCloseWindow(pResultRowInfo, iaInfo, pResult); // output previous interval results after this interval (&nextWin) is closed @@ -4981,8 +4981,8 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { break; } - getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); - setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->order, scanFlag, true); + getTableScanInfo(pOperator, &iaInfo->inputOrder, &scanFlag); + setInputDataBlock(pOperator, pExpSupp->pCtx, pBlock, iaInfo->inputOrder, scanFlag, true); doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); if (pRes->info.rows >= pOperator->resultInfo.threshold) { @@ -5024,9 +5024,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; - iaInfo->win = pTaskInfo->window; - iaInfo->order = TSDB_ORDER_ASC; + iaInfo->inputOrder = TSDB_ORDER_ASC; iaInfo->interval = *pInterval; iaInfo->execModel = pTaskInfo->execModel; -- GitLab