diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0bc6076a8883d1cf8d8a12d64177c8e631b5152c..bea9a6a1b8c30ca123d7ce4475b73cf4f41a98fc 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -106,6 +106,7 @@ int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(int64_t numOfItems); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index be879b08c0b3e00063c01d9c9d957d3bdc91ef88..1fbaa6b37ebc64594db7cf803f2ed26bb79a2278 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1562,7 +1562,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotMergeFuncEnv, .initFunc = functionSetup, .processFunc = topFunctionMerge, - .finalizeFunc = topBotFinalize, + .finalizeFunc = topBotMergeFinalize, .combineFunc = topCombine, }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 25d9284314f44fafdc9a9d2ca0ecc39b4c4507a8..147407122a446682e3740bfee98d1fb61b9d8858 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2661,7 +2661,7 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool getTopBotFuncMergeEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { +bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { //intermediate result is binary and length contains VAR header size pEnv->calcMemSize = pFunc->node.resType.bytes - VARSTR_HEADER_SIZE; return true; @@ -2731,7 +2731,10 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { int32_t start = pInput->startRowIndex; char* data = colDataGetData(pCol, start); STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); + STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->maxSize = pInputInfo->maxSize; + pInfo->type = pInputInfo->type; topTransferInfo(pCtx, pInputInfo); SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); @@ -2909,7 +2912,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS releaseBufPage(pCtx->pBuf, pPage); } -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { +int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -2929,13 +2932,23 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } - setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + if (!isMerge) { + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + } currentRow += 1; } return pEntryInfo->numOfRes; } +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + return topBotFinalizeImpl(pCtx, pBlock, false); +} + +int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + return topBotFinalizeImpl(pCtx, pBlock, true); +} + int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));