From 9e8058dd5dd67c28dfe9eeb71376c05ed17ca200 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Mon, 20 Jun 2022 12:00:57 +0800 Subject: [PATCH] feat: sql function 'interp' --- include/libs/nodes/plannodes.h | 6 ++ include/util/taoserror.h | 1 + source/libs/nodes/src/nodesCloneFuncs.c | 3 + source/libs/nodes/src/nodesCodeFuncs.c | 21 ++++ source/libs/parser/src/parAstCreater.c | 15 +-- source/libs/parser/src/parTranslater.c | 113 ++++++++++++++++----- source/libs/parser/src/parUtil.c | 2 + source/libs/parser/test/parSelectTest.cpp | 2 - source/libs/planner/src/planLogicCreater.c | 13 ++- source/libs/planner/src/planPhysiCreater.c | 6 ++ 10 files changed, 147 insertions(+), 35 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 38a179869f..b07e8f39d5 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; @@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; typedef struct SJoinPhysiNode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index fde016375b..9fb9bcebf7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -564,6 +564,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A) #define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B) #define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C) +#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b12f910cb1..5eaca60ea5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL CLONE_NODE_LIST_FIELD(pFuncs); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(fillMode); + CLONE_NODE_FIELD(pFillValues); + CLONE_NODE_FIELD(pTimeSeries); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57d1ee7857..bde5159087 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanInterval = "Interval"; +static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; +static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; +static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); + } return code; } @@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6b3fdad27d..ccb0d37da6 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) { SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { CHECK_PARSER_STATUS(pCxt); - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { - ((SSelectStmt*)pStmt)->pFill = pFill; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) { + SFillNode* pFillClause = (SFillNode*)pFill; + nodesDestroyNode(pFillClause->pWStartTs); + pFillClause->pWStartTs = createPrimaryKeyCol(pCxt); + ((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause; } return pStmt; } @@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); pOptions->ttl = -1; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; break; - case TABLE_OPTION_TTL:{ + case TABLE_OPTION_TTL: { int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10); - if (ttl > INT32_MAX){ + if (ttl > INT32_MAX) { ttl = INT32_MAX; } // ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 77ef9027cd..a60dba3a9a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin return code; } -static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { - SFillNode* pFill = (SFillNode*)pInterval->pFill; +static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) { + if (FILL_MODE_NONE == pFill->mode) { + return TSDB_CODE_SUCCESS; + } + if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } - int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); - int64_t intervalRange = 0; - SValueNode* pInter = (SValueNode*)pInterval->pInterval; - if (TIME_IS_VAR_DURATION(pInter->unit)) { + // interp FILL clause + if (NULL == pInterval) { + return TSDB_CODE_SUCCESS; + } + + int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); + int64_t intervalRange = 0; + if (TIME_IS_VAR_DURATION(pInterval->unit)) { int64_t f = 1; - if (pInter->unit == 'n') { + if (pInterval->unit == 'n') { f = 30L * MILLISECOND_PER_DAY; - } else if (pInter->unit == 'y') { + } else if (pInterval->unit == 'y') { f = 365L * MILLISECOND_PER_DAY; } - intervalRange = pInter->datum.i * f; + intervalRange = pInterval->datum.i * f; } else { - intervalRange = pInter->datum.i; + intervalRange = pInterval->datum.i; } if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); @@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange)); if (TSDB_CODE_SUCCESS == code) { - code = checkFill(pCxt, pInterval); + code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval); } return code; } @@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { return code; } +static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { + SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL); + if (NULL == pFill) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pFill->mode = FILL_MODE_NONE; + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + nodesDestroyNode((SNode*)pFill); + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); + pFill->pWStartTs = (SNode*)pCol; + + *pOutput = (SNode*)pFill; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == pSelect->pFill) { + code = createDefaultFillNode(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange)); + } + if (TSDB_CODE_SUCCESS == code) { + code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery); + } + + return code; +} + +static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pSelect->hasInterpFunc) { + if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE); + } + return TSDB_CODE_SUCCESS; + } + + int32_t code = translateExpr(pCxt, &pSelect->pRange); + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pEvery); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterpFill(pCxt, pSelect); + } + return code; +} + static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; return translateExprList(pCxt, pPartitionByList); @@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = checkLimit(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterp(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = rewriteUniqueStmt(pCxt, pSelect); } @@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark2 = pStmt->pOptions->watermark2; -// pReq->ttl = pStmt->pOptions->ttl; + // pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfTags = LIST_LENGTH(pStmt->pTags); - if(pStmt->pOptions->commentNull == false){ + if (pStmt->pOptions->commentNull == false) { pReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pReq->commentLen = -1; } @@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pAlterReq->alterType = pStmt->alterType; if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { -// pAlterReq->ttl = pStmt->pOptions->ttl; + // pAlterReq->ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { pAlterReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pAlterReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pAlterReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pAlterReq->commentLen = -1; } @@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_OUT_OF_MEMORY; } req.commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { req.commentLen = -1; } req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); @@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(pStmt->tableName); - req.ttl = pStmt->pOptions->ttl; + req.ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { req.comment = strdup(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment); - } else{ + } else { req.commentLen = -1; } req.ctb.suid = suid; @@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p pReq->newTTL = pStmt->pOptions->ttl; } - if (TSDB_CODE_SUCCESS == code){ - if(pStmt->pOptions->commentNull == false) { + if (TSDB_CODE_SUCCESS == code) { + if (pStmt->pOptions->commentNull == false) { pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) { code = TSDB_CODE_OUT_OF_MEMORY; } pReq->newCommentLen = strlen(pReq->newComment); - } - else{ + } else { pReq->newCommentLen = -1; } } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 95cdf58d5a..d94b430c45 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function does not supportted in group query"; case TSDB_CODE_PAR_INVALID_TABLE_OPTION: return "Invalid option %s"; + case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: + return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 693c422ae0..11e09cd83d 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) { run("SELECT INTERP(c1) FROM t1 EVERY(5s)"); - run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)"); - run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b365afff02..c7490faa0c 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); } - if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) { - // SRangeNode* pRange = (SRangeNode*)pSelect->pRange; - // pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i; - // pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i; + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { + SFillNode* pFill = (SFillNode*)pSelect->pFill; + pInterpFunc->timeRange = pFill->timeRange; + pInterpFunc->fillMode = pFill->mode; + pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs); + pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 70a8aca8bd..6af5a15147 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh if (TSDB_CODE_SUCCESS == code) { pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->interval = pFuncLogicNode->interval; + pInterpFunc->fillMode = pFuncLogicNode->fillMode; + pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues); + pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code) { -- GitLab