diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 1ca51b30436b8e27f99e53be741ff9375c8030a6..1ed78750d180d2d5aca57294192bfb57621701dd 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -124,6 +124,7 @@ typedef enum EFunctionType { FUNCTION_TYPE_BLOCK_DIST, // block distribution aggregate function FUNCTION_TYPE_BLOCK_DIST_INFO, // block distribution pseudo column function FUNCTION_TYPE_TO_COLUMN, + FUNCTION_TYPE_GROUP_KEY, // distributed splitting functions FUNCTION_TYPE_APERCENTILE_PARTIAL = 4000, diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 4d2f926c125764c019ab8f2ca797bd3b230a683c..b3ce6abe87371f2557397bd6052ffa3300e10e24 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -202,6 +202,9 @@ bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistFunction(SqlFunctionCtx *pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t groupKeyFunction(SqlFunctionCtx* pCtx); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 6516a10795b17b18bb99e062c7635c9ae01fa471..f3f10971488ed156bcaaf436332e989efcb7ae29 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1515,6 +1515,16 @@ static bool getBlockDistFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv return true; } +static int32_t translateGroupKey(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + if (1 != LIST_LENGTH(pFunc->pParameterList)) { + return TSDB_CODE_SUCCESS; + } + + SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0); + pFunc->node.resType = ((SExprNode*)pPara)->resType; + return TSDB_CODE_SUCCESS; +} + // clang-format off const SBuiltinFuncDefinition funcMgtBuiltins[] = { { @@ -2499,7 +2509,17 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .type = FUNCTION_TYPE_BLOCK_DIST_INFO, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_SCAN_PC_FUNC, .translateFunc = translateBlockDistInfoFunc, - } + }, + { + .name = "_group_key", + .type = FUNCTION_TYPE_GROUP_KEY, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateGroupKey, + .getEnvFunc = getGroupKeyFuncEnv, + .initFunc = functionSetup, + .processFunc = groupKeyFunction, + .finalizeFunc = functionFinalize, + }, }; // clang-format on diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 4ec6432fc863225226a68fc3b8cb43a7f8ce4a9e..42c316b01d4600e1a958bad16345954d275d7026 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2402,6 +2402,12 @@ bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } +bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + SColumnNode* pNode = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0); + pEnv->calcMemSize = pNode->node.resType.bytes; + return true; +} + static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowIndex) { if (pTsColInfo == NULL) { return 0; @@ -5349,6 +5355,28 @@ int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t groupKeyFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + char* buf = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pInputCol = pInput->pData[0]; + + int32_t bytes = pInputCol->info.bytes; + + int32_t startIndex = pInput->startRowIndex; + if (colDataIsNull_s(pInputCol, startIndex)) { + pResInfo->numOfRes = 0; + return TSDB_CODE_SUCCESS; + } + + char* data = colDataGetData(pInputCol, startIndex); + memcpy(buf, data, bytes); + SET_VAL(pResInfo, 1, 1); + + return TSDB_CODE_SUCCESS; +} + int32_t interpFunction(SqlFunctionCtx* pCtx) { #if 0 int32_t fillType = (int32_t) pCtx->param[2].i64;