diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index cca73617b203c373e84b3e30c9d66067558639d2..0da0081cb008b85dda0da1ee6a4510458c639ab2 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -100,8 +100,10 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); +int32_t topFunctionMerge(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 4ea6b8cdfcd4042b852b9d886ca5fbc7df90affa..4631c15c21d2d3493bf9d9edaf6fc223d9e8c567 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1549,7 +1549,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, .processFunc = topFunction, - .finalizeFunc = topBotFinalize, + .finalizeFunc = topBotPartialFinalize, .combineFunc = topCombine, }, { @@ -1559,7 +1559,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateTopBotMerge, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, - .processFunc = topFunction, + .processFunc = topFunctionMerge, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 98b9c82cb2b6c2b2c565d1551e09586936623bdc..76a9b79605910a1852de5a7bf49b4b9743187bb5 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2727,7 +2727,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { STopBotRes* pInputInfo = (STopBotRes *)varDataVal(data); topTransferInfo(pCtx, pInputInfo); - SET_VAL(GET_RES_INFO(pCtx), 1, 1); + SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); return TSDB_CODE_SUCCESS; } @@ -2930,6 +2930,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pEntryInfo->numOfRes; } +int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); + STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getTopBotInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pRes, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); @@ -2990,15 +3008,15 @@ int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { return TSDB_CODE_SUCCESS; } +int32_t getSpreadInfoSize() { + return (int32_t)sizeof(SSpreadInfo); +} + bool getSpreadFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SSpreadInfo); return true; } -int32_t getSpreadInfoSize() { - return (int32_t)sizeof(SSpreadInfo); -} - bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (!functionSetup(pCtx, pResultInfo)) { return false; @@ -3125,7 +3143,7 @@ int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SSpreadInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = (int32_t)sizeof(SSpreadInfo); + int32_t resultBytes = getSpreadInfoSize(); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); memcpy(varDataVal(res), pInfo, resultBytes);