diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index ed7d4dfc602fde29538183d8f691a24f5d63ec56..837a0ce0054b8fd7cc92a7b164f667cb6841a276 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -138,6 +138,19 @@ typedef struct STSCompInfo { STSBuf *pTSBuf; } STSCompInfo; +typedef struct SRateInfo { + int64_t CorrectionValue; + int64_t firstValue; + TSKEY firstKey; + int64_t lastValue; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value + bool isIRate; // true for IRate functions, false for Rate functions + int64_t num; // for sum/avg + double sum; // for sum/avg +} SRateInfo; + + int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { if (!isValidDataType(dataType, dataBytes)) { @@ -192,7 +205,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = TSDB_DATA_TYPE_BINARY; *bytes = sizeof(SAvgInfo); *intermediateResBytes = *bytes; + return TSDB_CODE_SUCCESS; + } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(SRateInfo); + *intermediateResBytes = sizeof(SRateInfo); return TSDB_CODE_SUCCESS; } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { *type = TSDB_DATA_TYPE_BINARY; @@ -253,6 +271,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); *intermediateResBytes = sizeof(SAvgInfo); + } else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) { + *type = TSDB_DATA_TYPE_DOUBLE; + *bytes = sizeof(double); + *intermediateResBytes = sizeof(SRateInfo); } else if (functionId == TSDB_FUNC_STDDEV) { *type = TSDB_DATA_TYPE_DOUBLE; *bytes = sizeof(double); @@ -4348,6 +4370,462 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { doFinalizer(pCtx); } +////////////////////////////////////////////////////////////////////////////////////////////// +// RATE functions + +static double do_calc_rate(const SRateInfo* pRateInfo) { + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || (pRateInfo->firstKey >= pRateInfo->lastKey)) { + return 0; + } + + int64_t diff = 0; + + if (pRateInfo->isIRate) { + diff = pRateInfo->lastValue; + if (diff >= pRateInfo->firstValue) { + diff -= pRateInfo->firstValue; + } + } else { + diff = pRateInfo->CorrectionValue + pRateInfo->lastValue - pRateInfo->firstValue; + if (diff <= 0) { + return 0; + } + } + + int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey; + duration = (duration + 500) / 1000; + + double resultVal = ((double)diff) / duration; + + pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " resultVal:%f", + pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal); + + return resultVal; +} + + +static bool rate_function_setup(SQLFunctionCtx *pCtx) { + if (!function_setup(pCtx)) { + return false; + } + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; + SRateInfo * pInfo = pResInfo->interResultBuf; + + pInfo->CorrectionValue = 0; + pInfo->firstKey = INT64_MIN; + pInfo->lastKey = INT64_MIN; + pInfo->firstValue = INT64_MIN; + pInfo->lastValue = INT64_MIN; + pInfo->num = 0; + pInfo->sum = 0; + + pInfo->hasResult = 0; + pInfo->isIRate = ((pCtx->functionId == TSDB_FUNC_IRATE) || (pCtx->functionId == TSDB_FUNC_SUM_IRATE) || (pCtx->functionId == TSDB_FUNC_AVG_IRATE)); + return true; +} + + +static void rate_function(SQLFunctionCtx *pCtx) { + + assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus)); + + int32_t notNullElems = 0; + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); + + for (int32_t i = 0; i < pCtx->size; ++i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p rate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("firstValue:%" PRId64 " firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey); + } + + if (INT64_MIN == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + pTrace("CorrectionValue:%" PRId64, pRateInfo->CorrectionValue); + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + pTrace("lastValue:%" PRId64 " lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey); + } + + if (!pCtx->hasNull) { + assert(pCtx->size == notNullElems); + } + + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + } + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // NOTE: keep the intermediate result into the interResultBuf + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) { + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[index]; + } + + if (INT64_MIN == pRateInfo->lastValue) { + pRateInfo->lastValue = v; + } else if (v < pRateInfo->lastValue) { + pRateInfo->CorrectionValue += pRateInfo->lastValue; + } + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[index]; + + pTrace("====%p rate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " CorrectionValue:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->CorrectionValue); + + SET_VAL(pCtx, 1, 1); + + // set has result flag + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + + + +static void rate_func_merge(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pResInfo->superTableQ); + + pTrace("rate_func_merge() size:%d", pCtx->size); + + //SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + SRateInfo *pBuf = (SRateInfo *)pCtx->aOutputBuf; + char *indicator = pCtx->aInputElemBuf; + + assert(1 == pCtx->size); + + int32_t numOfNotNull = 0; + for (int32_t i = 0; i < pCtx->size; ++i, indicator += sizeof(SRateInfo)) { + SRateInfo *pInput = (SRateInfo *)indicator; + if (DATA_SET_FLAG != pInput->hasResult) { + continue; + } + + numOfNotNull++; + memcpy(pBuf, pInput, sizeof(SRateInfo)); + pTrace("%p rate_func_merge() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64, + pCtx, pInput->isIRate, pInput->firstKey, pInput->lastKey, pInput->firstValue, pInput->lastValue, pInput->CorrectionValue); + } + + SET_VAL(pCtx, numOfNotNull, 1); + + if (numOfNotNull > 0) { + pBuf->hasResult = DATA_SET_FLAG; + } + + return; +} + + + +static void rate_func_copy(SQLFunctionCtx *pCtx) { + assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY); + + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + memcpy(pResInfo->interResultBuf, pCtx->aInputElemBuf, (size_t)pCtx->inputBytes); + pResInfo->hasResult = ((SRateInfo*)pCtx->aInputElemBuf)->hasResult; + + SRateInfo* pRateInfo = (SRateInfo*)pCtx->aInputElemBuf; + pTrace("%p rate_func_second_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", + pCtx, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); +} + + + +static void rate_finalizer(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + + pTrace("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d", + pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult); + + if (pRateInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + return; + } + + *(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo); + + pTrace("rate_finalizer() output result:%f", *(double *)pCtx->aOutputBuf); + + // cannot set the numOfIteratedElems again since it is set during previous iteration + pResInfo->numOfRes = 1; + pResInfo->hasResult = DATA_SET_FLAG; + + doFinalizer(pCtx); +} + + +static void irate_function(SQLFunctionCtx *pCtx) { + + assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus)); + + int32_t notNullElems = 0; + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + pTrace("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull); + + if (pCtx->size < 1) { + return; + } + + for (int32_t i = pCtx->size - 1; i >= 0; --i) { + char *pData = GET_INPUT_CHAR_INDEX(pCtx, i); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + pTrace("%p irate_function() index of null data:%d", pCtx, i); + continue; + } + + notNullElems++; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + // TODO: calc once if only call this function once ???? + if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->lastValue)) { + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[i]; + + pTrace("%p irate_function() lastValue:%" PRId64 " lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey); + continue; + } + + if ((INT64_MIN == pRateInfo->firstKey) || (INT64_MIN == pRateInfo->firstValue)){ + pRateInfo->firstValue = v; + pRateInfo->firstKey = primaryKey[i]; + + pTrace("%p irate_function() firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey); + break; + } + } + + SET_VAL(pCtx, notNullElems, 1); + + if (notNullElems > 0) { + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + } + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) { + void *pData = GET_INPUT_CHAR_INDEX(pCtx, index); + if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { + return; + } + + // NOTE: keep the intermediate result into the interResultBuf + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + TSKEY *primaryKey = pCtx->ptsList; + + int64_t v = 0; + switch (pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: + v = (int64_t)GET_INT8_VAL(pData); + break; + case TSDB_DATA_TYPE_SMALLINT: + v = (int64_t)GET_INT16_VAL(pData); + break; + case TSDB_DATA_TYPE_INT: + v = (int64_t)GET_INT32_VAL(pData); + break; + case TSDB_DATA_TYPE_BIGINT: + v = (int64_t)GET_INT64_VAL(pData); + break; + default: + assert(0); + } + + pRateInfo->firstKey = pRateInfo->lastKey; + pRateInfo->firstValue = pRateInfo->lastValue; + + pRateInfo->lastValue = v; + pRateInfo->lastKey = primaryKey[index]; + + pTrace("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey); + + SET_VAL(pCtx, 1, 1); + + // set has result flag + pRateInfo->hasResult = DATA_SET_FLAG; + pResInfo->hasResult = DATA_SET_FLAG; + + // keep the data into the final output buffer for super table query since this execution may be the last one + if (pResInfo->superTableQ) { + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void do_sumrate_merge(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + assert(pResInfo->superTableQ); + + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + char * input = GET_INPUT_CHAR(pCtx); + + for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) { + SRateInfo *pInput = (SRateInfo *)input; + + pTrace("%p do_sumrate_merge() hasResult:%d input num:%" PRId64 " input sum:%f total num:%" PRId64 " total sum:%f", pCtx, pInput->hasResult, pInput->num, pInput->sum, pRateInfo->num, pRateInfo->sum); + + if (pInput->hasResult != DATA_SET_FLAG) { + continue; + } else if (pInput->num == 0) { + pRateInfo->sum += do_calc_rate(pInput); + pRateInfo->num++; + } else { + pRateInfo->sum += pInput->sum; + pRateInfo->num += pInput->num; + } + pRateInfo->hasResult = DATA_SET_FLAG; + } + + // if the data set hasResult is not set, the result is null + if (DATA_SET_FLAG == pRateInfo->hasResult) { + pResInfo->hasResult = DATA_SET_FLAG; + SET_VAL(pCtx, pRateInfo->num, 1); + memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo)); + } +} + +static void sumrate_func_merge(SQLFunctionCtx *pCtx) { + pTrace("%p sumrate_func_merge() process ...", pCtx); + do_sumrate_merge(pCtx); +} + +static void sumrate_func_second_merge(SQLFunctionCtx *pCtx) { + pTrace("%p sumrate_func_second_merge() process ...", pCtx); + do_sumrate_merge(pCtx); +} + +static void sumrate_finalizer(SQLFunctionCtx *pCtx) { + SResultInfo *pResInfo = GET_RES_INFO(pCtx); + SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf; + + pTrace("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pResInfo->superTableQ, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult); + + if (pRateInfo->hasResult != DATA_SET_FLAG) { + setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); + return; + } + + if (pRateInfo->num == 0) { + // from meter + *(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo); + } else if (pCtx->functionId == TSDB_FUNC_SUM_RATE || pCtx->functionId == TSDB_FUNC_SUM_IRATE) { + *(double*)pCtx->aOutputBuf = pRateInfo->sum; + } else { + *(double*)pCtx->aOutputBuf = pRateInfo->sum / pRateInfo->num; + } + + pResInfo->numOfRes = 1; + pResInfo->hasResult = DATA_SET_FLAG; + doFinalizer(pCtx); +} + + +///////////////////////////////////////////////////////////////////////////////////////////// + + /* * function compatible list. * tag and ts are not involved in the compatibility check @@ -4359,23 +4837,18 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last... * */ -int32_t funcCompatDefList[28] = { - /* - * count, sum, avg, min, max, stddev, percentile, apercentile, first, last - */ - 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, - - /* - * last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z, tag - */ - 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, - - /* - * colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp - */ - 1, 1, 1, -1, 1, 1, 5}; +int32_t funcCompatDefList[] = { + // count, sum, avg, min, max, stddev, percentile, apercentile, first, last + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, + // last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z + 4, -1, -1, 1, 1, 1, 1, 1, 1, -1, + // tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp rate irate + 1, 1, 1, 1, -1, 1, 1, 5, 1, 1, + // sum_rate, sum_irate, avg_rate, avg_irate + 1, 1, 1, 1, +}; -SQLAggFuncElem aAggs[28] = {{ +SQLAggFuncElem aAggs[] = {{ // 0, count function does not invoke the finalize function "count", TSDB_FUNC_COUNT, @@ -4798,4 +5271,94 @@ SQLAggFuncElem aAggs[28] = {{ noop1, copy_function, no_data_info, + }, + { + // 28 + "rate", + TSDB_FUNC_RATE, + TSDB_FUNC_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + rate_finalizer, + rate_func_merge, + rate_func_copy, + data_req_load_info, + }, + { + // 29 + "irate", + TSDB_FUNC_IRATE, + TSDB_FUNC_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + rate_finalizer, + rate_func_merge, + rate_func_copy, + data_req_load_info, + }, + { + // 30 + "sum_rate", + TSDB_FUNC_SUM_RATE, + TSDB_FUNC_SUM_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 31 + "sum_irate", + TSDB_FUNC_SUM_IRATE, + TSDB_FUNC_SUM_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 32 + "avg_rate", + TSDB_FUNC_AVG_RATE, + TSDB_FUNC_AVG_RATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + rate_function, + rate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, + }, + { + // 33 + "avg_irate", + TSDB_FUNC_AVG_IRATE, + TSDB_FUNC_AVG_IRATE, + TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS, + rate_function_setup, + irate_function, + irate_function_f, + no_next_step, + sumrate_finalizer, + sumrate_func_merge, + sumrate_func_second_merge, + data_req_load_info, }}; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index da7e22fe1e42d0dc9b761b9bd13567440d659005..5652dbf58a1e6a9997d5b4c8ca610339f1f8675e 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -1122,7 +1122,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; } - } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_LAST_ROW) { + } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) { // sql function in selection clause, append sql function info in pSqlCmd structure sequentially if (addExprAndResultField(pQueryInfo, outputIndex, pItem) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -1504,6 +1504,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt } case TK_SUM: case TK_AVG: + case TK_RATE: + case TK_IRATE: + case TK_SUM_RATE: + case TK_SUM_IRATE: + case TK_AVG_RATE: + case TK_AVG_IRATE: case TK_TWA: case TK_MIN: case TK_MAX: @@ -1956,6 +1962,24 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) { case TK_AVG: *functionId = TSDB_FUNC_AVG; break; + case TK_RATE: + *functionId = TSDB_FUNC_RATE; + break; + case TK_IRATE: + *functionId = TSDB_FUNC_IRATE; + break; + case TK_SUM_RATE: + *functionId = TSDB_FUNC_SUM_RATE; + break; + case TK_SUM_IRATE: + *functionId = TSDB_FUNC_SUM_IRATE; + break; + case TK_AVG_RATE: + *functionId = TSDB_FUNC_AVG_RATE; + break; + case TK_AVG_IRATE: + *functionId = TSDB_FUNC_AVG_IRATE; + break; case TK_MIN: *functionId = TSDB_FUNC_MIN; break; @@ -2149,7 +2173,8 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { int16_t functionId = aAggs[pExpr->functionId].stableFuncId; if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || - (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST)) { + (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) || + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { if (getResultDataInfo(pField->type, pField->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes, &intermediateBytes, 0, true) != TSDB_CODE_SUCCESS) { return TSDB_CODE_INVALID_SQL; @@ -2912,7 +2937,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL pList->ids[pList->num++] = index; } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { return TSDB_CODE_INVALID_SQL; - } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { + } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_AVG_IRATE) { return TSDB_CODE_INVALID_SQL; } @@ -2966,8 +2991,8 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) { * * However, columnA < 4+12 is valid */ - if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_LAST_ROW) || - (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_LAST_ROW) || + if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_AVG_IRATE) || + (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_AVG_IRATE) || (pLeft->nSQLOptr >= TK_BOOL && pLeft->nSQLOptr <= TK_BINARY && pRight->nSQLOptr >= TK_BOOL && pRight->nSQLOptr <= TK_BINARY)) { return false; diff --git a/src/inc/tsqlfunction.h b/src/inc/tsqlfunction.h index 93f50cf4f3862e7d2747c10399858c3be0562072..2caecb6309d11065653666e786218ca87e5bcce9 100644 --- a/src/inc/tsqlfunction.h +++ b/src/inc/tsqlfunction.h @@ -60,6 +60,13 @@ extern "C" { #define TSDB_FUNC_LAST_DST 26 #define TSDB_FUNC_INTERP 27 +#define TSDB_FUNC_RATE 28 +#define TSDB_FUNC_IRATE 29 +#define TSDB_FUNC_SUM_RATE 30 +#define TSDB_FUNC_SUM_IRATE 31 +#define TSDB_FUNC_AVG_RATE 32 +#define TSDB_FUNC_AVG_IRATE 33 + #define TSDB_FUNCSTATE_SO 0x1U // single output #define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream @@ -287,10 +294,10 @@ typedef struct STwaInfo { } STwaInfo; /* global sql function array */ -extern struct SQLAggFuncElem aAggs[28]; +extern struct SQLAggFuncElem aAggs[]; /* compatible check array list */ -extern int32_t funcCompatDefList[28]; +extern int32_t funcCompatDefList[]; void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max, int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull); diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index 40b65aa16375a81bb4a46f61967b2576d5af6189..cce66786fd993b1b300a5a2abbe27e5f4eff37de 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -126,6 +126,7 @@ static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { bool isQueryKilled(SQuery* pQuery); bool isFixedOutputQuery(SQuery* pQuery); bool isPointInterpoQuery(SQuery* pQuery); +bool isSumAvgRateQuery(SQuery *pQuery); bool isTopBottomQuery(SQuery* pQuery); bool isFirstLastRowQuery(SQuery* pQuery); bool isTSCompQuery(SQuery* pQuery); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 33fb3fe760de8b6dc27079d8da231b4291fc2c7e..dd86c6a35c0f8e6f1a81c67bca824b662bb1d647 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2438,8 +2438,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes // store the first&last timestamp into the intermediate buffer [1], the true // value may be null but timestamp will never be null pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE); - } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || - functionId == TSDB_FUNC_DIFF) { + } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || + functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF || + (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) { /* * leastsquares function needs two columns of input, currently, the x value of linear equation is set to * timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer @@ -2723,6 +2724,22 @@ bool isPointInterpoQuery(SQuery *pQuery) { } // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION +bool isSumAvgRateQuery(SQuery *pQuery) { + for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { + int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; + if (functionId == TSDB_FUNC_TS) { + continue; + } + + if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE || + functionId == TSDB_FUNC_AVG_RATE || functionId == TSDB_FUNC_AVG_IRATE) { + return true; + } + } + + return false; +} + bool isTopBottomQuery(SQuery *pQuery) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; @@ -4584,7 +4601,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult); } - if (pQuery->nAggTimeInterval != 0) { + if (pQuery->nAggTimeInterval != 0 || isSumAvgRateQuery(pQuery)) { // one page for each table at least ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); if (ret != TSDB_CODE_SUCCESS) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 1b04806f7c4e68f8a6c3f98df82f5941414f700f..c243a78e837cbc0f1ad60d83a7786da3aae54d3a 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -398,7 +398,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); - if (pQuery->nAggTimeInterval == 0) { // normal query + if (pQuery->nAggTimeInterval == 0 && !isSumAvgRateQuery(pQuery)) { // normal query if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { qTrace( @@ -964,7 +964,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { return; } - if (pQuery->nAggTimeInterval > 0) { + if (pQuery->nAggTimeInterval > 0 || isSumAvgRateQuery(pQuery)) { assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { diff --git a/src/util/src/ttokenizer.c b/src/util/src/ttokenizer.c index d4f3bd6879dae6b9e8573a9230f39fe3405b5927..7cbb4552b410536176f5e69d9ee9336af197d94f 100644 --- a/src/util/src/ttokenizer.c +++ b/src/util/src/ttokenizer.c @@ -231,6 +231,7 @@ static SKeyword keywordTable[] = { {"RATE", TK_RATE}, {"IRATE", TK_IRATE}, {"SUM_RATE", TK_SUM_RATE}, + {"SUM_IRATE", TK_SUM_IRATE}, {"AVG_RATE", TK_AVG_RATE}, {"AVG_IRATE", TK_AVG_IRATE}, };