diff --git a/include/common/tmsg.h b/include/common/tmsg.h index de2163817968389f025df96583f61ed1bd2ec710..ca46657473d1f7988c6288cad958e92a1bfbc697 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -469,8 +469,7 @@ typedef struct { int32_t tz; // query client timezone char intervalUnit; char slidingUnit; - char - offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. int8_t precision; int64_t interval; int64_t sliding; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 2bbd16fbd02648d136164eb090088355c2210395..99bab32157027d20dacb6df84ea2397a98d0e2e6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -161,20 +161,8 @@ typedef struct STaskCostInfo { typedef struct SOperatorCostInfo { uint64_t openCost; uint64_t execCost; -// uint64_t totalRows; -// uint64_t totalBytes; } SOperatorCostInfo; -typedef struct { - int64_t vgroupLimit; - int64_t ts; -} SOrderedPrjQueryInfo; - -typedef struct { - char* tags; - SArray* pResult; // SArray -} SInterResult; - // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { @@ -230,7 +218,6 @@ typedef struct STaskAttr { SColumnInfo* tagColList; int32_t numOfFilterCols; int64_t* fillVal; - SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; // SFilterInfo *pFilters; @@ -245,6 +232,7 @@ struct SOperatorInfo; typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length); typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length); + typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup); typedef void (*__optr_close_fn_t)(void* param, int32_t num); @@ -330,11 +318,12 @@ typedef struct SOperatorInfo { SResultInfo resultInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator + __optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_fn_t getNextFn; - __optr_fn_t cleanupFn; + __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. + __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_close_fn_t closeFn; - __optr_open_fn_t _openFn; // DO NOT invoke this function directly - __optr_encode_fn_t encodeResultRow; // + __optr_encode_fn_t encodeResultRow; __optr_decode_fn_t decodeResultRow; } SOperatorInfo; @@ -363,18 +352,18 @@ typedef struct SQInfo { STaskCostInfo summary; } SQInfo; -enum { - DATA_NOT_READY = 0x1, - DATA_READY = 0x2, - DATA_EXHAUSTED = 0x3, -}; +typedef enum { + EX_SOURCE_DATA_NOT_READY = 0x1, + EX_SOURCE_DATA_READY = 0x2, + EX_SOURCE_DATA_EXHAUSTED = 0x3, +} EX_SOURCE_STATUS; typedef struct SSourceDataInfo { struct SExchangeInfo *pEx; - int32_t index; - SRetrieveTableRsp *pRsp; - uint64_t totalRows; - int32_t status; + int32_t index; + SRetrieveTableRsp *pRsp; + uint64_t totalRows; + EX_SOURCE_STATUS status; } SSourceDataInfo; typedef struct SLoadRemoteDataInfo { @@ -383,12 +372,6 @@ typedef struct SLoadRemoteDataInfo { uint64_t totalElapsed; // total elapsed time } SLoadRemoteDataInfo; -enum { - EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, - EX_SOURCE_DATA_EXHAUSTED = 0x3, -}; - typedef struct SExchangeInfo { SArray* pSources; SArray* pSourceDataInfo; @@ -483,17 +466,23 @@ typedef struct SAggSupporter { int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; +typedef enum { + OPTR_EXEC_MODEL_BATCH = 0x1, + OPTR_EXEC_MODEL_STREAM = 0x2, +} OPTR_EXEC_MODEL; + typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; - SGroupResInfo groupResInfo; - SInterval interval; - STimeWindow win; - int32_t precision; - bool timeWindowInterpo; - char **pRow; - SAggSupporter aggSup; - STableQueryInfo *pCurrent; - int32_t order; + SOptrBasicInfo binfo; // basic info + SGroupResInfo groupResInfo; // multiple results build supporter + SInterval interval; // interval info + STimeWindow win; // query time range + bool timeWindowInterpo; // interpolation needed or not + char **pRow; // previous row/tuple of already processed datablock + SAggSupporter aggSup; // aggregate supporter + STableQueryInfo *pCurrent; // current tableQueryInfo struct + int32_t order; // current SSDataBlock scan order + OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] + SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator. } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -695,12 +684,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); -bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); -void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); - -SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows); - void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 186bf4ea66e56dc86eb02e5e21422b77511a68c3..3475df0e87d9bc7fa8bd680adc45535f1e1f2111 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1054,7 +1054,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext, int32_t order = pInfo->order; bool ascQuery = (order == TSDB_ORDER_ASC); - int32_t precision = pInfo->precision; + int32_t precision = pInterval->precision; getNextTimeWindow(pInterval, precision, order, pNext); // next time window is not in current block @@ -1489,15 +1489,20 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc } } -static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, +static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; int32_t numOfOutput = pOperatorInfo->numOfOutput; + SArray* pUpdated = NULL; + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + pUpdated = taosArrayInit(4, sizeof(SResultRowPosition)); + } + int32_t step = 1; - bool ascQuery = true; + bool ascScan = true; int32_t prevIndex = pResultRowInfo->curPos; @@ -1509,10 +1514,10 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } - int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1); - TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery); + int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1); + TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan); - STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->precision, &pInfo->win); + STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win); bool masterScan = true; SResultRow* pResult = NULL; @@ -1523,6 +1528,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + taosArrayPush(pUpdated, &pos); + } + int32_t forwardStep = 0; TSKEY ekey = win.ekey; forwardStep = @@ -1534,8 +1544,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. SResultRow* pRes = getResultRow(pResultRowInfo, j); if (pRes->closed) { - assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && - resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); + assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); continue; } @@ -1548,7 +1557,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); @@ -1589,6 +1597,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; + taosArrayPush(pUpdated, &pos); + } + ekey = nextWin.ekey; // reviseWindowEkey(pQueryAttr, &nextWin); forwardStep = getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); @@ -1601,10 +1614,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } if (pInfo->timeWindowInterpo) { - int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0; + int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0; saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols); } + return pUpdated; // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); } @@ -3590,6 +3604,42 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD } } +void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList, + int32_t* rowCellInfoOffset) { + size_t num = taosArrayGetSize(pUpdateList); + + for (int32_t i = 0; i < num; ++i) { + SResultRowPosition* pPos = taosArrayGet(pUpdateList, i); + + SFilePage* bufPage = getBufPage(pBuf, pPos->pageId); + SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset); + + for (int32_t j = 0; j < numOfOutput; ++j) { + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); + + struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; + if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { + continue; + } + + if (pCtx[j].fpSet.process) { // TODO set the dummy function. + pCtx[j].fpSet.finalize(&pCtx[j]); + } + + if (pRow->numOfRows < pResInfo->numOfRes) { + pRow->numOfRows = pResInfo->numOfRes; + } + } + + releaseBufPage(pBuf, bufPage); + /* + * set the number of output results for group by normal columns, the number of output rows usually is 1 except + * the top and bottom query + */ + // buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); + } +} + static bool hasMainOutput(STaskAttr* pQueryAttr) { for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) { int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]); @@ -5074,12 +5124,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx for (int32_t i = 0; i < totalSources; ++i) { SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i); - if (pDataInfo->status == DATA_EXHAUSTED) { + if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) { completed += 1; continue; } - if (pDataInfo->status != DATA_READY) { + if (pDataInfo->status != EX_SOURCE_DATA_READY) { continue; } @@ -5093,7 +5143,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx " try next", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; continue; } @@ -5111,16 +5161,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 - ", totalBytes:%" PRIu64, + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } - if (pDataInfo->status != DATA_EXHAUSTED) { - pDataInfo->status = DATA_NOT_READY; + if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) { + pDataInfo->status = EX_SOURCE_DATA_NOT_READY; code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -5223,7 +5272,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; continue; } @@ -5240,7 +5289,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); - pDataInfo->status = DATA_EXHAUSTED; + pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 @@ -6798,37 +6847,6 @@ static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) { return pBlock; } -static SSDataBlock* doFilter(void* param, bool* newgroup) { - SOperatorInfo* pOperator = (SOperatorInfo*)param; - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SFilterOperatorInfo* pCondInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; - - while (1) { - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - break; - } - - doSetFilterColumnInfo(pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock); - assert(pRuntimeEnv->pTsBuf == NULL); - filterRowsInDataBlock(pRuntimeEnv, pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock, true); - - if (pBlock->info.rows > 0) { - return pBlock; - } - } - - doSetOperatorCompleted(pOperator); - return NULL; -} - static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { if (OPTR_IS_OPENED(pOperator)) { return TSDB_CODE_SUCCESS; @@ -6837,8 +6855,8 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { STableIntervalOperatorInfo* pInfo = pOperator->info; int32_t order = TSDB_ORDER_ASC; - // STimeWindow win = pQueryAttr->window; - bool newgroup = false; + // STimeWindow win = {0}; + bool newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -6851,7 +6869,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); @@ -6890,7 +6907,60 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } -static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) { + STableIntervalOperatorInfo* pInfo = pOperator->info; + int32_t order = TSDB_ORDER_ASC; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + if (pOperator->status == OP_RES_TO_RETURN) { + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + pOperator->status = OP_EXEC_DONE; + } + return pInfo->binfo.pRes; + } + + // STimeWindow win = {0}; + bool newgroup = false; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + SArray* pUpdated = NULL; + + while (1) { + publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); + publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); + + if (pBlock == NULL) { + break; + } + + // The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the caller. + // Note that all the time window are not close till now. + + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); + } + + finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, + pInfo->binfo.rowCellInfoOffset); + + ASSERT(pInfo->binfo.pRes->info.rows > 0); + pOperator->status = OP_RES_TO_RETURN; + + return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; +} + +static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -7773,11 +7843,13 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* } pInfo->order = TSDB_ORDER_ASC; - pInfo->precision = TSDB_TIME_PRECISION_MILLI; pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - pInfo->win.skey = INT64_MIN; - pInfo->win.ekey = INT64_MAX; + + pInfo->execModel = OPTR_EXEC_MODEL_BATCH; + + pInfo->win.skey = INT64_MIN; + pInfo->win.ekey = INT64_MAX; int32_t numOfRows = 4096; int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); @@ -8748,7 +8820,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .intervalUnit = pIntervalPhyNode->intervalUnit, .slidingUnit = pIntervalPhyNode->slidingUnit, .offset = pIntervalPhyNode->offset, - .precision = TSDB_TIME_PRECISION_MILLI,}; + .precision = TSDB_TIME_PRECISION_MILLI}; return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {