提交 87efc517 编写于 作者: L lihui

[TBASE-1462]

上级 5b4c77e3
...@@ -138,6 +138,19 @@ typedef struct STSCompInfo { ...@@ -138,6 +138,19 @@ typedef struct STSCompInfo {
STSBuf *pTSBuf; STSBuf *pTSBuf;
} STSCompInfo; } STSCompInfo;
typedef struct SRateInfo {
int64_t CorrectionValue;
int64_t firstValue;
TSKEY firstKey;
int64_t lastValue;
TSKEY lastKey;
int8_t hasResult; // flag to denote has value
bool isIRate; // true for IRate functions, false for Rate functions
int64_t num; // for sum/avg
double sum; // for sum/avg
} SRateInfo;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) { int16_t *bytes, int16_t *intermediateResBytes, int16_t extLength, bool isSuperTable) {
if (!isValidDataType(dataType, dataBytes)) { if (!isValidDataType(dataType, dataBytes)) {
...@@ -192,7 +205,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -192,7 +205,12 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
*bytes = sizeof(SAvgInfo); *bytes = sizeof(SAvgInfo);
*intermediateResBytes = *bytes; *intermediateResBytes = *bytes;
return TSDB_CODE_SUCCESS;
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(SRateInfo);
*intermediateResBytes = sizeof(SRateInfo);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) { } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
*type = TSDB_DATA_TYPE_BINARY; *type = TSDB_DATA_TYPE_BINARY;
...@@ -253,6 +271,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI ...@@ -253,6 +271,10 @@ int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionI
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double); *bytes = sizeof(double);
*intermediateResBytes = sizeof(SAvgInfo); *intermediateResBytes = sizeof(SAvgInfo);
} else if (functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE) {
*type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double);
*intermediateResBytes = sizeof(SRateInfo);
} else if (functionId == TSDB_FUNC_STDDEV) { } else if (functionId == TSDB_FUNC_STDDEV) {
*type = TSDB_DATA_TYPE_DOUBLE; *type = TSDB_DATA_TYPE_DOUBLE;
*bytes = sizeof(double); *bytes = sizeof(double);
...@@ -4348,6 +4370,462 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { ...@@ -4348,6 +4370,462 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
doFinalizer(pCtx); doFinalizer(pCtx);
} }
//////////////////////////////////////////////////////////////////////////////////////////////
// RATE functions
static double do_calc_rate(const SRateInfo* pRateInfo) {
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->firstKey) || (pRateInfo->firstKey >= pRateInfo->lastKey)) {
return 0;
}
int64_t diff = 0;
if (pRateInfo->isIRate) {
diff = pRateInfo->lastValue;
if (diff >= pRateInfo->firstValue) {
diff -= pRateInfo->firstValue;
}
} else {
diff = pRateInfo->CorrectionValue + pRateInfo->lastValue - pRateInfo->firstValue;
if (diff <= 0) {
return 0;
}
}
int64_t duration = pRateInfo->lastKey - pRateInfo->firstKey;
duration = (duration + 500) / 1000;
double resultVal = ((double)diff) / duration;
pTrace("do_calc_rate() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " resultVal:%f",
pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, resultVal);
return resultVal;
}
static bool rate_function_setup(SQLFunctionCtx *pCtx) {
if (!function_setup(pCtx)) {
return false;
}
SResultInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes;
SRateInfo * pInfo = pResInfo->interResultBuf;
pInfo->CorrectionValue = 0;
pInfo->firstKey = INT64_MIN;
pInfo->lastKey = INT64_MIN;
pInfo->firstValue = INT64_MIN;
pInfo->lastValue = INT64_MIN;
pInfo->num = 0;
pInfo->sum = 0;
pInfo->hasResult = 0;
pInfo->isIRate = ((pCtx->functionId == TSDB_FUNC_IRATE) || (pCtx->functionId == TSDB_FUNC_SUM_IRATE) || (pCtx->functionId == TSDB_FUNC_AVG_IRATE));
return true;
}
static void rate_function(SQLFunctionCtx *pCtx) {
assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus));
int32_t notNullElems = 0;
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
TSKEY *primaryKey = pCtx->ptsList;
pTrace("%p rate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
for (int32_t i = 0; i < pCtx->size; ++i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p rate_function() index of null data:%d", pCtx, i);
continue;
}
notNullElems++;
int64_t v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (int64_t)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (int64_t)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (int64_t)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (int64_t)GET_INT64_VAL(pData);
break;
default:
assert(0);
}
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("firstValue:%" PRId64 " firstKey:%" PRId64, pRateInfo->firstValue, pRateInfo->firstKey);
}
if (INT64_MIN == pRateInfo->lastValue) {
pRateInfo->lastValue = v;
} else if (v < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
pTrace("CorrectionValue:%" PRId64, pRateInfo->CorrectionValue);
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("lastValue:%" PRId64 " lastKey:%" PRId64, pRateInfo->lastValue, pRateInfo->lastKey);
}
if (!pCtx->hasNull) {
assert(pCtx->size == notNullElems);
}
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pRateInfo->hasResult = DATA_SET_FLAG;
pResInfo->hasResult = DATA_SET_FLAG;
}
// keep the data into the final output buffer for super table query since this execution may be the last one
if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo));
}
}
static void rate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
void *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return;
}
// NOTE: keep the intermediate result into the interResultBuf
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
TSKEY *primaryKey = pCtx->ptsList;
int64_t v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (int64_t)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (int64_t)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (int64_t)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (int64_t)GET_INT64_VAL(pData);
break;
default:
assert(0);
}
if ((INT64_MIN == pRateInfo->firstValue) || (INT64_MIN == pRateInfo->firstKey)) {
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[index];
}
if (INT64_MIN == pRateInfo->lastValue) {
pRateInfo->lastValue = v;
} else if (v < pRateInfo->lastValue) {
pRateInfo->CorrectionValue += pRateInfo->lastValue;
}
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[index];
pTrace("====%p rate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " CorrectionValue:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->CorrectionValue);
SET_VAL(pCtx, 1, 1);
// set has result flag
pRateInfo->hasResult = DATA_SET_FLAG;
pResInfo->hasResult = DATA_SET_FLAG;
// keep the data into the final output buffer for super table query since this execution may be the last one
if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo));
}
}
static void rate_func_merge(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
assert(pResInfo->superTableQ);
pTrace("rate_func_merge() size:%d", pCtx->size);
//SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
SRateInfo *pBuf = (SRateInfo *)pCtx->aOutputBuf;
char *indicator = pCtx->aInputElemBuf;
assert(1 == pCtx->size);
int32_t numOfNotNull = 0;
for (int32_t i = 0; i < pCtx->size; ++i, indicator += sizeof(SRateInfo)) {
SRateInfo *pInput = (SRateInfo *)indicator;
if (DATA_SET_FLAG != pInput->hasResult) {
continue;
}
numOfNotNull++;
memcpy(pBuf, pInput, sizeof(SRateInfo));
pTrace("%p rate_func_merge() isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64,
pCtx, pInput->isIRate, pInput->firstKey, pInput->lastKey, pInput->firstValue, pInput->lastValue, pInput->CorrectionValue);
}
SET_VAL(pCtx, numOfNotNull, 1);
if (numOfNotNull > 0) {
pBuf->hasResult = DATA_SET_FLAG;
}
return;
}
static void rate_func_copy(SQLFunctionCtx *pCtx) {
assert(pCtx->inputType == TSDB_DATA_TYPE_BINARY);
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
memcpy(pResInfo->interResultBuf, pCtx->aInputElemBuf, (size_t)pCtx->inputBytes);
pResInfo->hasResult = ((SRateInfo*)pCtx->aInputElemBuf)->hasResult;
SRateInfo* pRateInfo = (SRateInfo*)pCtx->aInputElemBuf;
pTrace("%p rate_func_second_merge() firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d",
pCtx, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult);
}
static void rate_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
pTrace("%p isIRate:%d firstKey:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " lastValue:%" PRId64 " CorrectionValue:%" PRId64 " hasResult:%d",
pCtx, pRateInfo->isIRate, pRateInfo->firstKey, pRateInfo->lastKey, pRateInfo->firstValue, pRateInfo->lastValue, pRateInfo->CorrectionValue, pRateInfo->hasResult);
if (pRateInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return;
}
*(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo);
pTrace("rate_finalizer() output result:%f", *(double *)pCtx->aOutputBuf);
// cannot set the numOfIteratedElems again since it is set during previous iteration
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
doFinalizer(pCtx);
}
static void irate_function(SQLFunctionCtx *pCtx) {
assert(IS_DATA_BLOCK_LOADED(pCtx->blockStatus));
int32_t notNullElems = 0;
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
TSKEY *primaryKey = pCtx->ptsList;
pTrace("%p irate_function() size:%d, hasNull:%d", pCtx, pCtx->size, pCtx->hasNull);
if (pCtx->size < 1) {
return;
}
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *pData = GET_INPUT_CHAR_INDEX(pCtx, i);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
pTrace("%p irate_function() index of null data:%d", pCtx, i);
continue;
}
notNullElems++;
int64_t v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (int64_t)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (int64_t)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (int64_t)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (int64_t)GET_INT64_VAL(pData);
break;
default:
assert(0);
}
// TODO: calc once if only call this function once ????
if ((INT64_MIN == pRateInfo->lastKey) || (INT64_MIN == pRateInfo->lastValue)) {
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[i];
pTrace("%p irate_function() lastValue:%" PRId64 " lastKey:%" PRId64, pCtx, pRateInfo->lastValue, pRateInfo->lastKey);
continue;
}
if ((INT64_MIN == pRateInfo->firstKey) || (INT64_MIN == pRateInfo->firstValue)){
pRateInfo->firstValue = v;
pRateInfo->firstKey = primaryKey[i];
pTrace("%p irate_function() firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, pRateInfo->firstValue, pRateInfo->firstKey);
break;
}
}
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
pRateInfo->hasResult = DATA_SET_FLAG;
pResInfo->hasResult = DATA_SET_FLAG;
}
// keep the data into the final output buffer for super table query since this execution may be the last one
if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo));
}
}
static void irate_function_f(SQLFunctionCtx *pCtx, int32_t index) {
void *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
return;
}
// NOTE: keep the intermediate result into the interResultBuf
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
TSKEY *primaryKey = pCtx->ptsList;
int64_t v = 0;
switch (pCtx->inputType) {
case TSDB_DATA_TYPE_TINYINT:
v = (int64_t)GET_INT8_VAL(pData);
break;
case TSDB_DATA_TYPE_SMALLINT:
v = (int64_t)GET_INT16_VAL(pData);
break;
case TSDB_DATA_TYPE_INT:
v = (int64_t)GET_INT32_VAL(pData);
break;
case TSDB_DATA_TYPE_BIGINT:
v = (int64_t)GET_INT64_VAL(pData);
break;
default:
assert(0);
}
pRateInfo->firstKey = pRateInfo->lastKey;
pRateInfo->firstValue = pRateInfo->lastValue;
pRateInfo->lastValue = v;
pRateInfo->lastKey = primaryKey[index];
pTrace("====%p irate_function_f() index:%d lastValue:%" PRId64 " lastKey:%" PRId64 " firstValue:%" PRId64 " firstKey:%" PRId64, pCtx, index, pRateInfo->lastValue, pRateInfo->lastKey, pRateInfo->firstValue , pRateInfo->firstKey);
SET_VAL(pCtx, 1, 1);
// set has result flag
pRateInfo->hasResult = DATA_SET_FLAG;
pResInfo->hasResult = DATA_SET_FLAG;
// keep the data into the final output buffer for super table query since this execution may be the last one
if (pResInfo->superTableQ) {
memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo));
}
}
static void do_sumrate_merge(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
assert(pResInfo->superTableQ);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
char * input = GET_INPUT_CHAR(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) {
SRateInfo *pInput = (SRateInfo *)input;
pTrace("%p do_sumrate_merge() hasResult:%d input num:%" PRId64 " input sum:%f total num:%" PRId64 " total sum:%f", pCtx, pInput->hasResult, pInput->num, pInput->sum, pRateInfo->num, pRateInfo->sum);
if (pInput->hasResult != DATA_SET_FLAG) {
continue;
} else if (pInput->num == 0) {
pRateInfo->sum += do_calc_rate(pInput);
pRateInfo->num++;
} else {
pRateInfo->sum += pInput->sum;
pRateInfo->num += pInput->num;
}
pRateInfo->hasResult = DATA_SET_FLAG;
}
// if the data set hasResult is not set, the result is null
if (DATA_SET_FLAG == pRateInfo->hasResult) {
pResInfo->hasResult = DATA_SET_FLAG;
SET_VAL(pCtx, pRateInfo->num, 1);
memcpy(pCtx->aOutputBuf, pResInfo->interResultBuf, sizeof(SRateInfo));
}
}
static void sumrate_func_merge(SQLFunctionCtx *pCtx) {
pTrace("%p sumrate_func_merge() process ...", pCtx);
do_sumrate_merge(pCtx);
}
static void sumrate_func_second_merge(SQLFunctionCtx *pCtx) {
pTrace("%p sumrate_func_second_merge() process ...", pCtx);
do_sumrate_merge(pCtx);
}
static void sumrate_finalizer(SQLFunctionCtx *pCtx) {
SResultInfo *pResInfo = GET_RES_INFO(pCtx);
SRateInfo *pRateInfo = (SRateInfo *)pResInfo->interResultBuf;
pTrace("%p sumrate_finalizer() superTableQ:%d num:%" PRId64 " sum:%f hasResult:%d", pCtx, pResInfo->superTableQ, pRateInfo->num, pRateInfo->sum, pRateInfo->hasResult);
if (pRateInfo->hasResult != DATA_SET_FLAG) {
setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
return;
}
if (pRateInfo->num == 0) {
// from meter
*(double*)pCtx->aOutputBuf = do_calc_rate(pRateInfo);
} else if (pCtx->functionId == TSDB_FUNC_SUM_RATE || pCtx->functionId == TSDB_FUNC_SUM_IRATE) {
*(double*)pCtx->aOutputBuf = pRateInfo->sum;
} else {
*(double*)pCtx->aOutputBuf = pRateInfo->sum / pRateInfo->num;
}
pResInfo->numOfRes = 1;
pResInfo->hasResult = DATA_SET_FLAG;
doFinalizer(pCtx);
}
/////////////////////////////////////////////////////////////////////////////////////////////
/* /*
* function compatible list. * function compatible list.
* tag and ts are not involved in the compatibility check * tag and ts are not involved in the compatibility check
...@@ -4359,23 +4837,18 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) { ...@@ -4359,23 +4837,18 @@ static void ts_comp_finalize(SQLFunctionCtx *pCtx) {
* e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last... * e.g., count/sum/avg/min/max/stddev/percentile/apercentile/first/last...
* *
*/ */
int32_t funcCompatDefList[28] = { int32_t funcCompatDefList[] = {
/* // count, sum, avg, min, max, stddev, percentile, apercentile, first, last
* count, sum, avg, min, max, stddev, percentile, apercentile, first, last 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
*/ // last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 4, -1, -1, 1, 1, 1, 1, 1, 1, -1,
// tag, colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp rate irate
/* 1, 1, 1, 1, -1, 1, 1, 5, 1, 1,
* last_row, top, bottom, spread, twa, leastsqr, ts, ts_dummy, tag_dummy, ts_z, tag // sum_rate, sum_irate, avg_rate, avg_irate
*/ 1, 1, 1, 1,
4, -1, -1, 1, 1, 1, 1, 1, 1, -1, 1, };
/*
* colprj, tagprj, arithmetic, diff, first_dist, last_dist, interp
*/
1, 1, 1, -1, 1, 1, 5};
SQLAggFuncElem aAggs[28] = {{ SQLAggFuncElem aAggs[] = {{
// 0, count function does not invoke the finalize function // 0, count function does not invoke the finalize function
"count", "count",
TSDB_FUNC_COUNT, TSDB_FUNC_COUNT,
...@@ -4798,4 +5271,94 @@ SQLAggFuncElem aAggs[28] = {{ ...@@ -4798,4 +5271,94 @@ SQLAggFuncElem aAggs[28] = {{
noop1, noop1,
copy_function, copy_function,
no_data_info, no_data_info,
},
{
// 28
"rate",
TSDB_FUNC_RATE,
TSDB_FUNC_RATE,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
rate_function_setup,
rate_function,
rate_function_f,
no_next_step,
rate_finalizer,
rate_func_merge,
rate_func_copy,
data_req_load_info,
},
{
// 29
"irate",
TSDB_FUNC_IRATE,
TSDB_FUNC_IRATE,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
rate_function_setup,
irate_function,
irate_function_f,
no_next_step,
rate_finalizer,
rate_func_merge,
rate_func_copy,
data_req_load_info,
},
{
// 30
"sum_rate",
TSDB_FUNC_SUM_RATE,
TSDB_FUNC_SUM_RATE,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
rate_function_setup,
rate_function,
rate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
data_req_load_info,
},
{
// 31
"sum_irate",
TSDB_FUNC_SUM_IRATE,
TSDB_FUNC_SUM_IRATE,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
rate_function_setup,
irate_function,
irate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
data_req_load_info,
},
{
// 32
"avg_rate",
TSDB_FUNC_AVG_RATE,
TSDB_FUNC_AVG_RATE,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
rate_function_setup,
rate_function,
rate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
data_req_load_info,
},
{
// 33
"avg_irate",
TSDB_FUNC_AVG_IRATE,
TSDB_FUNC_AVG_IRATE,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
rate_function_setup,
irate_function,
irate_function_f,
no_next_step,
sumrate_finalizer,
sumrate_func_merge,
sumrate_func_second_merge,
data_req_load_info,
}}; }};
...@@ -1122,7 +1122,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel ...@@ -1122,7 +1122,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) { if (addProjectionExprAndResultField(pQueryInfo, pItem) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
} else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_LAST_ROW) { } else if (pItem->pNode->nSQLOptr >= TK_COUNT && pItem->pNode->nSQLOptr <= TK_AVG_IRATE) {
// sql function in selection clause, append sql function info in pSqlCmd structure sequentially // sql function in selection clause, append sql function info in pSqlCmd structure sequentially
if (addExprAndResultField(pQueryInfo, outputIndex, pItem) != TSDB_CODE_SUCCESS) { if (addExprAndResultField(pQueryInfo, outputIndex, pItem) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
...@@ -1504,6 +1504,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt ...@@ -1504,6 +1504,12 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprIt
} }
case TK_SUM: case TK_SUM:
case TK_AVG: case TK_AVG:
case TK_RATE:
case TK_IRATE:
case TK_SUM_RATE:
case TK_SUM_IRATE:
case TK_AVG_RATE:
case TK_AVG_IRATE:
case TK_TWA: case TK_TWA:
case TK_MIN: case TK_MIN:
case TK_MAX: case TK_MAX:
...@@ -1956,6 +1962,24 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) { ...@@ -1956,6 +1962,24 @@ int32_t changeFunctionID(int32_t optr, int16_t* functionId) {
case TK_AVG: case TK_AVG:
*functionId = TSDB_FUNC_AVG; *functionId = TSDB_FUNC_AVG;
break; break;
case TK_RATE:
*functionId = TSDB_FUNC_RATE;
break;
case TK_IRATE:
*functionId = TSDB_FUNC_IRATE;
break;
case TK_SUM_RATE:
*functionId = TSDB_FUNC_SUM_RATE;
break;
case TK_SUM_IRATE:
*functionId = TSDB_FUNC_SUM_IRATE;
break;
case TK_AVG_RATE:
*functionId = TSDB_FUNC_AVG_RATE;
break;
case TK_AVG_IRATE:
*functionId = TSDB_FUNC_AVG_IRATE;
break;
case TK_MIN: case TK_MIN:
*functionId = TSDB_FUNC_MIN; *functionId = TSDB_FUNC_MIN;
break; break;
...@@ -2149,7 +2173,8 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) { ...@@ -2149,7 +2173,8 @@ int32_t tscTansformSQLFunctionForSTableQuery(SQueryInfo* pQueryInfo) {
int16_t functionId = aAggs[pExpr->functionId].stableFuncId; int16_t functionId = aAggs[pExpr->functionId].stableFuncId;
if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) || if ((functionId >= TSDB_FUNC_SUM && functionId <= TSDB_FUNC_TWA) ||
(functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST)) { (functionId >= TSDB_FUNC_FIRST_DST && functionId <= TSDB_FUNC_LAST_DST) ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
if (getResultDataInfo(pField->type, pField->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes, if (getResultDataInfo(pField->type, pField->bytes, functionId, pExpr->param[0].i64Key, &type, &bytes,
&intermediateBytes, 0, true) != TSDB_CODE_SUCCESS) { &intermediateBytes, 0, true) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
...@@ -2912,7 +2937,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL ...@@ -2912,7 +2937,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL
pList->ids[pList->num++] = index; pList->ids[pList->num++] = index;
} else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_AVG_IRATE) {
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} }
...@@ -2966,8 +2991,8 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) { ...@@ -2966,8 +2991,8 @@ static bool isValidExpr(tSQLExpr* pLeft, tSQLExpr* pRight, int32_t optr) {
* *
* However, columnA < 4+12 is valid * However, columnA < 4+12 is valid
*/ */
if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_LAST_ROW) || if ((pLeft->nSQLOptr >= TK_COUNT && pLeft->nSQLOptr <= TK_AVG_IRATE) ||
(pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_LAST_ROW) || (pRight->nSQLOptr >= TK_COUNT && pRight->nSQLOptr <= TK_AVG_IRATE) ||
(pLeft->nSQLOptr >= TK_BOOL && pLeft->nSQLOptr <= TK_BINARY && pRight->nSQLOptr >= TK_BOOL && (pLeft->nSQLOptr >= TK_BOOL && pLeft->nSQLOptr <= TK_BINARY && pRight->nSQLOptr >= TK_BOOL &&
pRight->nSQLOptr <= TK_BINARY)) { pRight->nSQLOptr <= TK_BINARY)) {
return false; return false;
......
...@@ -60,6 +60,13 @@ extern "C" { ...@@ -60,6 +60,13 @@ extern "C" {
#define TSDB_FUNC_LAST_DST 26 #define TSDB_FUNC_LAST_DST 26
#define TSDB_FUNC_INTERP 27 #define TSDB_FUNC_INTERP 27
#define TSDB_FUNC_RATE 28
#define TSDB_FUNC_IRATE 29
#define TSDB_FUNC_SUM_RATE 30
#define TSDB_FUNC_SUM_IRATE 31
#define TSDB_FUNC_AVG_RATE 32
#define TSDB_FUNC_AVG_IRATE 33
#define TSDB_FUNCSTATE_SO 0x1U // single output #define TSDB_FUNCSTATE_SO 0x1U // single output
#define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define TSDB_FUNCSTATE_MO 0x2U // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM
#define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream #define TSDB_FUNCSTATE_STREAM 0x4U // function avail for stream
...@@ -287,10 +294,10 @@ typedef struct STwaInfo { ...@@ -287,10 +294,10 @@ typedef struct STwaInfo {
} STwaInfo; } STwaInfo;
/* global sql function array */ /* global sql function array */
extern struct SQLAggFuncElem aAggs[28]; extern struct SQLAggFuncElem aAggs[];
/* compatible check array list */ /* compatible check array list */
extern int32_t funcCompatDefList[28]; extern int32_t funcCompatDefList[];
void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max, void getStatistics(char *priData, char *data, int32_t size, int32_t numOfRow, int32_t type, int64_t *min, int64_t *max,
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull); int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull);
......
...@@ -126,6 +126,7 @@ static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) { ...@@ -126,6 +126,7 @@ static FORCE_INLINE SMeterObj* getMeterObj(void* hashHandle, int32_t sid) {
bool isQueryKilled(SQuery* pQuery); bool isQueryKilled(SQuery* pQuery);
bool isFixedOutputQuery(SQuery* pQuery); bool isFixedOutputQuery(SQuery* pQuery);
bool isPointInterpoQuery(SQuery* pQuery); bool isPointInterpoQuery(SQuery* pQuery);
bool isSumAvgRateQuery(SQuery *pQuery);
bool isTopBottomQuery(SQuery* pQuery); bool isTopBottomQuery(SQuery* pQuery);
bool isFirstLastRowQuery(SQuery* pQuery); bool isFirstLastRowQuery(SQuery* pQuery);
bool isTSCompQuery(SQuery* pQuery); bool isTSCompQuery(SQuery* pQuery);
......
...@@ -2438,8 +2438,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes ...@@ -2438,8 +2438,9 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, int64_t startQueryTimes
// store the first&last timestamp into the intermediate buffer [1], the true // store the first&last timestamp into the intermediate buffer [1], the true
// value may be null but timestamp will never be null // value may be null but timestamp will never be null
pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE); pCtx->ptsList = (int64_t *)(primaryColumnData + startOffset * TSDB_KEYSIZE);
} else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TWA || } else if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM ||
functionId == TSDB_FUNC_DIFF) { functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_DIFF ||
(functionId >= TSDB_FUNC_RATE && functionId <= TSDB_FUNC_AVG_IRATE)) {
/* /*
* leastsquares function needs two columns of input, currently, the x value of linear equation is set to * leastsquares function needs two columns of input, currently, the x value of linear equation is set to
* timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer * timestamp column, and the y-value is the column specified in pQuery->pSelectExpr[i].colIdxInBuffer
...@@ -2723,6 +2724,22 @@ bool isPointInterpoQuery(SQuery *pQuery) { ...@@ -2723,6 +2724,22 @@ bool isPointInterpoQuery(SQuery *pQuery) {
} }
// TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION // TODO REFACTOR:MERGE WITH CLIENT-SIDE FUNCTION
bool isSumAvgRateQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
if (functionId == TSDB_FUNC_TS) {
continue;
}
if (functionId == TSDB_FUNC_SUM_RATE || functionId == TSDB_FUNC_SUM_IRATE ||
functionId == TSDB_FUNC_AVG_RATE || functionId == TSDB_FUNC_AVG_IRATE) {
return true;
}
}
return false;
}
bool isTopBottomQuery(SQuery *pQuery) { bool isTopBottomQuery(SQuery *pQuery) {
for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) { for (int32_t i = 0; i < pQuery->numOfOutputCols; ++i) {
int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId; int32_t functionId = pQuery->pSelectExpr[i].pBase.functionId;
...@@ -4584,7 +4601,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param) ...@@ -4584,7 +4601,7 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult); initSlidingWindowInfo(&pRuntimeEnv->swindowResInfo, 4096, type, pQuery->rowSize, pSupporter->pResult);
} }
if (pQuery->nAggTimeInterval != 0) { if (pQuery->nAggTimeInterval != 0 || isSumAvgRateQuery(pQuery)) {
// one page for each table at least // one page for each table at least
ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize); ret = createResultBuf(&pRuntimeEnv->pResultBuf, pSupporter->numOfMeters, pQuery->rowSize);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
......
...@@ -398,7 +398,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo ...@@ -398,7 +398,7 @@ static void queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMeterDataInfo
restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo); restoreIntervalQueryRange(pRuntimeEnv, pMeterQueryInfo);
if (pQuery->nAggTimeInterval == 0) { // normal query if (pQuery->nAggTimeInterval == 0 && !isSumAvgRateQuery(pQuery)) { // normal query
if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { (pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
qTrace( qTrace(
...@@ -964,7 +964,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { ...@@ -964,7 +964,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
return; return;
} }
if (pQuery->nAggTimeInterval > 0) { if (pQuery->nAggTimeInterval > 0 || isSumAvgRateQuery(pQuery)) {
assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0); assert(pSupporter->subgroupIdx == 0 && pSupporter->numOfGroupResultPages == 0);
if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) { if (mergeMetersResultToOneGroups(pSupporter) == TSDB_CODE_SUCCESS) {
......
...@@ -231,6 +231,7 @@ static SKeyword keywordTable[] = { ...@@ -231,6 +231,7 @@ static SKeyword keywordTable[] = {
{"RATE", TK_RATE}, {"RATE", TK_RATE},
{"IRATE", TK_IRATE}, {"IRATE", TK_IRATE},
{"SUM_RATE", TK_SUM_RATE}, {"SUM_RATE", TK_SUM_RATE},
{"SUM_IRATE", TK_SUM_IRATE},
{"AVG_RATE", TK_AVG_RATE}, {"AVG_RATE", TK_AVG_RATE},
{"AVG_IRATE", TK_AVG_IRATE}, {"AVG_IRATE", TK_AVG_IRATE},
}; };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册