diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index db92740fff260f07dbc438166b7d04682bbea136..40393a4eabb83c37bc555da0eddf35e0d5c2233e 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -40,6 +40,11 @@ bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t minFunction(SqlFunctionCtx* pCtx); int32_t maxFunction(SqlFunctionCtx *pCtx); +bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t avgFunction(SqlFunctionCtx* pCtx); +int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); + bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 3da5dc3289e246d6202c984bbc64f40eecde2832..70087ee46bf3b2f49a8876372c1df75c8cdb582a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -104,7 +104,7 @@ static int32_t translateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } @@ -479,6 +479,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = stddevFunction, .finalizeFunc = stddevFinalize }, + { + .name = "avg", + .type = FUNCTION_TYPE_AVG, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateInNumOutDou, + .getEnvFunc = getAvgFuncEnv, + .initFunc = avgFunctionSetup, + .processFunc = avgFunction, + .finalizeFunc = avgFinalize + }, { .name = "percentile", .type = FUNCTION_TYPE_PERCENTILE, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3dc2e62e92e4b8e28d4178032b4aaed81d050a8a..63af2fb97225014483a328b9e700eaf58ef20683 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -20,6 +20,59 @@ #include "tdatablock.h" #include "tpercentile.h" +typedef struct SSumRes { + union { + int64_t isum; + uint64_t usum; + double dsum; + }; +} SSumRes; + +typedef struct SAvgRes { + double result; + SSumRes sum; + int64_t count; +} SAvgRes; + +typedef struct STopBotResItem { + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + struct { + int32_t pageId; + int32_t offset; + } tuplePos; // tuple data of this chosen row +} STopBotResItem; + +typedef struct STopBotRes { + int32_t pageId; +// int32_t num; + STopBotResItem *pItems; +} STopBotRes; + +typedef struct SStddevRes { + double result; + int64_t count; + union {double quadraticDSum; int64_t quadraticISum;}; + union {double dsum; int64_t isum;}; +} SStddevRes; + +typedef struct SPercentileInfo { + double result; + tMemBucket *pMemBucket; + int32_t stage; + double minval; + double maxval; + int64_t numOfElems; +} SPercentileInfo; + +typedef struct SDiffInfo { + bool hasPrev; + bool includeNull; + bool ignoreNegative; + bool firstOutput; + union { int64_t i64; double d64;} prev; +} SDiffInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -28,13 +81,50 @@ (_info)->numOfRes = (res); \ } while (0) -typedef struct SSumRes { - union { - int64_t isum; - uint64_t usum; - double dsum; - }; -} SSumRes; +#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) +#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) + +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ + SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0); + +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ + SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0) + +#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_SUBSID_RES(ctx, _ts); \ + (num) += 1; \ + } \ + } while (0) + +#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ + do { \ + _t *d = (_t *)((_col)->pData); \ + for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ + if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ + continue; \ + } \ + TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \ + UPDATE_DATA(ctx, val, d[i], num, sign, ts); \ + } \ + } while (0) + bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (pResultInfo->initialized) { @@ -135,7 +225,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) { int32_t type = pInput->pData[0]->info.type; SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - + if (pInput->colDataAggIsSet) { numOfElem = pInput->numOfRows - pAgg->numOfNull; ASSERT(numOfElem >= 0); @@ -190,6 +280,145 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } +bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(double); + return true; +} + +bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); + memset(pRes, 0, sizeof(SAvgRes)); + return true; +} + +int32_t avgFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + + SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t* plist = (int8_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + + break; + } + + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* plist = (int16_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + + break; + } + + case TSDB_DATA_TYPE_BIGINT: { + int64_t* plist = (int64_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: { + float* plist = (float*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.dsum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_DOUBLE: { + double* plist = (double*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.dsum += plist[i]; + } + break; + } + + default: + break; + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + if (IS_INTEGER_TYPE(type)) { + pAvgRes->result = pAvgRes->sum.isum / ((double) pAvgRes->count); + } else { + pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count); + } + + return functionFinalize(pCtx, pBlock, slotId); +} + EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){ return FUNC_DATA_REQUIRED_STATIS_LOAD; } @@ -292,49 +521,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) -#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) - -#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0); - -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0) - -#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = (right); \ - DO_UPDATE_SUBSID_RES(ctx, _ts); \ - (num) += 1; \ - } \ - } while (0) - -#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ - do { \ - _t *d = (_t *)((_col)->pData); \ - for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ - if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ - continue; \ - } \ - TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \ - UPDATE_DATA(ctx, val, d[i], num, sign, ts); \ - } \ - } while (0) int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { int32_t numOfElems = 0; @@ -479,13 +665,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -typedef struct SStddevRes { - double result; - int64_t count; - union {double quadraticDSum; int64_t quadraticISum;}; - union {double dsum; int64_t isum;}; -} SStddevRes; - bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStddevRes); return true; @@ -588,8 +767,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; - pStddevRes->isum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->dsum += plist[i]; + pStddevRes->quadraticDSum += plist[i] * plist[i]; } break; } @@ -603,8 +782,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; - pStddevRes->isum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->dsum += plist[i]; + pStddevRes->quadraticDSum += plist[i] * plist[i]; } break; } @@ -619,21 +798,21 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { } int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - double avg = pStddevRes->isum / ((double) pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); + double avg; + if (IS_INTEGER_TYPE(type)) { + avg = pStddevRes->isum / ((double) pStddevRes->count); + pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); + } else { + avg = pStddevRes->dsum / ((double) pStddevRes->count); + pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg); + } + return functionFinalize(pCtx, pBlock, slotId); } -typedef struct SPercentileInfo { - double result; - tMemBucket *pMemBucket; - int32_t stage; - double minval; - double maxval; - int64_t numOfElems; -} SPercentileInfo; - bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SPercentileInfo); return true; @@ -928,14 +1107,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -typedef struct SDiffInfo { - bool hasPrev; - bool includeNull; - bool ignoreNegative; - bool firstOutput; - union { int64_t i64; double d64;} prev; -} SDiffInfo; - bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SDiffInfo); return true; @@ -1168,21 +1339,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } } -typedef struct STopBotResItem { - SVariant v; - uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data - struct { - int32_t pageId; - int32_t offset; - } tuplePos; // tuple data of this chosen row -} STopBotResItem; - -typedef struct STopBotRes { - int32_t pageId; -// int32_t num; - STopBotResItem *pItems; -} STopBotRes; - bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem); @@ -1335,4 +1491,4 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId return pEntryInfo->numOfRes; // return functionFinalize(pCtx, pBlock, slotId); -} \ No newline at end of file +}