diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 41f5990c9501f37308c7051d54ec4730fcdf8e97..e86d643c0d63d65ab6789572bdcdc4c127d9063a 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -136,6 +136,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_ELAPSED_MERGE, FUNCTION_TYPE_TOP_PARTIAL, FUNCTION_TYPE_TOP_MERGE, + FUNCTION_TYPE_BOTTOM_PARTIAL, + FUNCTION_TYPE_BOTTOM_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index bea9a6a1b8c30ca123d7ce4475b73cf4f41a98fc..710b1255f76dd0b948db2c39d400d7fce03a34b1 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -104,6 +104,7 @@ bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) int32_t topFunction(SqlFunctionCtx *pCtx); int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0dd809524f72b11c43e7fa251daa59d064ae3d2e..e71dcba50085f193ae22c50f254f9a36e3374a23 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1574,10 +1574,34 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, .translateFunc = translateTopBot, .getEnvFunc = getTopBotFuncEnv, - .initFunc = functionSetup, + .initFunc = topBotFunctionSetup, .processFunc = bottomFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, + .pPartialFunc = "_bottom_partial", + .pMergeFunc = "_bottom_merge" + }, + { + .name = "_bottom_partial", + .type = FUNCTION_TYPE_BOTTOM_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotPartial, + .getEnvFunc = getTopBotFuncEnv, + .initFunc = topBotFunctionSetup, + .processFunc = bottomFunction, + .finalizeFunc = topBotPartialFinalize, + .combineFunc = bottomCombine, + }, + { + .name = "_bottom_merge", + .type = FUNCTION_TYPE_BOTTOM_MERGE, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, + .translateFunc = translateTopBotMerge, + .getEnvFunc = getTopBotMergeFuncEnv, + .initFunc = functionSetup, + .processFunc = bottomFunctionMerge, + .finalizeFunc = topBotMergeFinalize, + .combineFunc = bottomCombine, }, { .name = "spread", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 30821f04cc4c915847afd1b79e1f84ad7c276246..580df07cf866bed21faa17bead249fd4e9e9a65d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2716,9 +2716,33 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) { +int32_t bottomFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElems = 0; + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + STopBotRes* pRes = getTopBotOutputInfo(pCtx); + pRes->type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElems++; + char* data = colDataGetData(pCol, i); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, pRes->type, pInput->uid, pResInfo, false); + } + + return TSDB_CODE_SUCCESS; +} + +static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) { for (int32_t i = 0; i < pInput->numOfItems; i++) { - addResult(pCtx, &pInput->pItems[i], pInput->type, true); + addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery); } } @@ -2735,31 +2759,28 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { pInfo->maxSize = pInputInfo->maxSize; pInfo->type = pInputInfo->type; - topTransferInfo(pCtx, pInputInfo); + topBotTransferInfo(pCtx, pInputInfo, true); SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } -int32_t bottomFunction(SqlFunctionCtx* pCtx) { - int32_t numOfElems = 0; - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - - int32_t type = pInput->pData[0]->info.type; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); int32_t start = pInput->startRowIndex; - for (int32_t i = start; i < pInput->numOfRows + start; ++i) { - if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { - continue; - } + char* data = colDataGetData(pCol, start); + STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); + STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - numOfElems++; - char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); - } + pInfo->maxSize = pInputInfo->maxSize; + pInfo->type = pInputInfo->type; + topBotTransferInfo(pCtx, pInputInfo, false); + SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; }