diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index c8e803c811e04a31c848f9bc14b32a84a7fa679c..eeb36b04ce5bc2b8cfcc53f2771d67d5baf9e513 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -124,7 +124,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function // distributed splitting functions - FUNCTION_TYPE_APERCENTILE_PARTIAL, + FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000, FUNCTION_TYPE_APERCENTILE_MERGE, FUNCTION_TYPE_SPREAD_PARTIAL, FUNCTION_TYPE_SPREAD_MERGE, @@ -134,6 +134,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_HYPERLOGLOG_MERGE, FUNCTION_TYPE_ELAPSED_PARTIAL, FUNCTION_TYPE_ELAPSED_MERGE, + FUNCTION_TYPE_TOP_PARTIAL, + FUNCTION_TYPE_TOP_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index b623c771108e0466b9fb5f6583915bdc86754f9c..92952d6cbc7846e31445e073ff1faa6719bc1ce8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -104,6 +104,7 @@ int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getTopBotInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a8e9cac65e56e0c4b05531970924aa30d22eced6..aad871f311ceacfa47361ad410b60f80b8ebcb60 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -325,7 +325,7 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_ return TSDB_CODE_SUCCESS; } -static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { +static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (2 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -360,8 +360,62 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { return TSDB_CODE_SUCCESS; } -static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTop(pFunc, pErrBuf, len); +static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + + if (isPartial) { + if (2 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; + if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // param1 + SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); + if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)pParamNode1; + if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (pValue->datum.i < 1 || pValue->datum.i > 100) { + return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); + } + + pValue->notReserved = true; + + // set result type + pFunc->node.resType = (SDataType){.bytes = getTopBotInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (1 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (TSDB_DATA_TYPE_BINARY == para1Type) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // set result type + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTopBotImpl(pFunc, pErrBuf, len, true); +} + +static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateTopBotImpl(pFunc, pErrBuf, len, false); } static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { @@ -1475,7 +1529,29 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "top", .type = FUNCTION_TYPE_TOP, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTop, + .translateFunc = translateTopBot, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, + .processFunc = topFunction, + .finalizeFunc = topBotFinalize, + .combineFunc = topCombine, + }, + { + .name = "_top_partial", + .type = FUNCTION_TYPE_TOP_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotPartial, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = functionSetup, + .processFunc = topFunction, + .finalizeFunc = topBotFinalize, + .combineFunc = topCombine, + }, + { + .name = "_top_merge", + .type = FUNCTION_TYPE_TOP_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotMerge, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = topFunction, @@ -1486,7 +1562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateBottom, + .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = bottomFunction, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 1eafd3c6493fda8aa2400f708f5529d4c8b85664..36c14f1be18b1f3ccc3d504965d64f5328a7991d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2655,6 +2655,10 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { return numOfElems; } +int32_t getTopBotInfoSize() { + return (int32_t)sizeof(STopBotRes); +} + bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SValueNode* pkNode = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);