diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 4442338a7bbc8789f883f7a427687130c50a548f..00a30816baaf1bc2b908a2e9fad3bad899f59b48 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -8269,7 +8269,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) { for (int32_t i = 0; i < numOfExprs; ++i) { SExprInfo* pExpr = taosArrayGetP(pQueryInfo->exprList, i); - + int16_t functionId = pExpr->base.functionId; if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_SCALAR_EXPR || functionId == TSDB_FUNC_TS_DUMMY || functionId == TSDB_FUNC_STATE_COUNT || @@ -8277,7 +8277,8 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) { continue; } - if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION) { + if (functionId == TSDB_FUNC_WSTART || functionId == TSDB_FUNC_WSTOP || functionId == TSDB_FUNC_WDURATION || + functionId == TSDB_FUNC_QSTART || functionId == TSDB_FUNC_QSTOP || functionId == TSDB_FUNC_QDURATION) { numOfTimeWindow++; } diff --git a/src/common/src/tname.c b/src/common/src/tname.c index 9357f657ba7059bef133616673bbe93f92f46001..97062cf3dd1279158315c55a4ee96f332e59f7ac 100644 --- a/src/common/src/tname.c +++ b/src/common/src/tname.c @@ -244,10 +244,13 @@ static struct SSchema _s = { .name = TSQL_TBNAME_L, }; -static struct SSchema _tswin[3] = { +static struct SSchema _tswin[6] = { {TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_START, TSDB_TSWIN_START_COLUMN_INDEX, LONG_BYTES}, {TSDB_DATA_TYPE_TIMESTAMP, TSQL_TSWIN_STOP, TSDB_TSWIN_STOP_COLUMN_INDEX, LONG_BYTES}, {TSDB_DATA_TYPE_BIGINT, TSQL_TSWIN_DURATION, TSDB_TSWIN_DURATION_COLUMN_INDEX, LONG_BYTES}, + {TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_START, TSDB_QUERY_START_COLUMN_INDEX, LONG_BYTES}, + {TSDB_DATA_TYPE_TIMESTAMP, TSQL_QUERY_STOP, TSDB_QUERY_STOP_COLUMN_INDEX, LONG_BYTES}, + {TSDB_DATA_TYPE_BIGINT, TSQL_QUERY_DURATION, TSDB_QUERY_DURATION_COLUMN_INDEX, LONG_BYTES}, }; SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) { @@ -261,6 +264,15 @@ SSchema* tGetTimeWindowColumnSchema(int16_t columnIndex) { case TSDB_TSWIN_DURATION_COLUMN_INDEX: { return &_tswin[2]; } + case TSDB_QUERY_START_COLUMN_INDEX: { + return &_tswin[3]; + } + case TSDB_QUERY_STOP_COLUMN_INDEX: { + return &_tswin[4]; + } + case TSDB_QUERY_DURATION_COLUMN_INDEX: { + return &_tswin[5]; + } default: { return NULL; } diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 9779115432cfc5720f84734b656d12de3ff8571d..436f6cc3abcd8b062d81810b0c426c3e7cf9c167 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -280,9 +280,12 @@ do { \ #define TSDB_TSWIN_START_COLUMN_INDEX (-2) #define TSDB_TSWIN_STOP_COLUMN_INDEX (-3) #define TSDB_TSWIN_DURATION_COLUMN_INDEX (-4) -#define TSDB_MIN_VALID_COLUMN_INDEX (-4) +#define TSDB_QUERY_START_COLUMN_INDEX (-5) +#define TSDB_QUERY_STOP_COLUMN_INDEX (-6) +#define TSDB_QUERY_DURATION_COLUMN_INDEX (-7) +#define TSDB_MIN_VALID_COLUMN_INDEX (-7) -#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_TSWIN_DURATION_COLUMN_INDEX) +#define TSDB_COL_IS_TSWIN_COL(_i) ((_i) <= TSDB_TSWIN_START_COLUMN_INDEX && (_i) >= TSDB_QUERY_DURATION_COLUMN_INDEX) #define TSDB_UD_COLUMN_INDEX (-1000) #define TSDB_RES_COL_ID (-5000) diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index 0b87c546d570e8bf4dfc723ae9dc380442009280..323dff5aeb4055c84dcb631feec2bfb2db65ae1c 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -86,8 +86,11 @@ extern "C" { #define TSDB_FUNC_WSTART 44 #define TSDB_FUNC_WSTOP 45 #define TSDB_FUNC_WDURATION 46 +#define TSDB_FUNC_QSTART 47 +#define TSDB_FUNC_QSTOP 48 +#define TSDB_FUNC_QDURATION 49 -#define TSDB_FUNC_MAX_NUM 47 +#define TSDB_FUNC_MAX_NUM 50 #define TSDB_FUNCSTATE_SO 0x1u // single output #define TSDB_FUNCSTATE_MO 0x2u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM @@ -183,7 +186,7 @@ typedef struct SQLFunctionCtx { uint32_t order; // asc|desc int16_t inputType; int32_t inputBytes; - + int16_t outputType; int32_t outputBytes; // size of results, determined by function and input column data type int32_t interBufBytes; // internal buffer size @@ -211,6 +214,7 @@ typedef struct SQLFunctionCtx { SHashObj **pUniqueSet; // for unique function SHashObj **pModeSet; // for mode function + STimeWindow qWindow; // for _qstart/_qstop/_qduration column } SQLFunctionCtx; typedef struct SAggFunctionInfo { @@ -235,6 +239,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI int32_t *len, int32_t *interBytes, int16_t extLength, bool isSuperTable, SUdfInfo* pUdfInfo); int16_t getTimeWindowFunctionID(int16_t colIndex); +bool isTimeWindowFunction(int32_t functionId); int32_t isValidFunction(const char* name, int32_t len); bool isValidStateOper(char *oper, int32_t len); diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index 9c330b52375899215b764437751622db97c1c9e6..c2822559775c2c67760114acee1603d8c116c9a8 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -613,6 +613,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI return TSDB_CODE_SUCCESS; } +bool isTimeWindowFunction(int32_t functionId) { + return ((functionId >= TSDB_FUNC_WSTART) && (functionId <= TSDB_FUNC_QDURATION)); +} + // TODO use hash table int32_t isValidFunction(const char* name, int32_t len) { @@ -5788,27 +5792,51 @@ int16_t getTimeWindowFunctionID(int16_t colIndex) { case TSDB_TSWIN_DURATION_COLUMN_INDEX: { return TSDB_FUNC_WDURATION; } + case TSDB_QUERY_START_COLUMN_INDEX: { + return TSDB_FUNC_QSTART; + } + case TSDB_QUERY_STOP_COLUMN_INDEX: { + return TSDB_FUNC_QSTOP; + } + case TSDB_QUERY_DURATION_COLUMN_INDEX: { + return TSDB_FUNC_QDURATION; + } default: return TSDB_FUNC_INVALID_ID; } } -static void wstart_function(SQLFunctionCtx *pCtx) { +static void window_start_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); - *(int64_t *)(pCtx->pOutput) = pCtx->startTs; + if (pCtx->functionId == TSDB_FUNC_WSTART) { + *(int64_t *)(pCtx->pOutput) = pCtx->startTs; + } else { //TSDB_FUNC_QSTART + *(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.skey; + } } -static void wstop_function(SQLFunctionCtx *pCtx) { +static void window_stop_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); - *(int64_t *)(pCtx->pOutput) = pCtx->endTs; + if (pCtx->functionId == TSDB_FUNC_WSTART) { + *(int64_t *)(pCtx->pOutput) = pCtx->endTs; + } else { //TSDB_FUNC_QSTOP + *(TSKEY *)(pCtx->pOutput) = pCtx->qWindow.ekey; + } } -static void wduration_function(SQLFunctionCtx *pCtx) { +static void window_duration_function(SQLFunctionCtx *pCtx) { SET_VAL(pCtx, pCtx->size, 1); - int64_t duration = pCtx->endTs - pCtx->startTs; + int64_t duration; + if (pCtx->functionId == TSDB_FUNC_WSTART) { + duration = pCtx->endTs - pCtx->startTs; + } else { //TSDB_FUNC_QDURATION + duration = pCtx->qWindow.ekey - pCtx->qWindow.skey; + } + if (duration < 0) { duration = -duration; } + *(int64_t *)(pCtx->pOutput) = duration; } ///////////////////////////////////////////////////////////////////////////////////////////// @@ -6373,7 +6401,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ TSDB_FUNC_WSTART, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - wstart_function, + window_start_function, doFinalizer, copy_function, dataBlockRequired, @@ -6385,7 +6413,7 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ TSDB_FUNC_WSTOP, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - wstop_function, + window_stop_function, doFinalizer, copy_function, dataBlockRequired, @@ -6397,7 +6425,43 @@ SAggFunctionInfo aAggs[TSDB_FUNC_MAX_NUM] = {{ TSDB_FUNC_WDURATION, TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, function_setup, - wduration_function, + window_duration_function, + doFinalizer, + copy_function, + dataBlockRequired, + }, + { + // 47 + "_qstart", + TSDB_FUNC_QSTART, + TSDB_FUNC_QSTART, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + window_start_function, + doFinalizer, + copy_function, + dataBlockRequired, + }, + { + // 48 + "_qstop", + TSDB_FUNC_QSTOP, + TSDB_FUNC_QSTOP, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + window_stop_function, + doFinalizer, + copy_function, + dataBlockRequired, + }, + { + // 49 + "_qduration", + TSDB_FUNC_QDURATION, + TSDB_FUNC_QDURATION, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_SELECTIVITY, + function_setup, + window_duration_function, doFinalizer, copy_function, dataBlockRequired, diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 8cb05977cd6430e41e8eceaeeb50f84f0abdc2fc..2f6dda527602ff1da294dfe1ec4620baf85b0c29 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -9171,7 +9171,7 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp type = s->type; bytes = s->bytes; } else if (TSDB_COL_IS_TSWIN_COL(pExprs[i].base.colInfo.colId) && - (pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_WDURATION)) { + (pExprs[i].base.functionId >= TSDB_FUNC_WSTART || pExprs[i].base.functionId <= TSDB_FUNC_QDURATION)) { SSchema* s = tGetTimeWindowColumnSchema(pExprs[i].base.colInfo.colId); type = s->type; bytes = s->bytes; @@ -9222,19 +9222,15 @@ int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExp int32_t param = (int32_t)pExprs[i].base.param[0].i64; if (pExprs[i].base.functionId > 0 && pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR && - pExprs[i].base.functionId != TSDB_FUNC_WSTART && - pExprs[i].base.functionId != TSDB_FUNC_WSTOP && - pExprs[i].base.functionId != TSDB_FUNC_WDURATION && - (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) { + !isTimeWindowFunction(pExprs[i].base.functionId) && + (type != pExprs[i].base.colType || bytes != pExprs[i].base.colBytes)) { tfree(pExprs); return TSDB_CODE_QRY_INVALID_MSG; } // todo remove it if (pExprs[i].base.functionId != TSDB_FUNC_SCALAR_EXPR && - pExprs[i].base.functionId != TSDB_FUNC_WSTART && - pExprs[i].base.functionId != TSDB_FUNC_WSTOP && - pExprs[i].base.functionId != TSDB_FUNC_WDURATION && + !isTimeWindowFunction(pExprs[i].base.functionId) && getResultDataInfo(type, bytes, pExprs[i].base.functionId, param, &pExprs[i].base.resType, &pExprs[i].base.resBytes, &pExprs[i].base.interBytes, 0, isSuperTable, pUdfInfo) != TSDB_CODE_SUCCESS) { tfree(pExprs); @@ -9450,9 +9446,7 @@ static void doUpdateExprColumnIndex(SQueryAttr *pQueryAttr) { for (int32_t k = 0; k < pQueryAttr->numOfOutput; ++k) { SSqlExpr *pSqlExprMsg = &pQueryAttr->pExpr1[k].base; if (pSqlExprMsg->functionId == TSDB_FUNC_SCALAR_EXPR || - pSqlExprMsg->functionId == TSDB_FUNC_WSTART || - pSqlExprMsg->functionId == TSDB_FUNC_WSTOP || - pSqlExprMsg->functionId == TSDB_FUNC_WDURATION) { + isTimeWindowFunction(pSqlExprMsg->functionId)) { continue; } diff --git a/src/util/inc/ttoken.h b/src/util/inc/ttoken.h index da84d2586dbb56710500aa93b69d60f00352a7d1..79ad074f59578eae44bbb6519822809b367b5337 100644 --- a/src/util/inc/ttoken.h +++ b/src/util/inc/ttoken.h @@ -31,6 +31,10 @@ extern "C" { #define TSQL_TSWIN_STOP "_wstop" #define TSQL_TSWIN_DURATION "_wduration" +#define TSQL_QUERY_START "_qstart" +#define TSQL_QUERY_STOP "_qstop" +#define TSQL_QUERY_DURATION "_qduration" + #define TSQL_BLOCK_DIST "_BLOCK_DIST" #define TSQL_BLOCK_DIST_L "_block_dist"