From 6d952626882dbdc66a8c4cb1572b1ca69bf9330d Mon Sep 17 00:00:00 2001
From: Ganlin Zhao
Date: Mon, 13 Jun 2022 19:11:28 +0800
Subject: [PATCH] enh(query): add first/last function distributed implementation

---
 include/libs/function/functionMgt.h     |  2 +
 source/libs/function/inc/builtinsimpl.h |  1 +
 source/libs/function/src/builtins.c     | 77 +++++++++++++++++++++++++
 source/libs/function/src/builtinsimpl.c |  6 ++
 4 files changed, 86 insertions(+)

diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h
index e86d643c0d..f01e734fde 100644
--- a/include/libs/function/functionMgt.h
+++ b/include/libs/function/functionMgt.h
@@ -138,6 +138,8 @@ typedef enum EFunctionType {
   FUNCTION_TYPE_TOP_MERGE,
   FUNCTION_TYPE_BOTTOM_PARTIAL,
   FUNCTION_TYPE_BOTTOM_MERGE,
+  FUNCTION_TYPE_FIRST_PARTIAL,
+  FUNCTION_TYPE_FIRST_MERGE,
 
   // user defined funcion
   FUNCTION_TYPE_UDF = 10000
diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h
index a528041964..f3bcad8ede 100644
--- a/source/libs/function/inc/builtinsimpl.h
+++ b/source/libs/function/inc/builtinsimpl.h
@@ -97,6 +97,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
 int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
 int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
 int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
+int32_t getFirstLastInfoSize(int32_t resBytes);
 
 bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
 bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c
index 23940a415e..a2849d601e 100644
--- a/source/libs/function/src/builtins.c
+++ b/source/libs/function/src/builtins.c
@@ -954,6 +954,59 @@ static int32_t translateFirstLast(SFunctionNode* pFunc, char* pErrBuf, int32_t l
   return TSDB_CODE_SUCCESS;
 }
 
+/**
+ * Common result-type translation for the distributed first/last stages.
+ *
+ * Partial stage (isPartial == true): the single parameter must be a column node;
+ * the result is TSDB_DATA_TYPE_BINARY sized
+ * getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE.
+ * Merge stage (isPartial == false): the single parameter must already be
+ * TSDB_DATA_TYPE_BINARY; its result type is passed through unchanged.
+ * Any other parameter count returns success without touching resType.
+ *
+ * @param pFunc     function node whose node.resType is filled in
+ * @param pErrBuf   buffer handed to the error-message helpers
+ * @param len       length passed along with pErrBuf
+ * @param isPartial true for the partial stage, false for the merge stage
+ * @return TSDB_CODE_SUCCESS, or whatever the error-message helper returns
+ */
+static int32_t translateFirstLastImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
+  // first(col_list) will be rewritten as first(col)
+  if (1 != LIST_LENGTH(pFunc->pParameterList)) {
+    return TSDB_CODE_SUCCESS;
+  }
+
+  SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
+  uint8_t paraType = ((SExprNode*)pPara)->resType.type;
+  int32_t paraBytes = ((SExprNode*)pPara)->resType.bytes;
+  if (isPartial) {
+    if (QUERY_NODE_COLUMN != nodeType(pPara)) {
+      return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
+                             "The parameters of first/last can only be columns");
+    }
+
+    pFunc->node.resType = (SDataType){.bytes = getFirstLastInfoSize(paraBytes) + VARSTR_HEADER_SIZE,
+                                      .type = TSDB_DATA_TYPE_BINARY};
+  } else {
+    if (TSDB_DATA_TYPE_BINARY != paraType) {
+      return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
+    }
+
+    pFunc->node.resType = ((SExprNode*)pPara)->resType;
+  }
+  return TSDB_CODE_SUCCESS;
+}
+
+// Translate callback for "_first_partial": applies the partial-stage rules.
+static int32_t translateFirstLastPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+  return translateFirstLastImpl(pFunc, pErrBuf, len, true);
+}
+
+// Translate callback for "_first_merge": applies the merge-stage rules.
+static int32_t translateFirstLastMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
+  return translateFirstLastImpl(pFunc, pErrBuf, len, false);
+}
+
 static int32_t translateUnique(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
   if (1 != LIST_LENGTH(pFunc->pParameterList)) {
     return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
@@ -1706,6 +1759,30 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
     .finalizeFunc = firstLastFinalize,
     .combineFunc = firstCombine,
   },
+  {
+    .name = "_first_partial",
+    .type = FUNCTION_TYPE_FIRST_PARTIAL,
+    .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
+    .translateFunc = translateFirstLastPartial,
+    .getEnvFunc = getFirstLastFuncEnv,
+    .initFunc = functionSetup,
+    .processFunc = firstFunction,
+    .finalizeFunc = firstLastFinalize,
+    .combineFunc = firstCombine,
+  },
+  {
+    .name = "_first_merge",
+    .type = FUNCTION_TYPE_FIRST_MERGE,
+    .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_TIMELINE_FUNC,
+    .translateFunc = translateFirstLastMerge,
+    // NOTE(review): same getEnvFunc/processFunc as _first_partial, although this stage's
+    // input must be BINARY (see translateFirstLastMerge) - confirm that is intended.
+    .getEnvFunc = getFirstLastFuncEnv,
+    .initFunc = functionSetup,
+    .processFunc = firstFunction,
+    .finalizeFunc = firstLastFinalize,
+    .combineFunc = firstCombine,
+  },
   {
     .name = "last",
     .type = FUNCTION_TYPE_LAST,
diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c
index 3d64f82eba..44418685b0 100644
--- a/source/libs/function/src/builtinsimpl.c
+++ b/source/libs/function/src/builtinsimpl.c
@@ -2244,6 +2244,12 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
   return TSDB_CODE_SUCCESS;
 }
 
+// Returns resBytes + sizeof(int64_t), the same sum getFirstLastFuncEnv uses for calcMemSize.
+// The cast keeps the addition in int32_t instead of round-tripping through size_t.
+int32_t getFirstLastInfoSize(int32_t resBytes) {
+  return resBytes + (int32_t)sizeof(int64_t);
+}
+
 bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
   SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
   pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
-- 
GitLab