diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index f3e28936afc1b1556502eacd08f6b1e699abc198..9d0995a8d8e6ae29ee488a5d38cb6d083c036075 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -122,6 +122,10 @@ typedef enum EFunctionType { // internal function FUNCTION_TYPE_SELECT_VALUE, + // distributed splitting functions + FUNCTION_TYPE_APERCENTILE_PARTIAL, + FUNCTION_TYPE_APERCENTILE_MERGE, + // user defined funcion FUNCTION_TYPE_UDF = 10000 } EFunctionType; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 68b83f4a1955c72e119dcadd5d409ce10639e5e1..34fa5e841728b867f0dc814353b6e82c0b3be0bd 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -23,6 +23,7 @@ extern "C" { #include "function.h" #include "functionMgt.h" + bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)); @@ -77,6 +78,7 @@ bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t apercentileFunction(SqlFunctionCtx *pCtx); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index c9c63169c931cc03fdf58d16f551f6abbdc8ba85..05fcbb5e6b29b2ef11f47384bebe02dbd19671e5 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -212,7 +212,7 @@ static bool validateApercentileAlgo(const SValueNode* pVal) { 0 == strcasecmp(varDataVal(pVal->datum.p), "t-digest")); } -static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { +static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (2 != numOfParams && 3 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -261,10 +261,22 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t pValue->notReserved = true; } - pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + if (!isPartial) { + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + } else { + pFunc->node.resType = (SDataType){.bytes = 1000, .type = TSDB_DATA_TYPE_BINARY}; + } return TSDB_CODE_SUCCESS; } +static int32_t translateApercentilePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateApercentileImpl(pFunc, pErrBuf, len, true); +} +static int32_t translateApercentileFinal(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateApercentileImpl(pFunc, pErrBuf, len, false); +} + + static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = @@ -1142,9 +1154,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = countFunction, .finalizeFunc = functionFinalize, .invertFunc = countInvertFunction, - .combineFunc = combineFunction, - // .pPartialFunc = "count", - // .pMergeFunc = "sum" + .combineFunc = combineFunction, + .pPartialFunc = "count", + .pMergeFunc = "sum" }, { .name = "sum", @@ -1157,7 +1169,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = sumFunction, .finalizeFunc = functionFinalize, .invertFunc = sumInvertFunction, - .combineFunc = sumCombine, + .combineFunc = sumCombine, + .pPartialFunc = "sum", + .pMergeFunc = "sum" }, { .name = "min", @@ -1169,7 +1183,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = minmaxFunctionSetup, .processFunc = minFunction, .finalizeFunc = minmaxFunctionFinalize, - .combineFunc = minCombine + .combineFunc = minCombine, + .pPartialFunc = "min", + .pMergeFunc = "min" }, { .name = "max", @@ -1181,7 +1197,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = minmaxFunctionSetup, .processFunc = maxFunction, .finalizeFunc = minmaxFunctionFinalize, - .combineFunc = maxCombine + .combineFunc = maxCombine, + .pPartialFunc = "max", + .pMergeFunc = "max" }, { .name = "stddev", @@ -1217,6 +1235,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .finalizeFunc = avgFinalize, .invertFunc = avgInvertFunction, .combineFunc = avgCombine, + .pPartialFunc = "avgPartial", + .pMergeFunc = "avgMerge" }, { .name = "percentile", @@ -1232,7 +1252,27 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "apercentile", .type = FUNCTION_TYPE_APERCENTILE, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateApercentile, + .translateFunc = translateApercentileFinal, + .getEnvFunc = getApercentileFuncEnv, + .initFunc = apercentileFunctionSetup, + .processFunc = apercentileFunction, + .finalizeFunc = apercentileFinalize + }, + { + .name = "_apercentile_partial", + .type = FUNCTION_TYPE_APERCENTILE_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateApercentilePartial, + .getEnvFunc = getApercentileFuncEnv, + .initFunc = apercentileFunctionSetup, + .processFunc = apercentileFunction, + .finalizeFunc = apercentilePartialFinalize + }, + { + .name = "_apercentile_merge", + .type = FUNCTION_TYPE_APERCENTILE_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateApercentileFinal, .getEnvFunc = getApercentileFuncEnv, .initFunc = apercentileFunctionSetup, .processFunc = apercentileFunction, @@ -1299,7 +1339,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = firstFunction, .finalizeFunc = firstLastFinalize, - .combineFunc = firstCombine, + .combineFunc = firstCombine, + .pPartialFunc = "first", + .pMergeFunc = "first" }, { .name = "last", @@ -1311,6 +1353,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = lastFunction, .finalizeFunc = firstLastFinalize, .combineFunc = lastCombine, + .pPartialFunc = "last", + .pMergeFunc = "last" }, { .name = "twa", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 4c7984c602f3c789183e96428a97374f4019829d..21847f5de1462b00180537063cd1076dee3e2fc5 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2071,6 +2071,41 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SVariant* pVal = &pCtx->param[1].param; + double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d; + + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); + + int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); + int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION)); + int32_t resultBytes = TMAX(bytesHist, bytesDigest); + char *tmp = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + if (pInfo->algo == APERCT_ALGO_TDIGEST) { + if (pInfo->pTDigest->size > 0) { + memcpy(varDataVal(tmp), pInfo->pTDigest, resultBytes); + } else { + return TSDB_CODE_SUCCESS; + } + } else { + if (pInfo->pHisto->numOfElems > 0) { + memcpy(varDataVal(tmp), pInfo->pHisto, resultBytes); + } else { + return TSDB_CODE_SUCCESS; + } + } + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, tmp, false); + + taosMemoryFree(tmp); + return pResInfo->numOfRes; +} + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);