diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 38a179869f603aaaedc71a3b3f0108075ddfcde4..b07e8f39d5bcae86ebd32208613f98c7e2bff497 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; @@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; typedef struct SJoinPhysiNode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index fde016375be764a286c96828d1f95e35ecc333d8..9fb9bcebf7b8ae731b616bc5b151455e248230e7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -564,6 +564,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A) #define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B) #define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C) +#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b12f910cb1273f7d517ff100981164b03898e5f6..5eaca60ea5d8c9f9439a7b9139789622ecad7dbf 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL CLONE_NODE_LIST_FIELD(pFuncs); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(fillMode); + CLONE_NODE_FIELD(pFillValues); + CLONE_NODE_FIELD(pTimeSeries); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57d1ee78573eee8807d1e33bdd1f3f1f42e06d54..bde5159087634a2ad06b6b4a1e38ff06f7a32948 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanInterval = "Interval"; +static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; +static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; +static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); + } return code; } @@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6b3fdad27d38eba5046eb50d889a13c1bd0f106b..ccb0d37da65a6be158dfb0295815151bb3ce36f1 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) { SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { CHECK_PARSER_STATUS(pCxt); - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { - ((SSelectStmt*)pStmt)->pFill = pFill; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) { + SFillNode* pFillClause = (SFillNode*)pFill; + nodesDestroyNode(pFillClause->pWStartTs); + pFillClause->pWStartTs = createPrimaryKeyCol(pCxt); + ((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause; } return pStmt; } @@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); pOptions->ttl = -1; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; break; - case TABLE_OPTION_TTL:{ + case TABLE_OPTION_TTL: { int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10); - if (ttl > INT32_MAX){ + if (ttl > INT32_MAX) { ttl = INT32_MAX; } // ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 77ef9027cdc6f49651653eb3744d6091dbcdaea7..a60dba3a9a46a81a0a20dd98397dc83593e59468 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin return code; } -static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { - SFillNode* pFill = (SFillNode*)pInterval->pFill; +static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) { + if (FILL_MODE_NONE == pFill->mode) { + return TSDB_CODE_SUCCESS; + } + if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } - int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); - int64_t intervalRange = 0; - SValueNode* pInter = (SValueNode*)pInterval->pInterval; - if (TIME_IS_VAR_DURATION(pInter->unit)) { + // interp FILL clause + if (NULL == pInterval) { + return TSDB_CODE_SUCCESS; + } + + int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); + int64_t intervalRange = 0; + if (TIME_IS_VAR_DURATION(pInterval->unit)) { int64_t f = 1; - if (pInter->unit == 'n') { + if (pInterval->unit == 'n') { f = 30L * MILLISECOND_PER_DAY; - } else if (pInter->unit == 'y') { + } else if (pInterval->unit == 'y') { f = 365L * MILLISECOND_PER_DAY; } - intervalRange = pInter->datum.i * f; + intervalRange = pInterval->datum.i * f; } else { - intervalRange = pInter->datum.i; + intervalRange = pInterval->datum.i; } if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); @@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange)); if (TSDB_CODE_SUCCESS == code) { - code = checkFill(pCxt, pInterval); + code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval); } return code; } @@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { return code; } +static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { + SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL); + if (NULL == pFill) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pFill->mode = FILL_MODE_NONE; + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + nodesDestroyNode((SNode*)pFill); + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); + pFill->pWStartTs = (SNode*)pCol; + + *pOutput = (SNode*)pFill; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == pSelect->pFill) { + code = createDefaultFillNode(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange)); + } + if (TSDB_CODE_SUCCESS == code) { + code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery); + } + + return code; +} + +static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pSelect->hasInterpFunc) { + if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE); + } + return TSDB_CODE_SUCCESS; + } + + int32_t code = translateExpr(pCxt, &pSelect->pRange); + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pEvery); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterpFill(pCxt, pSelect); + } + return code; +} + static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; return translateExprList(pCxt, pPartitionByList); @@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = checkLimit(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterp(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = rewriteUniqueStmt(pCxt, pSelect); } @@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark2 = pStmt->pOptions->watermark2; -// pReq->ttl = pStmt->pOptions->ttl; + // pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfTags = LIST_LENGTH(pStmt->pTags); - if(pStmt->pOptions->commentNull == false){ + if (pStmt->pOptions->commentNull == false) { pReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pReq->commentLen = -1; } @@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pAlterReq->alterType = pStmt->alterType; if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { -// pAlterReq->ttl = pStmt->pOptions->ttl; + // pAlterReq->ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { pAlterReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pAlterReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pAlterReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pAlterReq->commentLen = -1; } @@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_OUT_OF_MEMORY; } req.commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { req.commentLen = -1; } req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); @@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(pStmt->tableName); - req.ttl = pStmt->pOptions->ttl; + req.ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { req.comment = strdup(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment); - } else{ + } else { req.commentLen = -1; } req.ctb.suid = suid; @@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p pReq->newTTL = pStmt->pOptions->ttl; } - if (TSDB_CODE_SUCCESS == code){ - if(pStmt->pOptions->commentNull == false) { + if (TSDB_CODE_SUCCESS == code) { + if (pStmt->pOptions->commentNull == false) { pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) { code = TSDB_CODE_OUT_OF_MEMORY; } pReq->newCommentLen = strlen(pReq->newComment); - } - else{ + } else { pReq->newCommentLen = -1; } } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 95cdf58d5a5bc3d6e02e0bf67154839562d4a6e5..d94b430c452a0e131a1ef91f836615dad90bddaa 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function does not supportted in group query"; case TSDB_CODE_PAR_INVALID_TABLE_OPTION: return "Invalid option %s"; + case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: + return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 693c422ae0bfd38da6fc69f33430aa46874278e5..11e09cd83d89f5ba02df18f5a439786ffe6d83c0 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) { run("SELECT INTERP(c1) FROM t1 EVERY(5s)"); - run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)"); - run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b365afff02875114b4b61c598e6095c7ff4aab66..c7490faa0c28524830f16515e2eb16278b894706 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); } - if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) { - // SRangeNode* pRange = (SRangeNode*)pSelect->pRange; - // pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i; - // pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i; + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { + SFillNode* pFill = (SFillNode*)pSelect->pFill; + pInterpFunc->timeRange = pFill->timeRange; + pInterpFunc->fillMode = pFill->mode; + pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs); + pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 70a8aca8bd2aa1f7968284ed98556a94ca368c62..6af5a15147b4805eec1c374020cdb93c1f67abf8 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh if (TSDB_CODE_SUCCESS == code) { pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->interval = pFuncLogicNode->interval; + pInterpFunc->fillMode = pFuncLogicNode->fillMode; + pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues); + pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code) {