diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 0a4b7e9f787dbd01685c2913e513250e46136b4a..bb21b43e677c50abf49223ff86bce1d1ac95b5d7 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -2505,6 +2505,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col const char* msg13 = "parameter list required"; const char* msg14 = "third parameter algorithm must be 'default' or 't-digest'"; const char* msg15 = "parameter is out of range [1, 1000]"; + const char* msg16 = "elapsed duration should be greater than or equal to database precision"; switch (functionId) { case TSDB_FUNC_COUNT: { @@ -2599,19 +2600,21 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col case TSDB_FUNC_FLOOR: case TSDB_FUNC_ROUND: case TSDB_FUNC_STDDEV: - case TSDB_FUNC_LEASTSQR: { + case TSDB_FUNC_LEASTSQR: + case TSDB_FUNC_ELAPSED: { // 1. valid the number of parameters int32_t numOfParams = (pItem->pNode->Expr.paramList == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->Expr.paramList); // no parameters or more than one parameter for function if (pItem->pNode->Expr.paramList == NULL || - (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) || - ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) { + (functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && functionId != TSDB_FUNC_ELAPSED && numOfParams != 1) || + ((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3) || + (functionId == TSDB_FUNC_ELAPSED && numOfParams > 2)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } tSqlExprItem* pParamElem = taosArrayGet(pItem->pNode->Expr.paramList, 0); - if (pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) { + if ((pParamElem->pNode->tokenId != TK_ALL && pParamElem->pNode->tokenId != TK_ID) || 0 == pParamElem->pNode->columnName.n) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2); } @@ -2620,6 +2623,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3); } + // elapsed only can be applied to primary key + if (functionId == TSDB_FUNC_ELAPSED && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "elapsed only can be applied to primary key"); + } + pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex); STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta); @@ -2631,7 +2639,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col // 2. check if sql function can be applied on this column data type SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex); - if (!IS_NUMERIC_TYPE(pSchema->type)) { + if (!IS_NUMERIC_TYPE(pSchema->type) && (functionId != TSDB_FUNC_ELAPSED)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); } else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); @@ -2676,11 +2684,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col } else if (functionId == TSDB_FUNC_IRATE) { int64_t prec = info.precision; tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); - } else if (functionId == TSDB_FUNC_DERIVATIVE) { + } else if (functionId == TSDB_FUNC_DERIVATIVE || (functionId == TSDB_FUNC_ELAPSED && 2 == numOfParams)) { char val[8] = {0}; int64_t tickPerSec = 0; - if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) { + if ((TSDB_DATA_TYPE_NULL == pParamElem[1].pNode->value.nType) || tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) { return TSDB_CODE_TSC_INVALID_OPERATION; } @@ -2690,23 +2698,27 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col tickPerSec /= TSDB_TICK_PER_SECOND(TSDB_TIME_PRECISION_MILLI); } - if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) { + if ((tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) && (functionId == TSDB_FUNC_DERIVATIVE)) { return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10); - } + } else if (tickPerSec <= 0) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg16); + } tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); - memset(val, 0, tListLen(val)); + if (functionId == TSDB_FUNC_DERIVATIVE) { + memset(val, 0, tListLen(val)); - if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) { - return TSDB_CODE_TSC_INVALID_OPERATION; - } + if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) { + return TSDB_CODE_TSC_INVALID_OPERATION; + } - int64_t v = *(int64_t*) val; - if (v != 0 && v != 1) { - return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11); - } + int64_t v = *(int64_t*) val; + if (v != 0 && v != 1) { + return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11); + } - tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); + tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES); + } } SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex); @@ -3125,7 +3137,6 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col return TSDB_CODE_SUCCESS; } - default: { pUdfInfo = isValidUdf(pQueryInfo->pUdfInfo, pItem->pNode->Expr.operand.z, pItem->pNode->Expr.operand.n); if (pUdfInfo == NULL) { @@ -3496,7 +3507,7 @@ int32_t tscTansformFuncForSTableQuery(SQueryInfo* pQueryInfo) { if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_STDDEV_DST) || (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_IRATE) || - (functionId == TSDB_FUNC_SAMPLE)) { + (functionId == TSDB_FUNC_SAMPLE) || (functionId == TSDB_FUNC_ELAPSED)) { if (getResultDataInfo(pSrcSchema->type, pSrcSchema->bytes, functionId, (int32_t)pExpr->base.param[0].i64, &type, &bytes, &interBytes, 0, true, NULL) != TSDB_CODE_SUCCESS) { return TSDB_CODE_TSC_INVALID_OPERATION; @@ -3551,8 +3562,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) { } bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) { - const char* msg1 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP are not allowed to apply to super table directly"; - const char* msg2 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP only support group by tbname for super table query"; + const char* msg1 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP/Elapsed are not allowed to apply to super table directly"; + const char* msg2 = "TWA/Diff/Derivative/Irate/CSUM/MAVG/SAMPLE/INTERP/Elapsed only support group by tbname for super table query"; const char* msg3 = "functions not support for super table query"; // filter sql function not supported by metric query yet. @@ -3570,7 +3581,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) } if (tscIsTWAQuery(pQueryInfo) || tscIsDiffDerivLikeQuery(pQueryInfo) || tscIsIrateQuery(pQueryInfo) || - tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_SAMPLE) || tscGetPointInterpQuery(pQueryInfo)) { + tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_SAMPLE) || tscGetPointInterpQuery(pQueryInfo) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED)) { if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) { invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1); return true; @@ -7474,7 +7485,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* const char* msg3 = "group by/session/state_window not allowed on projection query"; const char* msg4 = "retrieve tags not compatible with group by or interval query"; const char* msg5 = "functions can not be mixed up"; - const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg only support group by tbname"; + const char* msg6 = "TWA/Diff/Derivative/Irate/CSum/MAvg/Elapsed only support group by tbname"; // only retrieve tags, group by is not supportted if (tscQueryTags(pQueryInfo)) { @@ -7536,7 +7547,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* } if ((!pQueryInfo->stateWindow) && (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || - f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG)) { + f == TSDB_FUNC_IRATE || f == TSDB_FUNC_CSUM || f == TSDB_FUNC_MAVG || f == TSDB_FUNC_ELAPSED)) { for (int32_t j = 0; j < pQueryInfo->groupbyExpr.numOfGroupCols; ++j) { SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, j); if (j == 0) { @@ -7585,7 +7596,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* int32_t validateFunctionFromUpstream(SQueryInfo* pQueryInfo, char* msg) { - const char* msg1 = "TWA/Diff/Derivative/Irate are not allowed to apply to super table without group by tbname"; + const char* msg1 = "TWA/Diff/Derivative/Irate/elapsed are not allowed to apply to super table without group by tbname"; const char* msg2 = "group by not supported in nested interp query"; const char* msg3 = "order by not supported in nested interp query"; const char* msg4 = "first column should be timestamp for interp query"; @@ -7598,7 +7609,7 @@ int32_t validateFunctionFromUpstream(SQueryInfo* pQueryInfo, char* msg) { SExprInfo* pExpr = tscExprGet(pQueryInfo, i); int32_t f = pExpr->base.functionId; - if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF) { + if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ELAPSED) { for (int32_t j = 0; j < upNum; ++j) { SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, j); STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pUp, 0); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 98693c94f1d68c194946fbf8b4c00e92c410c9ea..361f73945533b03017b5e156fff975fa1106925f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -943,6 +943,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->tsCompQuery = query.tsCompQuery; pQueryMsg->simpleAgg = query.simpleAgg; pQueryMsg->pointInterpQuery = query.pointInterpQuery; + pQueryMsg->needTableSeqScan = query.needTableSeqScan; pQueryMsg->needReverseScan = query.needReverseScan; pQueryMsg->stateWindow = query.stateWindow; pQueryMsg->numOfTags = htonl(numOfTags); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 94b4b45eda919e704f2551624b120081d903f50b..835b32eaaa198445945aff5ddd72cedc444f8318 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -375,6 +375,10 @@ bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo) { return true; } +bool tscNeedTableSeqScan(SQueryInfo* pQueryInfo) { + return pQueryInfo->stableQuery && (tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_TWA) || tscQueryContainsFunction(pQueryInfo, TSDB_FUNC_ELAPSED)); +} + bool tscGetPointInterpQuery(SQueryInfo* pQueryInfo) { size_t size = tscNumOfExprs(pQueryInfo); for (int32_t i = 0; i < size; ++i) { @@ -391,7 +395,6 @@ bool tscGetPointInterpQuery(SQueryInfo* pQueryInfo) { return false; } - bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) { if (tscIsProjectionQuery(pQueryInfo)) { return false; @@ -524,7 +527,7 @@ bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo) { } int32_t functionId = pExpr->base.functionId; - if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) { + if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP || functionId == TSDB_FUNC_ELAPSED) { return true; } } @@ -5054,6 +5057,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo); pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo); pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo); + pQueryAttr->needTableSeqScan = tscNeedTableSeqScan(pQueryInfo); pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo); pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->sw = pQueryInfo->sessionWindow; diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 0f291936f5519b1db7f98b098e5f9f82303cd0f5..84491e0a438fdb3b5dd2905acffaf32c76b23c9b 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -475,6 +475,7 @@ typedef struct { bool tsCompQuery; // is tscomp query bool simpleAgg; bool pointInterpQuery; // point interpolation query + bool needTableSeqScan; // need scan table by table bool needReverseScan; // need reverse scan bool stateWindow; // state window flag diff --git a/src/query/inc/qAggMain.h b/src/query/inc/qAggMain.h index c9a022d7a1210b31b81bf3895a9b804a03bd30ae..be0f6aee59de760088c8f10b9d1a5dca79882edd 100644 --- a/src/query/inc/qAggMain.h +++ b/src/query/inc/qAggMain.h @@ -79,6 +79,8 @@ extern "C" { #define TSDB_FUNC_BLKINFO 39 +#define TSDB_FUNC_ELAPSED 40 + /////////////////////////////////////////// // the following functions is not implemented. // after implementation, move them before TSDB_FUNC_BLKINFO. also make TSDB_FUNC_BLKINFO the maxium function index diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index fe4fb6c950d4f3e0186668d957900934ba243e5d..0a52a44ed1f7019abc7542fab75cfd098302dbc1 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -230,6 +230,7 @@ typedef struct SQueryAttr { bool diffQuery; // is diff query bool simpleAgg; bool pointInterpQuery; // point interpolation query + bool needTableSeqScan; // need scan table by table bool needReverseScan; // need reverse scan bool distinct; // distinct query or not bool stateWindow; // window State on sub/normal table diff --git a/src/query/src/qAggMain.c b/src/query/src/qAggMain.c index f26b3cda1a56df698db0db1465bd6116726ca0ae..62f0edc8b13c97955fb35c6ae74a9c9abb3b59ea 100644 --- a/src/query/src/qAggMain.c +++ b/src/query/src/qAggMain.c @@ -196,6 +196,12 @@ typedef struct { char *taglists; } SSampleFuncInfo; +typedef struct SElapsedInfo { + int8_t hasResult; + TSKEY min; + TSKEY max; +} SElapsedInfo; + typedef struct { bool valueAssigned; union { @@ -371,6 +377,11 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *bytes = sizeof(STwaInfo); *interBytes = *bytes; return TSDB_CODE_SUCCESS; + } else if (functionId == TSDB_FUNC_ELAPSED) { + *type = TSDB_DATA_TYPE_BINARY; + *bytes = sizeof(SElapsedInfo); + *interBytes = *bytes; + return TSDB_CODE_SUCCESS; } } @@ -471,6 +482,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *bytes = sizeof(SStddevdstInfo); *interBytes = (*bytes); + } else if (functionId == TSDB_FUNC_ELAPSED) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = tDataTypes[*type].bytes; + *interBytes = sizeof(SElapsedInfo); } else { return TSDB_CODE_TSC_INVALID_OPERATION; } @@ -480,7 +495,7 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI // TODO use hash table int32_t isValidFunction(const char* name, int32_t len) { - for(int32_t i = 0; i <= TSDB_FUNC_BLKINFO; ++i) { + for(int32_t i = 0; i <= TSDB_FUNC_ELAPSED; ++i) { int32_t nameLen = (int32_t) strlen(aAggs[i].name); if (len != nameLen) { continue; @@ -3449,7 +3464,7 @@ static void spread_function(SQLFunctionCtx *pCtx) { SSpreadInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); int32_t numOfElems = 0; - + // todo : opt with pre-calculated result // column missing cause the hasNull to be true if (pCtx->preAggVals.isSet) { @@ -3552,7 +3567,7 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) { * the type of intermediate data is binary */ SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - + if (pCtx->currentStage == MERGE_STAGE) { assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); @@ -4922,6 +4937,120 @@ static void sample_func_finalizer(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +static SElapsedInfo * getSElapsedInfo(SQLFunctionCtx *pCtx) { + if (pCtx->stableQuery && pCtx->currentStage != MERGE_STAGE) { + return (SElapsedInfo *)pCtx->pOutput; + } else { + return GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + } +} + +static bool elapsedSetup(SQLFunctionCtx *pCtx, SResultRowCellInfo* pResInfo) { + if (!function_setup(pCtx, pResInfo)) { + return false; + } + + SElapsedInfo *pInfo = getSElapsedInfo(pCtx); + pInfo->min = MAX_TS_KEY; + pInfo->max = 0; + pInfo->hasResult = 0; + + return true; +} + +static int32_t elapsedRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_t colId) { + return BLK_DATA_NO_NEEDED; +} + +static void elapsedFunction(SQLFunctionCtx *pCtx) { + SElapsedInfo *pInfo = getSElapsedInfo(pCtx); + if (pCtx->preAggVals.isSet) { + if (pInfo->min == MAX_TS_KEY) { + pInfo->min = pCtx->preAggVals.statis.min; + pInfo->max = pCtx->preAggVals.statis.max; + } else { + if (pCtx->order == TSDB_ORDER_ASC) { + pInfo->max = pCtx->preAggVals.statis.max; + } else { + pInfo->min = pCtx->preAggVals.statis.min; + } + } + } else { + // 0 == pCtx->size mean this is end interpolation. + if (0 == pCtx->size) { + if (pCtx->order == TSDB_ORDER_DESC) { + if (pCtx->end.key != INT64_MIN) { + pInfo->min = pCtx->end.key; + } + } else { + if (pCtx->end.key != INT64_MIN) { + pInfo->max = pCtx->end.key + 1; + } + } + goto elapsedOver; + } + + int64_t *ptsList = (int64_t *)GET_INPUT_DATA_LIST(pCtx); + // pCtx->start.key == INT64_MIN mean this is first window or there is actual start point of current window. + // pCtx->end.key == INT64_MIN mean current window does not end in current data block or there is actual end point of current window. + if (pCtx->order == TSDB_ORDER_DESC) { + if (pCtx->start.key == INT64_MIN) { + pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max; + } else { + pInfo->max = pCtx->start.key + 1; + } + + if (pCtx->end.key != INT64_MIN) { + pInfo->min = pCtx->end.key; + } else { + pInfo->min = ptsList[0]; + } + } else { + if (pCtx->start.key == INT64_MIN) { + pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min; + } else { + pInfo->min = pCtx->start.key; + } + + if (pCtx->end.key != INT64_MIN) { + pInfo->max = pCtx->end.key + 1; + } else { + pInfo->max = ptsList[pCtx->size - 1]; + } + } + } + +elapsedOver: + SET_VAL(pCtx, pCtx->size, 1); + + if (pCtx->size > 0) { + GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG; + pInfo->hasResult = DATA_SET_FLAG; + } +} + +static void elapsedMerge(SQLFunctionCtx *pCtx) { + SElapsedInfo *pInfo = getSElapsedInfo(pCtx); + memcpy(pInfo, pCtx->pInput, (size_t)pCtx->inputBytes); + GET_RES_INFO(pCtx)->hasResult = pInfo->hasResult; +} + +static void elapsedFinalizer(SQLFunctionCtx *pCtx) { + if (GET_RES_INFO(pCtx)->hasResult != DATA_SET_FLAG) { + setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes); + return; + } + + SElapsedInfo *pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + *(double *)pCtx->pOutput = (double)pInfo->max - (double)pInfo->min; + if (pCtx->numOfParams > 0 && pCtx->param[0].i64 > 0) { + *(double *)pCtx->pOutput = *(double *)pCtx->pOutput / pCtx->param[0].i64; + } + GET_RES_INFO(pCtx)->numOfRes = 1; + + doFinalizer(pCtx); +} + ///////////////////////////////////////////////////////////////////////////////////////////// /* * function compatible list. @@ -4942,8 +5071,8 @@ int32_t functionCompatList[] = { 1, 1, 1, 1, -1, 1, 1, 1, 5, 1, 1, // tid_tag, deriv, ceil, floor, round, csum, mavg, sample, 6, 8, 1, 1, 1, -1, -1, -1, - // block_info - 7 + // block_info, elapsed + 7, 1 }; SAggFunctionInfo aAggs[] = {{ @@ -5426,4 +5555,16 @@ SAggFunctionInfo aAggs[] = {{ block_func_merge, dataBlockRequired, }, + { + // 40 + "elapsed", + TSDB_FUNC_ELAPSED, + TSDB_FUNC_ELAPSED, + TSDB_BASE_FUNC_SO, + elapsedSetup, + elapsedFunction, + elapsedFinalizer, + elapsedMerge, + elapsedRequired, + } }; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 7e89b3a766f0c9124417c65b83dfe55853b0f094..251e210600198de0ba9aec34d322de6839a621b2 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -933,9 +933,10 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput) { SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr; - bool hasAggregates = pCtx[0].preAggVals.isSet; for (int32_t k = 0; k < numOfOutput; ++k) { + bool hasAggregates = pCtx[k].preAggVals.isSet; + pCtx[k].size = forwardStep; pCtx[k].startTs = pWin->skey; @@ -1258,7 +1259,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, for (int32_t k = 0; k < pOperator->numOfOutput; ++k) { int32_t functionId = pCtx[k].functionId; - if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) { + if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP && functionId != TSDB_FUNC_ELAPSED) { pCtx[k].start.key = INT64_MIN; continue; } @@ -1301,7 +1302,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes; } } - } else if (functionId == TSDB_FUNC_TWA) { + } else if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_ELAPSED) { assert(curTs != windowKey); if (prevRowIndex == -1) { @@ -1468,7 +1469,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); - SResultRow* pResult = NULL; int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); @@ -1491,23 +1491,22 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul continue; } - STimeWindow w = pRes->win; - ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, - tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); - if (ret != TSDB_CODE_SUCCESS) { - longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); + STimeWindow w = pRes->win; + ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, + tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); + if (ret != TSDB_CODE_SUCCESS) { + longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); + } - doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1, - tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); + assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); - setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); - setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); + doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1, + tsCols[startPos], startPos, QUERY_IS_ASC_QUERY(pQueryAttr) ? w.ekey : w.skey, RESULT_ROW_END_INTERP); - doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); - } + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); + doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); + } // restore current time window ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, @@ -1821,7 +1820,7 @@ void setBlockStatisInfo(SQLFunctionCtx *pCtx, SSDataBlock* pSDataBlock, SColInde pCtx->hasNull = hasNull(pColIndex, pStatis); // set the statistics data for primary time stamp column - if (pCtx->functionId == TSDB_FUNC_SPREAD && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + if ((pCtx->functionId == TSDB_FUNC_SPREAD || pCtx->functionId == TSDB_FUNC_ELAPSED) && pColIndex->colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { pCtx->preAggVals.isSet = true; pCtx->preAggVals.statis.min = pSDataBlock->info.window.skey; pCtx->preAggVals.statis.max = pSDataBlock->info.window.ekey; @@ -6203,7 +6202,17 @@ group_finished_exit: return true; } +static void resetInterpolation(SQLFunctionCtx *pCtx, SQueryRuntimeEnv* pRuntimeEnv, int32_t numOfOutput) { + if (!pRuntimeEnv->pQueryAttr->timeWindowInterpo) { + return; + } + for (int32_t i = 0; i < numOfOutput; ++i) { + pCtx[i].start.key = INT64_MIN; + pCtx[i].end.key = INT64_MIN; + } + *(TSKEY *)pRuntimeEnv->prevRow[0] = INT64_MIN; +} static void doTimeEveryImpl(SOperatorInfo* pOperator, SQLFunctionCtx *pCtx, SSDataBlock* pBlock, bool newgroup) { STimeEveryOperatorInfo* pEveryInfo = (STimeEveryOperatorInfo*) pOperator->info; @@ -6431,6 +6440,7 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { SOperatorInfo* upstream = pOperator->upstream[0]; + STableId prevId = {0, 0}; while(1) { publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = upstream->exec(upstream, newgroup); @@ -6440,6 +6450,12 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) { break; } + if (prevId.tid != pBlock->info.tid || prevId.uid != pBlock->info.uid) { + resetInterpolation(pIntervalInfo->pCtx, pRuntimeEnv, pOperator->numOfOutput); + prevId.uid = pBlock->info.uid; + prevId.tid = pBlock->info.tid; + } + // the pDataBlock are always the same one, no need to call this again STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; @@ -8785,6 +8801,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S pQueryAttr->tsCompQuery = pQueryMsg->tsCompQuery; pQueryAttr->simpleAgg = pQueryMsg->simpleAgg; pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery; + pQueryAttr->needTableSeqScan = pQueryMsg->needTableSeqScan; pQueryAttr->needReverseScan = pQueryMsg->needReverseScan; pQueryAttr->stateWindow = pQueryMsg->stateWindow; pQueryAttr->vgId = vgId; diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c index 27a22f70832dc9669aa473b03820d84d4736b497..eb3a3f36207d27d610e29bd890a56b2ef411157c 100644 --- a/src/query/src/qPlan.c +++ b/src/query/src/qPlan.c @@ -538,7 +538,7 @@ SArray* createTableScanPlan(SQueryAttr* pQueryAttr) { } else { if (pQueryAttr->queryBlockDist) { op = OP_TableBlockInfoScan; - } else if (pQueryAttr->tsCompQuery || pQueryAttr->diffQuery) { + } else if (pQueryAttr->tsCompQuery || pQueryAttr->diffQuery || pQueryAttr->needTableSeqScan) { op = OP_TableSeqScan; } else if (pQueryAttr->needReverseScan || pQueryAttr->pointInterpQuery) { op = OP_DataBlocksOptScan; diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index 86bd39fa065d99ff66eeb2580ec80f07aa7601e6..3359931d5437a6007973b8522bb7cc7fe66fbc78 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -371,6 +371,7 @@ python3 ./test.py -f functions/function_irate.py python3 ./test.py -f functions/function_ceil.py python3 ./test.py -f functions/function_floor.py python3 ./test.py -f functions/function_round.py +python3 ./test.py -f functions/function_elapsed.py python3 ./test.py -f functions/function_mavg.py python3 ./test.py -f functions/function_csum.py diff --git a/tests/pytest/functions/function_elapsed.py b/tests/pytest/functions/function_elapsed.py new file mode 100644 index 0000000000000000000000000000000000000000..6bc54bfc1c7fc173bf9447da1a9b0aa4aba3e525 --- /dev/null +++ b/tests/pytest/functions/function_elapsed.py @@ -0,0 +1,97 @@ +################################################################### +# Copyright (c) 2020 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * +from functions.function_elapsed_case import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def genTime(self, no): + h = int(no / 60) + hs = "%d" %h + if h < 10: + hs = "0%d" %h + + m = int(no % 60) + ms = "%d" %m + if m < 10: + ms = "0%d" %m + + return hs, ms + + def general(self): + # normal table + tdSql.execute("create database wxy_db minrows 10 maxrows 200") + tdSql.execute("use wxy_db") + tdSql.execute("create table t1(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp)") + for i in range(1, 1001): + hs, ms = self.genTime(i) + if i < 500: + ret = tdSql.execute("insert into t1(ts, i, b) values (\"2021-11-22 %s:%s:00\", %d, 1)" % (hs, ms, i)) + else: + ret = tdSql.execute("insert into t1(ts, i, b) values (\"2021-11-22 %s:%s:00\", %d, 0)" % (hs, ms, i)) + tdSql.query("select count(*) from t1") + tdSql.checkEqual(int(tdSql.getData(0, 0)), 1000) + + # empty normal table + tdSql.execute("create table t2(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp)") + + tdSql.execute("create database wxy_db_ns precision \"ns\"") + tdSql.execute("use wxy_db_ns") + tdSql.execute("create table t1 (ts timestamp, f float)") + tdSql.execute("insert into t1 values('2021-11-18 00:00:00.000000100', 1)" + "('2021-11-18 00:00:00.000000200', 2)" + "('2021-11-18 00:00:00.000000300', 3)" + "('2021-11-18 00:00:00.000000500', 4)") + + # super table + tdSql.execute("use wxy_db") + tdSql.execute("create stable st1(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp) tags(id int)") + tdSql.execute("create table st1s1 using st1 tags(1)") + tdSql.execute("create table st1s2 using st1 tags(2)") + for i in range(1, 1001): + hs, ms = self.genTime(i) + if 0 == i % 2: + ret = tdSql.execute("insert into st1s1(ts, i) values (\"2021-11-22 %s:%s:00\", %d)" % (hs, ms, i)) + else: + ret = tdSql.execute("insert into st1s2(ts, i) values (\"2021-11-22 %s:%s:00\", %d)" % (hs, ms, i)) + tdSql.query("select count(*) from st1s1") + tdSql.checkEqual(int(tdSql.getData(0, 0)), 500) + tdSql.query("select count(*) from st1s2") + tdSql.checkEqual(int(tdSql.getData(0, 0)), 500) + # empty super table + tdSql.execute("create stable st2(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp) tags(id int)") + tdSql.execute("create table st2s1 using st1 tags(1)") + tdSql.execute("create table st2s2 using st1 tags(2)") + + tdSql.execute("create stable st3(ts timestamp, i int, b bigint, f float, d double, bin binary(10), s smallint, t tinyint, bl bool, n nchar(10), ts1 timestamp) tags(id int)") + + def run(self): + tdSql.prepare() + self.general() + ElapsedCase().run() + + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) diff --git a/tests/pytest/functions/function_elapsed_case.py b/tests/pytest/functions/function_elapsed_case.py new file mode 100644 index 0000000000000000000000000000000000000000..56610a9347c3ab90a9addc64dd62a6ed60758abf --- /dev/null +++ b/tests/pytest/functions/function_elapsed_case.py @@ -0,0 +1,374 @@ +################################################################### +# Copyright (c) 2020 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * + +class ElapsedCase: + def __init__(self, restart = False): + self.restart = restart + + def selectTest(self): + tdSql.execute("use wxy_db") + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.checkRows(1) + tdSql.checkCols(1) + + tdSql.query("select elapsed(ts, 1m) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.checkEqual(int(tdSql.getData(0, 0)), 999) + + tdSql.query("select elapsed(ts), elapsed(ts, 1m), elapsed(ts, 10m) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.checkEqual(int(tdSql.getData(0, 1)), 999) + tdSql.checkEqual(int(tdSql.getData(0, 2)), 99) + + tdSql.query("select elapsed(ts), count(*), avg(f), twa(f), irate(f), sum(f), stddev(f), leastsquares(f, 1, 1), " + "min(f), max(f), first(f), last(f), percentile(i, 20), apercentile(i, 30), last_row(i), spread(i) " + "from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.checkRows(1) + tdSql.checkCols(16) + tdSql.checkEqual(int(tdSql.getData(0, 1)), 1000) + + tdSql.query("select elapsed(ts) + 10, elapsed(ts) - 20, elapsed(ts) * 0, elapsed(ts) / 10, elapsed(ts) / elapsed(ts, 1m) from t1 " + "where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.checkRows(1) + tdSql.checkCols(5) + tdSql.checkEqual(int(tdSql.getData(0, 2)), 0) + + tdSql.query("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.checkRows(2) + tdSql.checkCols(2) # append tbname + + tdSql.query("select elapsed(ts, 10m) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.checkEqual(int(tdSql.getData(0, 0)), 99) + tdSql.checkEqual(int(tdSql.getData(1, 0)), 99) + + tdSql.query("select elapsed(ts), elapsed(ts, 10m), elapsed(ts, 100m) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.checkEqual(int(tdSql.getData(0, 1)), 99) + tdSql.checkEqual(int(tdSql.getData(0, 2)), 9) + # stddev(f), + tdSql.query("select elapsed(ts), count(*), avg(f), twa(f), irate(f), sum(f), min(f), max(f), first(f), last(f), apercentile(i, 30), last_row(i), spread(i) " + "from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.checkRows(2) + tdSql.checkCols(14) # append tbname + tdSql.checkEqual(int(tdSql.getData(0, 1)), 500) + + tdSql.query("select elapsed(ts) + 10, elapsed(ts) - 20, elapsed(ts) * 0, elapsed(ts) / 10, elapsed(ts) / elapsed(ts, 1m) " + "from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.checkRows(2) + tdSql.checkCols(6) # append tbname + tdSql.checkEqual(int(tdSql.getData(0, 2)), 0) + + tdSql.query("select elapsed(ts), tbname from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.checkRows(2) + tdSql.checkCols(3) # additional append tbname + + tdSql.execute("use wxy_db_ns") + tdSql.query("select elapsed(ts, 1b), elapsed(ts, 1u) from t1") + tdSql.checkRows(1) + tdSql.checkCols(2) + + self.selectIllegalTest() + + # It has little to do with the elapsed function, so just simple test. + def whereTest(self): + tdSql.execute("use wxy_db") + + tdSql.query("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' and id = 1 group by tbname") + tdSql.checkRows(1) + tdSql.checkCols(2) # append tbname + + # It has little to do with the elapsed function, so just simple test. + def sessionTest(self): + tdSql.execute("use wxy_db") + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' session(ts, 10s)") + tdSql.checkRows(1000) + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' session(ts, 70s)") + tdSql.checkRows(1) + + # It has little to do with the elapsed function, so just simple test. + def stateWindowTest(self): + tdSql.execute("use wxy_db") + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' state_window(i)") + tdSql.checkRows(1000) + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' state_window(b)") + tdSql.checkRows(2) + + def intervalTest(self): + tdSql.execute("use wxy_db") + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1m)") + tdSql.checkRows(1000) + + # The first window has 9 records, and the last window has 1 record. + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(10m)") + tdSql.checkRows(101) + tdSql.checkEqual(int(tdSql.getData(0, 1)), 9 * 60 * 1000) + tdSql.checkEqual(int(tdSql.getData(100, 1)), 0) + + # Skip windows without data. + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(35s)") + tdSql.checkRows(1000) + + tdSql.query("select elapsed(ts), count(*), avg(f), twa(f), irate(f), sum(f), stddev(f), leastsquares(f, 1, 1), " + "min(f), max(f), first(f), last(f), percentile(i, 20), apercentile(i, 30), last_row(i), spread(i) " + "from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(20m)") + tdSql.checkRows(51) # ceil(1000/50) + 1(last point), window is half-open interval. + tdSql.checkCols(17) # front push timestamp + + tdSql.query("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s) group by tbname") + tdSql.checkRows(1000) + + tdSql.query("select elapsed(ts) + 10, elapsed(ts) - 20, elapsed(ts) * 0, elapsed(ts) / 10, elapsed(ts) / elapsed(ts, 1m) " + "from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(30m) group by tbname") + tdSql.checkRows(68) # ceil(1000/30) + tdSql.checkCols(7) # front push timestamp and append tbname + + # It has little to do with the elapsed function, so just simple test. + def fillTest(self): + tdSql.execute("use wxy_db") + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(30s) fill(value, 1000)") + tdSql.checkRows(2880) # The range of window conditions is 24 hours. + tdSql.checkEqual(int(tdSql.getData(0, 1)), 1000) + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(30s) fill(prev)") + tdSql.checkRows(2880) # The range of window conditions is 24 hours. + tdSql.checkData(0, 1, None) + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(30s) fill(null)") + tdSql.checkRows(2880) # The range of window conditions is 24 hours. + tdSql.checkData(0, 1, None) + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(30s) fill(linear)") + tdSql.checkRows(2880) # The range of window conditions is 24 hours. + + tdSql.query("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(30s) fill(next)") + tdSql.checkRows(2880) # The range of window conditions is 24 hours. + + # Elapsed only support group by tbname. Supported tests have been done in selectTest(). + def groupbyTest(self): + tdSql.execute("use wxy_db") + + tdSql.error("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by i") + tdSql.error("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by i") + + def orderbyCheck(self, sql, elapsedCol): + resultAsc = tdSql.getResult(sql) + resultdesc = tdSql.getResult(sql + " order by ts desc") + resultRows = len(resultAsc) + for i in range(resultRows): + tdSql.checkEqual(resultAsc[i][elapsedCol], resultdesc[resultRows - i - 1][elapsedCol]) + + def splitStableResult(self, sql, elapsedCol, tbnameCol): + subtable = {} + result = tdSql.getResult(sql) + for i in range(len(result)): + if None == subtable.get(result[i][tbnameCol]): + subtable[result[i][tbnameCol]] = [result[i][elapsedCol]] + else: + subtable[result[i][tbnameCol]].append(result[i][elapsedCol]) + return subtable + + def doOrderbyCheck(self, resultAsc, resultdesc): + resultRows = len(resultAsc) + for i in range(resultRows): + tdSql.checkEqual(resultAsc[i], resultdesc[resultRows - i - 1]) + + def orderbyForStableCheck(self, sql, elapsedCol, tbnameCol): + subtableAsc = self.splitStableResult(sql, elapsedCol, tbnameCol) + subtableDesc = self.splitStableResult(sql + " order by ts desc", elapsedCol, tbnameCol) + for kv in subtableAsc.items(): + descValue = subtableDesc.get(kv[0]) + if None == descValue: + tdLog.exit("%s failed: subtable %s not exists" % (sql)) + else: + self.doOrderbyCheck(kv[1], descValue) + + # Orderby clause only changes the output order and has no effect on the calculation results. + def orderbyTest(self): + tdSql.execute("use wxy_db") + + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'", 0) + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s)", 1) + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1m)", 1) + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(10m)", 1) + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(150m)", 1) + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m)", 1) + self.orderbyCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1000m)", 1) + + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname", 0, 1) + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s) group by tbname", 1, 2) + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1m) group by tbname", 1, 2) + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(10m) group by tbname", 1, 2) + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(150m) group by tbname", 1, 2) + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m) group by tbname", 1, 2) + self.orderbyForStableCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1000m) group by tbname", 1, 2) + + def slimitCheck(self, sql): + tdSql.checkEqual(tdSql.query(sql + " slimit 0"), 0) + tdSql.checkEqual(tdSql.query(sql + " slimit 1 soffset 0"), tdSql.query(sql + " slimit 0, 1")) + tdSql.checkEqual(tdSql.query(sql + " slimit 1, 1"), tdSql.query(sql) / 2) + tdSql.checkEqual(tdSql.query(sql + " slimit 10"), tdSql.query(sql)) + + # It has little to do with the elapsed function, so just simple test. + def slimitTest(self): + tdSql.execute("use wxy_db") + + self.slimitCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + self.slimitCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s) group by tbname") + + def limitCheck(self, sql, groupby = 0): + rows = tdSql.query(sql) + if rows > 0: + tdSql.checkEqual(tdSql.query(sql + " limit 0"), 0) + if 1 == groupby: + tdSql.checkEqual(tdSql.query(sql + " limit 1"), 2) + tdSql.checkEqual(tdSql.query(sql + " limit %d offset %d" % (rows / 2, rows / 3)), tdSql.query(sql + " limit %d, %d" % (rows / 3, rows / 2))) + tdSql.checkEqual(tdSql.query(sql + " limit %d" % (rows / 2)), rows) + else: + tdSql.checkEqual(tdSql.query(sql + " limit 1"), 1) + tdSql.checkEqual(tdSql.query(sql + " limit %d offset %d" % (rows / 2, rows / 3)), tdSql.query(sql + " limit %d, %d" % (rows / 3, rows / 2))) + tdSql.checkEqual(tdSql.query(sql + " limit %d" % (rows + 1)), rows) + + # It has little to do with the elapsed function, so just simple test. + def limitTest(self): + tdSql.execute("use wxy_db") + + self.limitCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + self.limitCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s)") + + self.limitCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname", 1) + self.limitCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s) group by tbname", 1) + + def fromCheck(self, sqlTemplate, table): + tdSql.checkEqual(tdSql.getResult(sqlTemplate % table), tdSql.getResult(sqlTemplate % ("(select * from %s)" % table))) + tdSql.query(sqlTemplate % ("(select last(ts) from %s interval(10s))" % table)) + tdSql.query(sqlTemplate % ("(select elapsed(ts) from %s interval(10s))" % table)) + + # It has little to do with the elapsed function, so just simple test. + def fromTest(self): + tdSql.execute("use wxy_db") + + self.fromCheck("select elapsed(ts) from %s where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'", "t1") + self.fromCheck("select elapsed(ts) from %s where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s)", "t1") + tdSql.query("select * from (select elapsed(ts) from t1 interval(10s)) where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.query("select * from (select elapsed(ts) from t1)") + # empty table test + tdSql.checkEqual(tdSql.query("select elapsed(ts) from t2"), 0) + tdSql.checkEqual(tdSql.query("select elapsed(ts) from st2 group by tbname"), 0) + tdSql.checkEqual(tdSql.query("select elapsed(ts) from st3 group by tbname"), 0) + # Tags not allowed for table query, so there is no need to test super table. + tdSql.error("select elapsed(ts) from (select * from st1)") + + def joinCheck(self, sqlTemplate, rtable): + tdSql.checkEqual(tdSql.getResult(sqlTemplate % (rtable, "")), tdSql.getResult(sqlTemplate % ("t1, %s t2" % rtable, "t1.ts = t2.ts and "))) + + # It has little to do with the elapsed function, so just simple test. + def joinTest(self): + tdSql.execute("use wxy_db") + + # st1s1 is a subset of t1. + self.joinCheck("select elapsed(ts) from %s where %s ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'", "st1s1") + self.joinCheck("select elapsed(ts) from %s where %s ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(150m)", "st1s1") + # join query does not support group by, so there is no need to test super table. + + def unionAllCheck(self, sql1, sql2): + rows1 = tdSql.query(sql1) + rows2 = tdSql.query(sql2) + tdSql.checkEqual(tdSql.query(sql1 + " union all " + sql2), rows1 + rows2) + + # It has little to do with the elapsed function, so just simple test. + def unionAllTest(self): + tdSql.execute("use wxy_db") + + self.unionAllCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'", + "select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-22 01:00:00'") + self.unionAllCheck("select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(40s)", + "select elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(150m)") + self.unionAllCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname", + "select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-22 02:00:00' group by tbname") + self.unionAllCheck("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(1m) group by tbname", + "select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' interval(222m) group by tbname") + + # It has little to do with the elapsed function, so just simple test. + def continuousQueryTest(self): + tdSql.execute("use wxy_db") + + if (self.restart): + tdSql.execute("drop table elapsed_t") + tdSql.execute("drop table elapsed_st") + tdSql.execute("create table elapsed_t as select elapsed(ts) from t1 interval(1m) sliding(30s)") + tdSql.execute("create table elapsed_st as select elapsed(ts) from st1 interval(1m) sliding(30s) group by tbname") + + def selectIllegalTest(self): + tdSql.execute("use wxy_db") + tdSql.error("select elapsed(1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed('2021-11-18 00:00:10') from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(now) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(b) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(f) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(d) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(bin) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(s) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(t) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(bl) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(n) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(*) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts, '1s') from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts, i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + #tdSql.error("select elapsed(ts, now) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts, ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts + 1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts, 1b) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts, 1u) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(max(ts)) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select distinct elapsed(ts) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select distinct elapsed(ts) from st1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00' group by tbname") + tdSql.error("select elapsed(ts), i from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), ts from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), _c0 from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), top(i, 1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), bottom(i, 1) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), inerp(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), diff(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), derivative(i, 1s, 0) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), ceil(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), floor(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + tdSql.error("select elapsed(ts), round(i) from t1 where ts > '2021-11-22 00:00:00' and ts < '2021-11-23 00:00:00'") + + def run(self): + self.selectTest() + self.whereTest() + self.sessionTest() + self.stateWindowTest() + self.intervalTest() + self.fillTest() + self.groupbyTest() + self.orderbyTest() + self.slimitTest() + self.limitTest() + self.fromTest() + self.joinTest() + self.unionAllTest() + self.continuousQueryTest() diff --git a/tests/pytest/functions/function_elapsed_restart.py b/tests/pytest/functions/function_elapsed_restart.py new file mode 100644 index 0000000000000000000000000000000000000000..8b492267abdd8ea2d2b2fc27ee2e957e1038f48d --- /dev/null +++ b/tests/pytest/functions/function_elapsed_restart.py @@ -0,0 +1,35 @@ +################################################################### +# Copyright (c) 2020 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import * +from util.cases import * +from util.sql import * +from functions.function_elapsed_case import * + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + + def run(self): + tdSql.prepare() + ElapsedCase(True).run() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase())