From 183fcac01360783359b464fb6625fdff615cde0d Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 8 Jun 2022 15:05:37 +0800 Subject: [PATCH] add spread splitting translate fucntions --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 33 ++++++++++++++++++++++--- source/libs/function/src/builtinsimpl.c | 4 +++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 2febf86e60..c049eedb21 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -102,6 +102,7 @@ int32_t topFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getSpreadInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 7897cc10c2..5154a7bd77 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -317,7 +317,6 @@ static int32_t translateApercentileMerge(SFunctionNode* pFunc, char* pErrBuf, in return translateApercentileImpl(pFunc, pErrBuf, len, false); } - static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // pseudo column do not need to check parameters pFunc->node.resType = @@ -378,6 +377,34 @@ static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } +static int32_t translateSpreadImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (isPartial) { + if (!IS_NUMERIC_TYPE(paraType) && TSDB_DATA_TYPE_TIMESTAMP != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = (SDataType){.bytes = getSpreadInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (TSDB_DATA_TYPE_BINARY != paraType) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE}; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t translateSpreadPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateSpreadImpl(pFunc, pErrBuf, len, true); +} +static int32_t translateSpreadMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateSpreadImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); if (1 != numOfParams && 2 != numOfParams) { @@ -1283,7 +1310,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_spread_partial", .type = FUNCTION_TYPE_SPREAD_PARTIAL, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateSpread, + .translateFunc = translateSpreadPartial, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, @@ -1294,7 +1321,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_spread_merge", .type = FUNCTION_TYPE_SPREAD_MERGE, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateSpread, + .translateFunc = translateSpreadMerge, .dataRequiredFunc = statisDataRequired, .getEnvFunc = getSpreadFuncEnv, .initFunc = spreadFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index e1a43e70f7..da271e28fd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2868,6 +2868,10 @@ bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } +int32_t getSpreadInfoSize() { + return (int32_t)sizeof(SSpreadInfo); +} + bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; -- GitLab