提交 a853c6dd 编写于 作者: H Haojun Liao

[td-13039] refactor.

上级 dc27021f
......@@ -93,6 +93,8 @@ typedef struct SWindowLogicNode {
int64_t interval;
int64_t offset;
int64_t sliding;
int8_t intervalUnit;
int8_t slidingUnit;
SFillNode* pFill;
} SWindowLogicNode;
......@@ -203,7 +205,7 @@ typedef struct SDownstreamSourceNode {
typedef struct SExchangePhysiNode {
SPhysiNode node;
int32_t srcGroupId; // group id of datasource suplans
int32_t srcGroupId; // group id of datasource suplans
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhysiNode;
......@@ -211,9 +213,11 @@ typedef struct SIntervalPhysiNode {
SPhysiNode node;
SNodeList* pExprs; // these are expression list of parameter expression of function
SNodeList* pFuncs;
int64_t interval;
int64_t offset;
int64_t sliding;
int64_t interval;
int64_t offset;
int64_t sliding;
int8_t intervalUnit;
int8_t slidingUnit;
SFillNode* pFill;
} SIntervalPhysiNode;
......
......@@ -275,36 +275,36 @@ typedef struct SExecTaskInfo {
} SExecTaskInfo;
typedef struct STaskRuntimeEnv {
jmp_buf env;
STaskAttr* pQueryAttr;
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id
bool enableGroupData;
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
jmp_buf env;
STaskAttr* pQueryAttr;
uint32_t status; // query status
void* qinfo;
uint8_t scanFlag; // denotes reversed scan of data or not
void* pTsdbReadHandle;
int32_t prevGroupId; // previous executed group id
bool enableGroupData;
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
// The window result objects pool, all the resultRow Objects are allocated and managed by this object.
char** prevRow;
char** prevRow;
SResultRowPool* pool;
SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list
STSCursor cur;
SArray* prevResult; // intermediate result, SArray<SInterResult>
STSBuf* pTsBuf; // timestamp filter list
STSCursor cur;
char* tagVal; // tag value of current data block
char* tagVal; // tag value of current data block
struct SScalarFunctionSupport* scalarSup;
SSDataBlock* outputBuf;
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
struct SOperatorInfo* proot;
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
SGroupResInfo groupResInfo;
int64_t currentOffset; // dynamic offset value
STableQueryInfo* current;
SRspResultInfo resultInfo;
......@@ -365,28 +365,6 @@ typedef struct SQInfo {
STaskCostInfo summary;
} SQInfo;
typedef struct STaskParam {
char* sql;
char* tagCond;
char* colCond;
char* tbnameCond;
char* prevResult;
SArray* pTableIdList;
SExprBasicInfo** pExpr;
SExprBasicInfo** pSecExpr;
SExprInfo* pExprs;
SExprInfo* pSecExprs;
SFilterInfo* pFilters;
SColIndex* pGroupColIndex;
SColumnInfo* pTagColumnInfo;
SGroupbyExpr* pGroupbyExpr;
int32_t tableScanOperator;
SArray* pOperator;
struct SUdfInfo* pUdfInfo;
} STaskParam;
enum {
EX_SOURCE_DATA_NOT_READY = 0x1,
EX_SOURCE_DATA_READY = 0x2,
......@@ -472,78 +450,76 @@ typedef struct SSysTableScanInfo {
} SSysTableScanInfo;
typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx;
SSDataBlock* pRes;
int32_t capacity;
SResultRowInfo resultRowInfo;
int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx;
SSDataBlock* pRes;
int32_t capacity;
} SOptrBasicInfo;
//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
typedef struct SAggSupporter {
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
} SAggSupporter;
typedef struct STableIntervalOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
int32_t precision;
bool timeWindowInterpo;
char **pRow;
SAggSupporter aggSup;
STableQueryInfo *pCurrent;
int32_t order;
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SGroupResInfo groupResInfo;
SInterval interval;
STimeWindow win;
int32_t precision;
bool timeWindowInterpo;
char **pRow;
SAggSupporter aggSup;
STableQueryInfo *pCurrent;
int32_t order;
} STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SAggSupporter aggSup;
STableQueryInfo *current;
uint32_t groupId;
SGroupResInfo groupResInfo;
STableQueryInfo *pTableQueryInfo;
SOptrBasicInfo binfo;
SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SAggSupporter aggSup;
STableQueryInfo *current;
uint32_t groupId;
SGroupResInfo groupResInfo;
STableQueryInfo *pTableQueryInfo;
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo;
SSDataBlock *existDataBlock;
int32_t threshold;
bool hasVarCol;
SOptrBasicInfo binfo;
SSDataBlock *existDataBlock;
int32_t threshold;
bool hasVarCol;
} SProjectOperatorInfo;
typedef struct SLimitOperatorInfo {
SLimit limit;
int64_t currentOffset;
int64_t currentRows;
SLimit limit;
int64_t currentOffset;
int64_t currentRows;
} SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo {
int64_t groupTotal;
int64_t currentGroupOffset;
int64_t rowsTotal;
int64_t currentOffset;
SLimit limit;
SLimit slimit;
char** prevRow;
SArray* orderColumnList;
bool hasPrev;
bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock* pRes; // result buffer
SSDataBlock* pPrevBlock;
int64_t capacity;
int64_t threshold;
int64_t groupTotal;
int64_t currentGroupOffset;
int64_t rowsTotal;
int64_t currentOffset;
SLimit limit;
SLimit slimit;
char** prevRow;
SArray* orderColumnList;
bool hasPrev;
bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock* pRes; // result buffer
SSDataBlock* pPrevBlock;
int64_t capacity;
int64_t threshold;
} SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo {
......@@ -586,23 +562,6 @@ typedef struct SStateWindowOperatorInfo {
bool reptScan;
} SStateWindowOperatorInfo;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo {
SHashObj* pSet;
SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold;
int64_t outputCapacity;
int32_t totalBytes;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
SOptrBasicInfo binfo;
bool hasVarCol;
......@@ -628,24 +587,40 @@ typedef struct SSortedMergeOperatorInfo {
} SSortedMergeOperatorInfo;
typedef struct SOrderOperatorInfo {
uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar
SArray *orderInfo;
bool nullFirst;
SSortHandle *pSortHandle;
int32_t bufPageSize;
int32_t numOfRowsInRes;
uint32_t sortBufSize; // max buffer size for in-memory sort
SSDataBlock *pDataBlock;
bool hasVarCol; // has variable length column, such as binary/varchar/nchar
SArray *orderInfo;
bool nullFirst;
SSortHandle *pSortHandle;
int32_t bufPageSize;
int32_t numOfRowsInRes;
// TODO extact struct
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
uint64_t totalSize; // total load bytes from remote
uint64_t totalRows; // total number of rows
uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo;
typedef struct SDistinctDataInfo {
int32_t index;
int32_t type;
int32_t bytes;
} SDistinctDataInfo;
typedef struct SDistinctOperatorInfo {
SHashObj* pSet;
SSDataBlock* pRes;
bool recordNullVal; // has already record the null value, no need to try again
int64_t threshold;
int64_t outputCapacity;
int32_t totalBytes;
char* buf;
SArray* pDistinctDataInfo;
} SDistinctOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
......@@ -659,7 +634,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const S
int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfDownstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
......@@ -709,9 +685,6 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters);
int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start,
int32_t prevResultLen, void* merger);
int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId);
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters);
......
......@@ -7128,15 +7128,19 @@ static void cleanupAggSup(SAggSupporter* pAggSup) {
destroyResultRowPool(pAggSup->pool);
}
static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_t numOfCols, int32_t numOfRows, SSDataBlock* pResultBlock, const STableGroupInfo* pTableGroupInfo) {
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = pResultBlock;
pInfo->binfo.capacity = numOfRows;
doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
pInfo->pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pInfo->pTableQueryInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
int32_t numOfRows, SSDataBlock* pResultBlock) {
pBasicInfo->pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock;
pBasicInfo->capacity = numOfRows;
doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols);
}
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
STableQueryInfo* pTableQueryInfo = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
if (pTableQueryInfo == NULL) {
return NULL;
}
int32_t index = 0;
......@@ -7145,7 +7149,7 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_
for(int32_t j = 0; j < taosArrayGetSize(pa); ++j) {
STableKeyInfo* pk = taosArrayGet(pa, j);
STableQueryInfo* pTQueryInfo = &pInfo->pTableQueryInfo[index++];
STableQueryInfo* pTQueryInfo = &pTableQueryInfo[index++];
pTQueryInfo->uid = pk->uid;
pTQueryInfo->lastKey = pk->lastKey;
pTQueryInfo->groupIndex = i;
......@@ -7153,9 +7157,8 @@ static int32_t initAggInfo(SAggOperatorInfo* pInfo, SExprInfo* pExprInfo, int32_
}
STimeWindow win = {0, INT64_MAX};
createTableQueryInfo(pInfo->pTableQueryInfo, false, win);
return TSDB_CODE_SUCCESS;
createTableQueryInfo(pTableQueryInfo, false, win);
return pTableQueryInfo;
}
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock,
......@@ -7168,8 +7171,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
//(int32_t)(getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
int32_t numOfRows = 1;
int32_t code = initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResultBlock, pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS) {
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResultBlock);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
}
......@@ -7308,7 +7312,11 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI
SAggOperatorInfo* pInfo = calloc(1, sizeof(SAggOperatorInfo));
int32_t numOfRows = 1;
initAggInfo(pInfo, pExprInfo, numOfCols, numOfRows, pResBlock, pTableGroupInfo);
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock);
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
goto _error;
}
size_t tableGroup = taosArrayGetSize(pTableGroupInfo->pGroupList);
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)tableGroup);
......@@ -7325,9 +7333,15 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprI
pOperator->getNextFn = doMultiTableAggregate;
pOperator->closeFn = destroyAggOperatorInfo;
int32_t code = appendDownstream(pOperator, &downstream, 1);
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
return NULL;
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {
......@@ -7433,24 +7447,35 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfD
return NULL;
}
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval,
const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->order = TSDB_ORDER_ASC;
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
pInfo->win = pTaskInfo->window;
pInfo->interval = *pInterval;
int32_t code = doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols);
int32_t numOfRows = 1;
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock);
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) {
goto _error;
}
code = createDiskbasedBuf(&pInfo->pResultBuf, 4096, 4096 * 256, pTaskInfo->id.str, "/tmp/");
pInfo->binfo.pCtx = createSqlFunctionCtx_rv(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initResultRowInfo(&pInfo->binfo.resultRowInfo, (int32_t)1);
pOperator->name = "TimeIntervalAggOperator";
// pOperator->operatorType = OP_TimeWindow;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INTERVAL;
pOperator->blockingOptr = true;
pOperator->status = OP_NOT_OPENED;
pOperator->pExpr = pExprInfo;
......@@ -7463,6 +7488,10 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pOperator->closeFn = destroyIntervalOperatorInfo;
code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
return pOperator;
_error:
......@@ -7686,15 +7715,13 @@ SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInf
SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
pInfo->slimit = pQueryAttr->slimit;
pInfo->limit = pQueryAttr->limit;
pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
pInfo->currentOffset = pQueryAttr->limit.offset;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
// pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
// pInfo->slimit = pQueryAttr->slimit;
// pInfo->limit = pQueryAttr->limit;
// pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
// pInfo->threshold = (int64_t)(pInfo->capacity * 0.8);
// pInfo->currentOffset = pQueryAttr->limit.offset;
// pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->multigroupResult= multigroupResult;
// TODO refactor
......@@ -8269,6 +8296,19 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createAggregateOperatorInfo(op, pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == nodeType(pPhyNode)) {
size_t size = LIST_LENGTH(pPhyNode->pChildren);
assert(size == 1);
for (int32_t i = 0; i < size; ++i) {
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(((SIntervalPhysiNode*)pPhyNode)->pFuncs, &num);
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, NULL, pTableGroupInfo, pTaskInfo);
}
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
size_t size = taosArrayGetSize(pPhyNode->pChildren);
assert(size == 1);
......
......@@ -580,6 +580,8 @@ static const char* jkIntervalPhysiPlanFuncs = "Funcs";
static const char* jkIntervalPhysiPlanInterval = "Interval";
static const char* jkIntervalPhysiPlanOffset = "Offset";
static const char* jkIntervalPhysiPlanSliding = "Sliding";
static const char* jkIntervalPhysiPlanIntervalUnit = "intervalUnit";
static const char* jkIntervalPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkIntervalPhysiPlanFill = "Fill";
static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
......@@ -601,6 +603,12 @@ static int32_t physiIntervalNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSliding, pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanIntervalUnit, pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkIntervalPhysiPlanSlidingUnit, pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkIntervalPhysiPlanFill, nodeToJson, pNode->pFill);
}
......@@ -627,6 +635,12 @@ static int32_t jsonToPhysiIntervalNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkIntervalPhysiPlanSliding, &pNode->sliding);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkIntervalPhysiPlanIntervalUnit, &pNode->intervalUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkIntervalPhysiPlanSlidingUnit, &pNode->slidingUnit);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkIntervalPhysiPlanFill, (SNode**)&pNode->pFill);
}
......@@ -1644,7 +1658,10 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToSubplan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN:
return jsonToPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return jsonToPhysiIntervalNode(pJson, pObj);
default:
assert(0);
break;
}
nodesWarn("jsonToSpecificNode unknown node = %s", nodesNodeName(nodeType(pObj)));
......
......@@ -480,6 +480,9 @@ static SPhysiNode* createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* p
pInterval->interval = pWindowLogicNode->interval;
pInterval->offset = pWindowLogicNode->offset;
pInterval->sliding = pWindowLogicNode->sliding;
pInterval->intervalUnit = pWindowLogicNode->intervalUnit;
pInterval->slidingUnit = pWindowLogicNode->slidingUnit;
pInterval->pFill = nodesCloneNode(pWindowLogicNode->pFill);
SNodeList* pPrecalcExprs = NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册