diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index c028c4a1b99663abaa80b4c8ab4a8a4b4d1b1f48..8871bae4e9012e0bb0af1dcf4081879948bdb734 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -93,6 +93,8 @@ typedef struct SWindowLogicNode { int64_t interval; int64_t offset; int64_t sliding; + int8_t intervalUnit; + int8_t slidingUnit; SFillNode* pFill; } SWindowLogicNode; @@ -203,7 +205,7 @@ typedef struct SDownstreamSourceNode { typedef struct SExchangePhysiNode { SPhysiNode node; - int32_t srcGroupId; // group id of datasource suplans + int32_t srcGroupId; // group id of datasource suplans SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode } SExchangePhysiNode; @@ -211,9 +213,11 @@ typedef struct SIntervalPhysiNode { SPhysiNode node; SNodeList* pExprs; // these are expression list of parameter expression of function SNodeList* pFuncs; - int64_t interval; - int64_t offset; - int64_t sliding; + int64_t interval; + int64_t offset; + int64_t sliding; + int8_t intervalUnit; + int8_t slidingUnit; SFillNode* pFill; } SIntervalPhysiNode; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 99a42d6ad6b5201fc2a3bbd2059a5bc50b66f915..351903dfdcf9f41b3fc15470103882eba2877ca6 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -275,36 +275,36 @@ typedef struct SExecTaskInfo { } SExecTaskInfo; typedef struct STaskRuntimeEnv { - jmp_buf env; - STaskAttr* pQueryAttr; - uint32_t status; // query status - void* qinfo; - uint8_t scanFlag; // denotes reversed scan of data or not - void* pTsdbReadHandle; - - int32_t prevGroupId; // previous executed group id - bool enableGroupData; - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer + jmp_buf env; + STaskAttr* pQueryAttr; + uint32_t status; // query status + void* qinfo; + uint8_t scanFlag; // denotes reversed scan of data or not + void* pTsdbReadHandle; + + int32_t prevGroupId; // previous executed group id + bool enableGroupData; + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - char** prevRow; + char** prevRow; SResultRowPool* pool; - SArray* prevResult; // intermediate result, SArray - STSBuf* pTsBuf; // timestamp filter list - STSCursor cur; + SArray* prevResult; // intermediate result, SArray + STSBuf* pTsBuf; // timestamp filter list + STSCursor cur; - char* tagVal; // tag value of current data block + char* tagVal; // tag value of current data block struct SScalarFunctionSupport* scalarSup; SSDataBlock* outputBuf; STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure struct SOperatorInfo* proot; - SGroupResInfo groupResInfo; - int64_t currentOffset; // dynamic offset value + SGroupResInfo groupResInfo; + int64_t currentOffset; // dynamic offset value STableQueryInfo* current; SRspResultInfo resultInfo; @@ -365,28 +365,6 @@ typedef struct SQInfo { STaskCostInfo summary; } SQInfo; -typedef struct STaskParam { - char* sql; - char* tagCond; - char* colCond; - char* tbnameCond; - char* prevResult; - SArray* pTableIdList; - SExprBasicInfo** pExpr; - SExprBasicInfo** pSecExpr; - SExprInfo* pExprs; - SExprInfo* pSecExprs; - - SFilterInfo* pFilters; - - SColIndex* pGroupColIndex; - SColumnInfo* pTagColumnInfo; - SGroupbyExpr* pGroupbyExpr; - int32_t tableScanOperator; - SArray* pOperator; - struct SUdfInfo* pUdfInfo; -} STaskParam; - enum { EX_SOURCE_DATA_NOT_READY = 0x1, EX_SOURCE_DATA_READY = 0x2, @@ -472,78 +450,76 @@ typedef struct SSysTableScanInfo { } SSysTableScanInfo; typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - int32_t* rowCellInfoOffset; // offset value for each row result cell info - SqlFunctionCtx* pCtx; - SSDataBlock* pRes; - int32_t capacity; + SResultRowInfo resultRowInfo; + int32_t* rowCellInfoOffset; // offset value for each row result cell info + SqlFunctionCtx* pCtx; + SSDataBlock* pRes; + int32_t capacity; } SOptrBasicInfo; //TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset typedef struct SAggSupporter { - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer - SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer + SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. + int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; typedef struct STableIntervalOperatorInfo { - SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file - SGroupResInfo groupResInfo; - SInterval interval; - STimeWindow win; - int32_t precision; - bool timeWindowInterpo; - char **pRow; - SAggSupporter aggSup; - STableQueryInfo *pCurrent; - int32_t order; + SOptrBasicInfo binfo; + SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file + SGroupResInfo groupResInfo; + SInterval interval; + STimeWindow win; + int32_t precision; + bool timeWindowInterpo; + char **pRow; + SAggSupporter aggSup; + STableQueryInfo *pCurrent; + int32_t order; } STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file - SAggSupporter aggSup; - STableQueryInfo *current; - uint32_t groupId; - SGroupResInfo groupResInfo; - STableQueryInfo *pTableQueryInfo; + SOptrBasicInfo binfo; + SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file + SAggSupporter aggSup; + STableQueryInfo *current; + uint32_t groupId; + SGroupResInfo groupResInfo; + STableQueryInfo *pTableQueryInfo; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { - SOptrBasicInfo binfo; - SSDataBlock *existDataBlock; - int32_t threshold; - bool hasVarCol; + SOptrBasicInfo binfo; + SSDataBlock *existDataBlock; + int32_t threshold; + bool hasVarCol; } SProjectOperatorInfo; typedef struct SLimitOperatorInfo { - SLimit limit; - int64_t currentOffset; - int64_t currentRows; + SLimit limit; + int64_t currentOffset; + int64_t currentRows; } SLimitOperatorInfo; typedef struct SSLimitOperatorInfo { - int64_t groupTotal; - int64_t currentGroupOffset; - - int64_t rowsTotal; - int64_t currentOffset; - SLimit limit; - SLimit slimit; - - char** prevRow; - SArray* orderColumnList; - bool hasPrev; - bool ignoreCurrentGroup; - bool multigroupResult; - SSDataBlock* pRes; // result buffer - SSDataBlock* pPrevBlock; - int64_t capacity; - int64_t threshold; + int64_t groupTotal; + int64_t currentGroupOffset; + int64_t rowsTotal; + int64_t currentOffset; + SLimit limit; + SLimit slimit; + char** prevRow; + SArray* orderColumnList; + bool hasPrev; + bool ignoreCurrentGroup; + bool multigroupResult; + SSDataBlock* pRes; // result buffer + SSDataBlock* pPrevBlock; + int64_t capacity; + int64_t threshold; } SSLimitOperatorInfo; typedef struct SFilterOperatorInfo { @@ -586,23 +562,6 @@ typedef struct SStateWindowOperatorInfo { bool reptScan; } SStateWindowOperatorInfo; -typedef struct SDistinctDataInfo { - int32_t index; - int32_t type; - int32_t bytes; -} SDistinctDataInfo; - -typedef struct SDistinctOperatorInfo { - SHashObj* pSet; - SSDataBlock* pRes; - bool recordNullVal; // has already record the null value, no need to try again - int64_t threshold; - int64_t outputCapacity; - int32_t totalBytes; - char* buf; - SArray* pDistinctDataInfo; -} SDistinctOperatorInfo; - typedef struct SSortedMergeOperatorInfo { SOptrBasicInfo binfo; bool hasVarCol; @@ -628,24 +587,40 @@ typedef struct SSortedMergeOperatorInfo { } SSortedMergeOperatorInfo; typedef struct SOrderOperatorInfo { - uint32_t sortBufSize; // max buffer size for in-memory sort - SSDataBlock *pDataBlock; - bool hasVarCol; // has variable length column, such as binary/varchar/nchar - SArray *orderInfo; - bool nullFirst; - SSortHandle *pSortHandle; - - int32_t bufPageSize; - int32_t numOfRowsInRes; + uint32_t sortBufSize; // max buffer size for in-memory sort + SSDataBlock *pDataBlock; + bool hasVarCol; // has variable length column, such as binary/varchar/nchar + SArray *orderInfo; + bool nullFirst; + SSortHandle *pSortHandle; + int32_t bufPageSize; + int32_t numOfRowsInRes; // TODO extact struct - int64_t startTs; // sort start time - uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed; // total elapsed time + int64_t startTs; // sort start time + uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SOrderOperatorInfo; +typedef struct SDistinctDataInfo { + int32_t index; + int32_t type; + int32_t bytes; +} SDistinctDataInfo; + +typedef struct SDistinctOperatorInfo { + SHashObj* pSet; + SSDataBlock* pRes; + bool recordNullVal; // has already record the null value, no need to try again + int64_t threshold; + int64_t outputCapacity; + int32_t totalBytes; + char* buf; + SArray* pDistinctDataInfo; +} SDistinctOperatorInfo; + SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); @@ -659,7 +634,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, + const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); @@ -709,9 +685,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); -int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, - int32_t prevResultLen, void* merger); - int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 356375c7caa9d6775a4dc709db62512d0bcf9a9c..9d3a80d7b4ebe2211cd2ec10d3c7924f1d3765b1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -7128,15 +7128,19 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { destroyResultRowPool(pAggSup->pool); } -static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, const STableGroupInfo* pTableGroupInfo) { - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = pResultBlock; - pInfo->binfo.capacity = numOfRows; - - doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); - pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); - if (pInfo->pTableQueryInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; +static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, + int32_t numOfRows, SSDataBlock* pResultBlock) { + pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); + pBasicInfo->pRes = pResultBlock; + pBasicInfo->capacity = numOfRows; + + doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols); +} + +static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { + STableQueryInfo* pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); + if (pTableQueryInfo == NULL) { + return NULL; } int32_t index = 0; @@ -7145,7 +7149,7 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_ for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) { STableKeyInfo* pk = taosArrayGet(pa, j); - STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index++]; + STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++]; pTQueryInfo->uid = pk->uid; pTQueryInfo->lastKey = pk->lastKey; pTQueryInfo->groupIndex = i; @@ -7153,9 +7157,8 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_ } STimeWindow win = {0, INT64_MAX}; - createTableQueryInfo(pInfo->pTableQueryInfo, false, win); - - return TSDB_CODE_SUCCESS; + createTableQueryInfo(pTableQueryInfo, false, win); + return pTableQueryInfo; } SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, @@ -7168,8 +7171,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* //(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); int32_t numOfRows = 1; - int32_t code = initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS) { + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock); + pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { goto _error; } @@ -7308,7 +7312,11 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo)); int32_t numOfRows = 1; - initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResBlock, pTableGroupInfo); + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock); + pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { + goto _error; + } size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList); initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup); @@ -7325,9 +7333,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI pOperator->getNextFn = doMultiTableAggregate; pOperator->closeFn = destroyAggOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } return pOperator; + +_error: + return NULL; } SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { @@ -7433,24 +7447,35 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfD return NULL; } -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, +const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); + if (pInfo == NULL || pOperator == NULL) { + goto _error; + } pInfo->order = TSDB_ORDER_ASC; pInfo->precision = TSDB_TIME_PRECISION_MICRO; pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); + int32_t numOfRows = 1; + int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock); +// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); + if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) { + goto _error; + } code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/"); - pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1); pOperator->name = "TimeIntervalAggOperator"; - // pOperator->operatorType = OP_TimeWindow; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->pExpr = pExprInfo; @@ -7463,6 +7488,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->closeFn = destroyIntervalOperatorInfo; code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + return pOperator; _error: @@ -7686,15 +7715,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) { SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - - pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); - pInfo->slimit = pQueryAttr->slimit; - pInfo->limit = pQueryAttr->limit; - pInfo->capacity = pRuntimeEnv->resultInfo.capacity; - pInfo->threshold = (int64_t)(pInfo->capacity * 0.8); - pInfo->currentOffset = pQueryAttr->limit.offset; - pInfo->currentGroupOffset = pQueryAttr->slimit.offset; +// pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); +// pInfo->slimit = pQueryAttr->slimit; +// pInfo->limit = pQueryAttr->limit; +// pInfo->capacity = pRuntimeEnv->resultInfo.capacity; +// pInfo->threshold = (int64_t)(pInfo->capacity * 0.8); +// pInfo->currentOffset = pQueryAttr->limit.offset; +// pInfo->currentGroupOffset = pQueryAttr->slimit.offset; pInfo->multigroupResult= multigroupResult; // TODO refactor @@ -8269,6 +8296,19 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); } + } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) { + size_t size = LIST_LENGTH(pPhyNode->pChildren); + assert(size == 1); + + for (int32_t i = 0; i < size; ++i) { + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(((SIntervalPhysiNode*)pPhyNode)->pFuncs, &num); + SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, NULL, pTableGroupInfo, pTaskInfo); + } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); assert(size == 1); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57ebc2c4b6fb3956a6a80b6e688aae8751406c19..4c0acb9412c5b7aba6851304ac4d607271dd8be1 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -580,6 +580,8 @@ static const char* jkIntervalPhysiPlanFuncs = "Funcs"; static const char* jkIntervalPhysiPlanInterval = "Interval"; static const char* jkIntervalPhysiPlanOffset = "Offset"; static const char* jkIntervalPhysiPlanSliding = "Sliding"; +static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit"; +static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit"; static const char* jkIntervalPhysiPlanFill = "Fill"; static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { @@ -601,6 +603,12 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanIntervalUnit, pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSlidingUnit, pNode->slidingUnit); + } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill); } @@ -627,6 +635,12 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkIntervalPhysiPlanIntervalUnit, &pNode->intervalUnit); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetTinyIntValue(pJson, jkIntervalPhysiPlanSlidingUnit, &pNode->slidingUnit); + } if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill); } @@ -1644,7 +1658,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToSubplan(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN: return jsonToPlan(pJson, pObj); + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: + return jsonToPhysiIntervalNode(pJson, pObj); default: + assert(0); break; } nodesWarn("jsonToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj))); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 7b7dd26df1a985463f915408f8984d00f245653f..0affd93f4de0d8dab9ed4eef2251dc4088f85718 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -480,6 +480,9 @@ static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p pInterval->interval = pWindowLogicNode->interval; pInterval->offset = pWindowLogicNode->offset; pInterval->sliding = pWindowLogicNode->sliding; + pInterval->intervalUnit = pWindowLogicNode->intervalUnit; + pInterval->slidingUnit = pWindowLogicNode->slidingUnit; + pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill); SNodeList* pPrecalcExprs = NULL;