From 8e65f1e426b49efdd84a64c452f47e8bed9cdaba Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 9 Jun 2022 13:31:33 +0800 Subject: [PATCH] add hll partial/merge functions --- source/libs/function/inc/builtinsimpl.h | 2 ++ source/libs/function/src/builtins.c | 8 ++--- source/libs/function/src/builtinsimpl.c | 40 ++++++++++++++++++++++++- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 9de1580c73..614be17e08 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -126,7 +126,9 @@ int32_t getHistogramInfoSize(); bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t hllFunction(SqlFunctionCtx* pCtx); +int32_t hllFunctionMerge(SqlFunctionCtx* pCtx); int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getHLLInfoSize(); bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 2158b8bca9..e2fc46aac0 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1509,20 +1509,20 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .name = "_hyperloglog_partial", .type = FUNCTION_TYPE_HYPERLOGLOG_PARTIAL, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateHLL, + .translateFunc = translateHLLPartial, .getEnvFunc = getHLLFuncEnv, .initFunc = functionSetup, .processFunc = hllFunction, - .finalizeFunc = hllFinalize + .finalizeFunc = hllPartialFinalize }, { .name = "_hyperloglog_merge", .type = FUNCTION_TYPE_HYPERLOGLOG_MERGE, .classification = FUNC_MGT_AGG_FUNC, - .translateFunc = translateHLL, + .translateFunc = translateHLLMerge, .getEnvFunc = getHLLFuncEnv, .initFunc = functionSetup, - .processFunc = hllFunction, + .processFunc = hllFunctionMerge, .finalizeFunc = hllFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 683d7e6342..5a4160f9a1 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3411,7 +3411,6 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); int32_t resultBytes = getHistogramInfoSize(); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); @@ -3557,6 +3556,27 @@ int32_t hllFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } +int32_t hllFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SHLLInfo* pInputInfo = (SHLLInfo *)varDataVal(data); + + for (int32_t k = 0; k < HLL_BUCKETS; ++k) { + if (pInfo->buckets[k] < pInputInfo->buckets[k]) { + pInputInfo->buckets[k] = pInfo->buckets[k]; + } + } + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + return TSDB_CODE_SUCCESS; +} + int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo *pInfo = GET_RES_INFO(pCtx); @@ -3569,6 +3589,24 @@ int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SHLLInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getHLLInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + bool getStateFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStateInfo); return true; -- GitLab