diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index a8a95b513afdb2ff33e54f141cdf5a39da55dd35..8110df30902951619b1a3ecf2638c0e74b6c713c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -809,6 +809,11 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo, bool isStream); + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + SExecTaskInfo* pTaskInfo); + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -907,6 +912,9 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor); bool isSmaStream(int8_t triggerType); int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); +int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, const int32_t* rowCellOffset, + SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); #ifdef __cplusplus } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f50c7cdae57438d5fa4d030e4c8e7fe98cd083d5..726be6d0a2bf337ac5876ebeea07113f2ef3de65 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1956,6 +1956,57 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_ } } +int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, + SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, int32_t numOfExprs, + const int32_t* rowCellOffset, SSDataBlock* pBlock, + SExecTaskInfo* pTaskInfo) { + SFilePage* page = getBufPage(pBuf, resultRowPosition->pageId); + SResultRow* pRow = (SResultRow*)((char*)page + resultRowPosition->offset); + + doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); + if (pRow->numOfRows == 0) { + releaseBufPage(pBuf, page); + return 0; + } + + while (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + int32_t code = blockDataEnsureCapacity(pBlock, pBlock->info.capacity * 1.25); + if (TAOS_FAILED(code)) { + releaseBufPage(pBuf, page); + qError("%s ensure result data capacity failed, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + longjmp(pTaskInfo->env, code); + } + } + + for (int32_t j = 0; j < numOfExprs; ++j) { + int32_t slotId = pExprInfo[j].base.resSchema.slotId; + + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset); + if (pCtx[j].fpSet.finalize) { + int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); + if (TAOS_FAILED(code)) { + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + longjmp(pTaskInfo->env, code); + } + } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { + // do nothing, todo refactor + } else { + // expand the result into multiple rows. E.g., _wstartts, top(k, 20) + // the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows. + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId); + char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo); + for (int32_t k = 0; k < pRow->numOfRows; ++k) { + colDataAppend(pColInfoData, pBlock->info.rows + k, in, pCtx[j].resultInfo->isNullRes); + } + } + } + + releaseBufPage(pBuf, page); + pBlock->info.rows += pRow->numOfRows; + + return 0; +} + int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { @@ -4689,6 +4740,21 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo, isStream); + } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { + SMergeIntervalPhysiNode * pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; + + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; + + int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; + pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { qDebug("[******]create Semi"); int32_t children = 0; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d981b5034c751b159339da11fefe3abe9d10d0fb..c1c504a400bb0f29b5a03e3777fd3ce5b4a57b8c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1881,8 +1881,8 @@ _error: return NULL; } -static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, - int32_t tableGroupId, SArray* pUpdated) { +static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId, + SArray* pUpdated) { SStreamFinalIntervalOperatorInfo* pInfo = (SStreamFinalIntervalOperatorInfo*)pOperatorInfo->info; SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -1897,7 +1897,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; } else { - return ; + return; } int32_t startPos = ascScan ? 0 : (pSDataBlock->info.rows - 1); @@ -1914,13 +1914,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc pos->groupId = tableGroupId; pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; *(int64_t*)pos->key = pResult->win.skey; - forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, - nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); + forwardRows = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, + TSDB_ORDER_ASC); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdated) { - saveResult(pResult, tableGroupId, pUpdated); + saveResult(pResult, tableGroupId, pUpdated); } // window start(end) key interpolation - // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows); + // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, + // forwardRows); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); @@ -2049,10 +2050,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SIntervalAggOperatorInfo* pChildInfo = pChildOp->info; - doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, - pChildInfo->primaryTsIndex, pChildOp->numOfExprs, pBlock, NULL); - rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, - pOperator->numOfExprs, pOperator->pTaskInfo); + doClearWindows(&pChildInfo->aggSup, &pChildInfo->binfo, &pChildInfo->interval, pChildInfo->primaryTsIndex, + pChildOp->numOfExprs, pBlock, NULL); + rebuildIntervalWindow(pInfo, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->numOfExprs, + pOperator->pTaskInfo); taosArrayDestroy(pUpWins); continue; } @@ -2062,7 +2063,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { break; } if (isFinalInterval(pInfo)) { - int32_t chIndex = getChildIndex(pBlock); + int32_t chIndex = getChildIndex(pBlock); int32_t size = taosArrayGetSize(pInfo->pChildren); // if chIndex + 1 - size > 0, add new child for (int32_t i = 0; i < chIndex + 1 - size; i++) { @@ -2072,7 +2073,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } taosArrayPush(pInfo->pChildren, &pChildOp); } - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; setInputDataBlock(pChildOp, pChInfo->binfo.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true); doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL); @@ -2080,12 +2081,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doHashInterval(pOperator, pBlock, pBlock->info.groupId, pUpdated); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); } - + if (isFinalInterval(pInfo)) { - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, - &pInfo->interval, pClosed); - finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, - pInfo->binfo.rowCellInfoOffset); + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pClosed); + finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed, pInfo->binfo.rowCellInfoOffset); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { taosArrayAddAll(pUpdated, pClosed); } @@ -2109,31 +2108,32 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, - SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { - SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, + SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamFinalIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFinalIntervalOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = (SInterval) {.interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = - ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; - pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; + pInfo->twAggSup = (STimeWindowAggSupp){ + .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, - .winMap = NULL, }; + .winMap = NULL, + }; pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(pOperator, 4096); - int32_t numOfCols = 0; - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); @@ -2160,7 +2160,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, if (!isFinalInterval(pInfo)) { pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; } - pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc);\ + pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); pInfo->pUpdateRes->info.type = STREAM_REPROCESS; blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pInfo->pPhyNode = nodesCloneNode(pPhyNode); @@ -2174,9 +2174,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->numOfExprs = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, - destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, - NULL); + pOperator->fpSet = + createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, + aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2216,8 +2216,7 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { } } -int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, - int32_t numOfCols, SSDataBlock* pResultBlock) { +int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock) { pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; for (int32_t i = 0; i < numOfCols; ++i) { @@ -3195,3 +3194,281 @@ _error: pTaskInfo->code = code; return NULL; } + +typedef struct SMergeIntervalAggOperatorInfo { + SIntervalAggOperatorInfo intervalAggOperatorInfo; + + SHashObj* groupIntervalHash; + bool hasGroupId; + uint64_t groupId; + SSDataBlock* prefetchedBlock; + bool inputBlocksFinished; +} SMergeIntervalAggOperatorInfo; + +void destroyMergeIntervalOperatorInfo(void* param, int32_t numOfOutput) { + SMergeIntervalAggOperatorInfo* miaInfo = (SMergeIntervalAggOperatorInfo*)param; + taosHashCleanup(miaInfo->groupIntervalHash); + destroyIntervalOperatorInfo(&miaInfo->intervalAggOperatorInfo, numOfOutput); +} + +static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t tableGroupId, SSDataBlock* pResultBlock, + STimeWindow* newWin) { + SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + + STimeWindow* prevWin = taosHashGet(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); + if (prevWin == NULL) { + taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); + return 0; + } + + if (newWin == NULL || (ascScan && newWin->skey > prevWin->ekey || (!ascScan) && newWin->skey < prevWin->ekey) ) { + SET_RES_WINDOW_KEY(iaInfo->aggSup.keyBuf, &prevWin->skey, TSDB_KEYSIZE, tableGroupId); + SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, + GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + ASSERT(p1 != NULL); + + finalizeResultRowIntoResultDataBlock(iaInfo->aggSup.pResultBuf, p1, iaInfo->binfo.pCtx, pOperatorInfo->pExpr, + pOperatorInfo->numOfExprs, iaInfo->binfo.rowCellInfoOffset, pResultBlock, + pTaskInfo); + taosHashRemove(iaInfo->aggSup.pResultRowHashTable, iaInfo->aggSup.keyBuf, GET_RES_WINDOW_KEY_LEN(TSDB_KEYSIZE)); + if (newWin == NULL) { + taosHashRemove(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId)); + } else { + taosHashPut(miaInfo->groupIntervalHash, &tableGroupId, sizeof(tableGroupId), newWin, sizeof(STimeWindow)); + } + } + + return 0; +} + +static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, + int32_t scanFlag, SSDataBlock* pResultBlock) { + SMergeIntervalAggOperatorInfo* miaInfo = pOperatorInfo->info; + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + + SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; + + int32_t startPos = 0; + int32_t numOfOutput = pOperatorInfo->numOfExprs; + int64_t* tsCols = extractTsCol(pBlock, iaInfo); + uint64_t tableGroupId = pBlock->info.groupId; + bool ascScan = (iaInfo->order == TSDB_ORDER_ASC); + TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); + SResultRow* pResult = NULL; + + STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, + iaInfo->interval.precision, &iaInfo->win); + + int32_t ret = + setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, iaInfo->binfo.pCtx, + numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, pTaskInfo); + if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + TSKEY ekey = ascScan ? win.ekey : win.skey; + int32_t forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + ASSERT(forwardRows > 0); + + // prev time window not interpolation yet. + if (iaInfo->timeWindowInterpo) { + SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult); + doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos); + + // restore current time window + ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, &iaInfo->aggSup, + pTaskInfo); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + // window start key interpolation + doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &win, startPos, forwardRows); + } + + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); + doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols, + pBlock->info.rows, numOfOutput, iaInfo->order); + doCloseWindow(pResultRowInfo, iaInfo, pResult); + + // output previous interval results after this interval (&win) is closed + outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &win); + + STimeWindow nextWin = win; + while (1) { + int32_t prevEndPos = forwardRows - 1 + startPos; + startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->order); + if (startPos < 0) { + break; + } + + // null data, failed to allocate more memory buffer + int32_t code = setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, + iaInfo->binfo.pCtx, numOfOutput, iaInfo->binfo.rowCellInfoOffset, + &iaInfo->aggSup, pTaskInfo); + if (code != TSDB_CODE_SUCCESS || pResult == NULL) { + longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ekey = ascScan ? nextWin.ekey : nextWin.skey; + forwardRows = + getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, iaInfo->order); + + // window start(end) key interpolation + doWindowBorderInterpolation(iaInfo, pBlock, numOfOutput, iaInfo->binfo.pCtx, pResult, &nextWin, startPos, + forwardRows); + + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); + doApplyFunctions(pTaskInfo, iaInfo->binfo.pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + tsCols, pBlock->info.rows, numOfOutput, iaInfo->order); + doCloseWindow(pResultRowInfo, iaInfo, pResult); + + // output previous interval results after this interval (&nextWin) is closed + outputPrevIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, &nextWin); + } + + if (iaInfo->timeWindowInterpo) { + saveDataBlockLastRow(iaInfo->pPrevValues, pBlock, iaInfo->pInterpCols); + } +} + +static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SMergeIntervalAggOperatorInfo* miaInfo = pOperator->info; + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SSDataBlock* pRes = iaInfo->binfo.pRes; + blockDataCleanup(pRes); + blockDataEnsureCapacity(pRes, pOperator->resultInfo.capacity); + + if (!miaInfo->inputBlocksFinished) { + SOperatorInfo* downstream = pOperator->pDownstream[0]; + int32_t scanFlag = MAIN_SCAN; + while (1) { + SSDataBlock* pBlock = NULL; + if (miaInfo->prefetchedBlock == NULL) { + pBlock = downstream->fpSet.getNextFn(downstream); + } else { + pBlock = miaInfo->prefetchedBlock; + miaInfo->groupId = pBlock->info.groupId; + } + + if (pBlock == NULL) { + miaInfo->inputBlocksFinished = true; + break; + } + + if (!miaInfo->hasGroupId) { + miaInfo->hasGroupId = true; + miaInfo->groupId = pBlock->info.groupId; + } else if (miaInfo->groupId != pBlock->info.groupId) { + miaInfo->prefetchedBlock = pBlock; + break; + } + + getTableScanInfo(pOperator, &iaInfo->order, &scanFlag); + setInputDataBlock(pOperator, iaInfo->binfo.pCtx, pBlock, iaInfo->order, scanFlag, true); + STableQueryInfo* pTableQueryInfo = iaInfo->pCurrent; + + setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); + doMergeIntervalAggImpl(pOperator, &iaInfo->binfo.resultRowInfo, pBlock, scanFlag, pRes); + + if (pRes->info.rows >= pOperator->resultInfo.threshold) { + break; + } + } + + pRes->info.groupId = miaInfo->groupId; + } else { + void* p = taosHashIterate(miaInfo->groupIntervalHash, NULL); + if (p != NULL) { + size_t len = 0; + uint64_t* pKey = taosHashGetKey(p, &len); + outputPrevIntervalResult(pOperator, *pKey, pRes, NULL); + } + } + + if (pRes->info.rows == 0) { + doSetOperatorCompleted(pOperator); + } + + size_t rows = pRes->info.rows; + pOperator->resultInfo.totalRows += rows; + return (rows == 0) ? NULL : pRes; +} + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, + SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, + SExecTaskInfo* pTaskInfo) { + SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (miaInfo == NULL || pOperator == NULL) { + goto _error; + } + + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; + + iaInfo->win = pTaskInfo->window; + iaInfo->order = TSDB_ORDER_ASC; + iaInfo->interval = *pInterval; + + iaInfo->execModel = pTaskInfo->execModel; + + iaInfo->primaryTsIndex = primaryTsSlotId; + miaInfo->groupIntervalHash = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + initResultSizeInfo(pOperator, 4096); + + int32_t code = + initAggInfo(&iaInfo->binfo, &iaInfo->aggSup, pExprInfo, numOfCols, pResBlock, keyBufSize, pTaskInfo->id.str); + + initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); + + iaInfo->timeWindowInterpo = timeWindowinterpNeeded(iaInfo->binfo.pCtx, numOfCols, iaInfo); + if (iaInfo->timeWindowInterpo) { + iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + } + + // iaInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS /* || iaInfo->pTableQueryInfo == NULL*/) { + goto _error; + } + + initResultRowInfo(&iaInfo->binfo.resultRowInfo, (int32_t)1); + + pOperator->name = "TimeMergeIntervalAggOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->numOfExprs = numOfCols; + pOperator->info = miaInfo; + + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, + destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); + + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + return pOperator; + +_error: + destroyMergeIntervalOperatorInfo(miaInfo, numOfCols); + taosMemoryFreeClear(miaInfo); + taosMemoryFreeClear(pOperator); + pTaskInfo->code = code; + return NULL; +}