From 818e3636d6dc15772ab63a41ef32b293de1a46fd Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 12 May 2022 20:40:17 +0800 Subject: [PATCH] feat(query): add mavg function --- source/libs/function/inc/builtinsimpl.h | 3 + source/libs/function/src/builtins.c | 31 +++++++++ source/libs/function/src/builtinsimpl.c | 85 ++++++++++++++++++++++++- 3 files changed, 118 insertions(+), 1 deletion(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index e1e30d37ea..ee976cbab3 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -97,6 +97,9 @@ int32_t stateDurationFunction(SqlFunctionCtx* pCtx); bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t csumFunction(SqlFunctionCtx* pCtx); +bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t mavgFunction(SqlFunctionCtx* pCtx); + bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); #ifdef __cplusplus diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index c9136433e0..1cc8729858 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -339,6 +339,27 @@ static int32_t translateCsum(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return TSDB_CODE_SUCCESS; } +static int32_t translateMavg(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (2 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + if (QUERY_NODE_COLUMN != nodeType(pPara)) { + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "The input parameter of MAVG function can only be column"); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + if (!IS_NUMERIC_TYPE(colType) || !IS_INTEGER_TYPE(paraType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + return TSDB_CODE_SUCCESS; +} + static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // todo return TSDB_CODE_SUCCESS; @@ -783,6 +804,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = csumFunction, .finalizeFunc = NULL }, + { + .name = "mavg", + .type = FUNCTION_TYPE_MAVG, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateMavg, + .getEnvFunc = getMavgFuncEnv, + .initFunc = functionSetup, + .processFunc = mavgFunction, + .finalizeFunc = NULL + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 173dc9fd57..fd8c9fd79c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -21,7 +21,8 @@ #include "tdatablock.h" #include "tpercentile.h" -#define HISTOGRAM_MAX_BINS_NUM 100 +#define HISTOGRAM_MAX_BINS_NUM 1000 +#define MAVG_MAX_POINTS_NUM 1000 typedef struct SSumRes { union { @@ -141,6 +142,14 @@ typedef enum { STATE_OPER_EQ, } EStateOperType; +typedef struct SMavgInfo { + int32_t pos; + double sum; + int32_t numOfPoints; + bool pointsMeet; + double points[]; +} SMavgInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -2946,3 +2955,77 @@ int32_t csumFunction(SqlFunctionCtx* pCtx) { return numOfElems; } + +bool getMavgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SMavgInfo) + MAVG_MAX_POINTS_NUM * sizeof(double); + return true; +} + +bool mavgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + SMavgInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo); + pInfo->pos = 0; + pInfo->sum = 0; + pInfo->numOfPoints = pCtx->param[1].param.i; + if (pInfo->numOfPoints < 1 || pInfo->numOfPoints > MAVG_MAX_POINTS_NUM) { + return false; + } + pInfo->pointsMeet = false; + + return true; +} + +int32_t mavgFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SMavgInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t numOfElems = 0; + int32_t type = pInputCol->info.type; + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { + int32_t pos = startOffset + numOfElems; + if (colDataIsNull_f(pInputCol->nullbitmap, i)) { + //colDataAppendNULL(pOutput, i); + continue; + } + + char* data = colDataGetData(pInputCol, i); + double v; + GET_TYPED_DATA(v, double, type, data); + + if (!pInfo->pointsMeet && (pInfo->pos < pInfo->numOfPoints - 1)) { + pInfo->points[pInfo->pos] = v; + pInfo->sum += v; + } else { + if (!pInfo->pointsMeet && (pInfo->pos == pInfo->numOfPoints - 1)) { + pInfo->sum +=v; + pInfo->pointsMeet = true; + } else { + pInfo->sum = pInfo->sum + v - pInfo->points[pInfo->pos]; + } + } + + pInfo->points[pInfo->pos] = v; + double result = pInfo->sum / pInfo->numOfPoints; + colDataAppend(pOutput, pos, (char *)&result, false); + + //TODO: remove this after pTsOutput is handled + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, pos, &tsList[i]); + } + + numOfElems++; + } + + return numOfElems; +} -- GitLab