diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 52188941b4f42137ea4007a38495118cbd2a9f6f..c0afd7adac39a077e8b6dc91dc05ce513ea7f384 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -301,7 +301,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; // too many time window in query - if (taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && + taosHashGetSize(pSup->pResultRowHashTable) > MAX_INTERVAL_TIME_WINDOW) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -3598,7 +3599,8 @@ void doDestroyExchangeOperatorInfo(void* param) { } static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SNodeListNode* pValNode, - STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { + STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType, + int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pValNode); STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, win.skey); @@ -3635,7 +3637,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval : &((SIntervalAggOperatorInfo*)downstream->info)->interval; - int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; int32_t type = convertFillType(pPhyFillNode->mode); SResultInfo* pResultInfo = &pOperator->resultInfo; @@ -3833,7 +3835,7 @@ static int32_t sortTableGroup(STableListInfo* pTableListInfo, int32_t groupNum) return TDB_CODE_SUCCESS; } -bool groupbyTbname(SNodeList* pGroupList) { +bool groupbyTbname(SNodeList* pGroupList) { bool bytbname = false; if (LIST_LENGTH(pGroupList) > 0) { SNode* p = nodesListGetNode(pGroupList, 0); @@ -3875,7 +3877,7 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, bool assignUid = groupbyTbname(group); int32_t groupNum = 0; - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); @@ -4608,7 +4610,7 @@ void releaseQueryBuf(size_t numOfTables) { } int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { - SExplainExecInfo execInfo = {0}; + SExplainExecInfo execInfo = {0}; SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); pExplainInfo->numOfRows = operatorInfo->resultInfo.totalRows; @@ -4618,7 +4620,8 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf pExplainInfo->verboseInfo = NULL; if (operatorInfo->fpSet.getExplainFn) { - int32_t code = operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen); + int32_t code = + operatorInfo->fpSet.getExplainFn(operatorInfo, &pExplainInfo->verboseInfo, &pExplainInfo->verboseLen); if (code) { qError("%s operator getExplainFn failed, code:%s", GET_TASKID(operatorInfo->pTaskInfo), tstrerror(code)); return code; @@ -4629,7 +4632,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) { code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pExecInfoList); if (code != TSDB_CODE_SUCCESS) { -// taosMemoryFreeClear(*pRes); + // taosMemoryFreeClear(*pRes); return TSDB_CODE_QRY_OUT_OF_MEMORY; } } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 53e25c7e9731b7bbef92b85647c36fadf13da257..9a7c3cf7fb0d3e1b2d1497cdf7ecf6b04ccb30bc 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -56,6 +56,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys goto _error; } + pOperator->pTaskInfo = pTaskInfo; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); @@ -63,7 +65,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; - pInfo->pFinalRes = createOneDataBlock(pResBlock, false); + pInfo->pFinalRes = createOneDataBlock(pResBlock, false); pInfo->pFilterNode = pProjPhyNode->node.pConditions; pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock; @@ -73,7 +75,6 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->mergeDataBlocks = false; } - int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -89,12 +90,11 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfCols); pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols); - pOperator->name = "ProjectOperator"; + pOperator->name = "ProjectOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, NULL, NULL, NULL); @@ -106,7 +106,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys return pOperator; - _error: +_error: pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -156,7 +156,8 @@ static int32_t setInfoForNewGroup(SSDataBlock* pBlock, SLimitInfo* pLimitInfo, S return PROJECT_RETRIEVE_DONE; } -static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock, SOperatorInfo* pOperator) { +static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SSDataBlock* pBlock, + SOperatorInfo* pOperator) { // set current group id pLimitInfo->currentGroupId = groupId; @@ -170,8 +171,7 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS } // check for the limitation in each group - if (pLimitInfo->limit.limit >= 0 && - pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { + if (pLimitInfo->limit.limit >= 0 && pLimitInfo->numOfOutputRows + pBlock->info.rows >= pLimitInfo->limit.limit) { int32_t keepRows = (int32_t)(pLimitInfo->limit.limit - pLimitInfo->numOfOutputRows); blockDataKeepFirstNRows(pBlock, keepRows); if (pLimitInfo->slimit.limit > 0 && pLimitInfo->slimit.limit <= pLimitInfo->numOfOutputGroups) { @@ -222,7 +222,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } SOperatorInfo* downstream = pOperator->pDownstream[0]; - SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; + SLimitInfo* pLimitInfo = &pProjectInfo->limitInfo; if (downstream == NULL) { return doGenerateSourceData(pOperator); @@ -317,7 +317,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { if (pOperator->cost.openCost == 0) { pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; } - + // printDataBlock1(p, "project"); return (p->info.rows > 0) ? p : NULL; } @@ -330,6 +330,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy goto _error; } + pOperator->pTaskInfo = pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; @@ -373,7 +375,6 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL, destroyIndefinitOperatorInfo, NULL, NULL, NULL); @@ -385,7 +386,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy return pOperator; - _error: +_error: taosMemoryFree(pInfo); taosMemoryFree(pOperator); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -593,7 +594,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { pRes->info.rows = 1; doFilter(pProjectInfo->pFilterNode, pRes, NULL); - /*int32_t status = */doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); + /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); pOperator->resultInfo.totalRows += pRes->info.rows; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index e371e6d9cf00709c8de227002303489f83d824b8..16f35b1b0d0fc845c2348914dc3101cdefdea729 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -30,6 +30,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* goto _error; } + pOperator->pTaskInfo = pTaskInfo; SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; @@ -45,7 +46,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* initResultSizeInfo(&pOperator->resultInfo, 1024); pInfo->binfo.pRes = pResBlock; - pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); + pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys); pInfo->pCondition = pSortNode->node.pConditions; pInfo->pColMatchInfo = pColMatchColInfo; initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo); @@ -57,7 +58,6 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pOperator->info = pInfo; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->pTaskInfo = pTaskInfo; // lazy evaluation for the following parameter since the input datablock is not known till now. // pInfo->bufPageSize = rowSize < 1024 ? 1024 * 2 : rowSize * 2; @@ -222,7 +222,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { } // todo add the limit/offset info - if (pInfo->limitInfo.remainOffset > 0) { + if (pInfo->limitInfo.remainOffset > 0) { if (pInfo->limitInfo.remainOffset >= blockDataGetNumOfRows(pBlock)) { pInfo->limitInfo.remainOffset -= pBlock->info.rows; continue; @@ -247,7 +247,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { } } - return blockDataGetNumOfRows(pBlock) > 0? pBlock:NULL; + return blockDataGetNumOfRows(pBlock) > 0 ? pBlock : NULL; } void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { @@ -474,7 +474,7 @@ void destroyGroupSortOperatorInfo(void* param, int32_t numOfOutput) { taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pColMatchInfo); - + taosMemoryFreeClear(param); } @@ -609,8 +609,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData pInfo->groupId = tsortGetGroupId(pTupleHandle); pInfo->prefetchedTuple = NULL; } - } - else { + } else { pTupleHandle = tsortNextTuple(pHandle); pInfo->groupId = 0; } @@ -694,7 +693,7 @@ void destroyMultiwayMergeOperatorInfo(void* param, int32_t numOfOutput) { tsortDestroySortHandle(pInfo->pSortHandle); taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->pColMatchInfo); - + taosMemoryFreeClear(param); } @@ -711,7 +710,7 @@ int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplai } SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numStreams, - SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) { + SMergePhysiNode* pMergePhyNode, SExecTaskInfo* pTaskInfo) { SPhysiNode* pPhyNode = (SPhysiNode*)pMergePhyNode; SMultiwayMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwayMergeOperatorInfo)); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a14f554cf5227dc305f3b9e84f902be80fb7acbf..c8951c30b6c884e25229ad321a6f627cbb32a657 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -928,8 +928,9 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder); - int32_t ret = TSDB_CODE_SUCCESS; + STimeWindow win = + getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->inputOrder); + int32_t ret = TSDB_CODE_SUCCESS; if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) && inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) { ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, @@ -1091,8 +1092,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pSup->pCtx, pBlock, pInfo->inputOrder, scanFlag, true); - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); + hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag, NULL); } @@ -1790,9 +1791,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } + pOperator->pTaskInfo = pTaskInfo; pInfo->win = pTaskInfo->window; - pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; - pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; + pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; pInfo->interval = *pInterval; pInfo->execModel = pTaskInfo->execModel; pInfo->twAggSup = *pTwAggSupp; @@ -1845,7 +1847,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; @@ -1880,6 +1881,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr goto _error; } + pOperator->pTaskInfo = pTaskInfo; pInfo->inputOrder = TSDB_ORDER_ASC; pInfo->interval = *pInterval; pInfo->execModel = OPTR_EXEC_MODEL_STREAM; @@ -1906,7 +1908,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; @@ -2180,7 +2181,6 @@ static void genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp break; } } - } static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { @@ -2457,7 +2457,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t numOfCols = 0; + int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc); @@ -2475,11 +2475,11 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW initResultRowInfo(&pInfo->binfo.resultRowInfo); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - pInfo->binfo.pRes = pResBlock; + pInfo->tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; + pInfo->binfo.pRes = pResBlock; pInfo->winSup.prevTs = INT64_MIN; - pInfo->reptScan = false; - pInfo->pCondition = pSessionNode->window.node.pConditions; + pInfo->reptScan = false; + pInfo->pCondition = pSessionNode->window.node.pConditions; pOperator->name = "SessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; @@ -3028,6 +3028,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } + pOperator->pTaskInfo = pTaskInfo; pInfo->order = TSDB_ORDER_ASC; pInfo->interval = (SInterval){.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, @@ -3114,7 +3115,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; @@ -3155,7 +3155,7 @@ void destroyStateWinInfo(void* ptr) { if (ptr == NULL) { return; } - SStateWindowInfo* pWin = (SStateWindowInfo*) ptr; + SStateWindowInfo* pWin = (SStateWindowInfo*)ptr; taosMemoryFreeClear(pWin->stateKey.pData); } @@ -3246,6 +3246,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh goto _error; } + pOperator->pTaskInfo = pTaskInfo; + initResultSizeInfo(&pOperator->resultInfo, 4096); if (pSessionNode->window.pExprs != NULL) { int32_t numOfScalar = 0; @@ -3308,7 +3310,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); - pOperator->pTaskInfo = pTaskInfo; if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pInfo->gap, pInfo->twAggSup.waterMark, pOperator->operatorType); code = appendDownstream(pOperator, &downstream, 1); @@ -3465,7 +3466,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes assert(pWinInfo->win.skey <= pWinInfo->win.ekey); // too many time window in query int32_t size = taosArrayGetSize(pAggSup->pCurWins); - if (size > MAX_INTERVAL_TIME_WINDOW) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_BATCH && size > MAX_INTERVAL_TIME_WINDOW) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } @@ -3647,8 +3648,8 @@ void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) { taosArrayRemove(pWinInfos, index); } -static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, - SArray* result, FDelete fp) { +static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, int64_t gap, SArray* result, + FDelete fp) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startDatas = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); @@ -4673,7 +4674,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR currTs = tsCols[currPos]; currWin.skey = currTs; currWin.ekey = taosTimeAdd(currWin.skey, iaInfo->interval.interval, iaInfo->interval.intervalUnit, - iaInfo->interval.precision) - 1; + iaInfo->interval.precision) - + 1; startPos = currPos; ret = setTimeWindowOutputBuf(pResultRowInfo, &currWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pSup->pCtx, @@ -4933,8 +4935,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* TSKEY blockStartTs = getStartTsKey(&pBlock->info.window, tsCols); SResultRow* pResult = NULL; - STimeWindow win = - getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, iaInfo->inputOrder); + STimeWindow win = getActiveTimeWindow(iaInfo->aggSup.pResultBuf, pResultRowInfo, blockStartTs, &iaInfo->interval, + iaInfo->inputOrder); int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pExprSup->pCtx, @@ -4975,7 +4977,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* STimeWindow nextWin = win; while (1) { int32_t prevEndPos = forwardRows - 1 + startPos; - startPos = getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder); + startPos = + getNextQualifiedWindow(&iaInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, iaInfo->inputOrder); if (startPos < 0) { break; }