From 4036d9714ea60afa8b80d3811f62dcd651000658 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Mon, 13 Jun 2022 13:38:21 +0800 Subject: [PATCH] add bottom function distribution splitting --- include/libs/function/functionMgt.h | 2 + source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 26 +++++++++++- source/libs/function/src/builtinsimpl.c | 55 +++++++++++++++++-------- 4 files changed, 66 insertions(+), 18 deletions(-) diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 41f5990c95..e86d643c0d 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -136,6 +136,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_ELAPSED_MERGE, FUNCTION_TYPE_TOP_PARTIAL, FUNCTION_TYPE_TOP_MERGE, + FUNCTION_TYPE_BOTTOM_PARTIAL, + FUNCTION_TYPE_BOTTOM_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index bea9a6a1b8..710b1255f7 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -104,6 +104,7 @@ bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0dd809524f..e71dcba500 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1574,10 +1574,34 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, - .initFunc = functionSetup, + .initFunc = topBotFunctionSetup, .processFunc = bottomFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, + .pPartialFunc = "_bottom_partial", + .pMergeFunc = "_bottom_merge" + }, + { + .name = "_bottom_partial", + .type = FUNCTION_TYPE_BOTTOM_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotPartial, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = topBotFunctionSetup, + .processFunc = bottomFunction, + .finalizeFunc = topBotPartialFinalize, + .combineFunc = bottomCombine, + }, + { + .name = "_bottom_merge", + .type = FUNCTION_TYPE_BOTTOM_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotMerge, + .getEnvFunc = getTopBotMergeFuncEnv, + .initFunc = functionSetup, + .processFunc = bottomFunctionMerge, + .finalizeFunc = topBotMergeFinalize, + .combineFunc = bottomCombine, }, { .name = "spread", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 30821f04cc..580df07cf8 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2716,9 +2716,33 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) { +int32_t bottomFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElems = 0; + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + STopBotRes* pRes = getTopBotOutputInfo(pCtx); + pRes->type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElems++; + char* data = colDataGetData(pCol, i); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + } + + return TSDB_CODE_SUCCESS; +} + +static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) { for (int32_t i = 0; i < pInput->numOfItems; i++) { - addResult(pCtx, &pInput->pItems[i], pInput->type, true); + addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery); } } @@ -2735,31 +2759,28 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { pInfo->maxSize = pInputInfo->maxSize; pInfo->type = pInputInfo->type; - topTransferInfo(pCtx, pInputInfo); + topBotTransferInfo(pCtx, pInputInfo, true); SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } -int32_t bottomFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = 0; - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - - int32_t type = pInput->pData[0]->info.type; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); int32_t start = pInput->startRowIndex; - for (int32_t i = start; i < pInput->numOfRows + start; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; - } + char* data = colDataGetData(pCol, start); + STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); + STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - numOfElems++; - char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); - } + pInfo->maxSize = pInputInfo->maxSize; + pInfo->type = pInputInfo->type; + topBotTransferInfo(pCtx, pInputInfo, false); + SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } -- GitLab