diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 4a593854d5e65a0fcead6198f2a4d60f33c2e49f..4c81f82269054c46fde8fa3d860683cd738be572 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -68,6 +68,7 @@ typedef struct STopBotResItem { typedef struct STopBotRes { int32_t maxSize; int16_t type; //store the original input type, used in merge function + int32_t numOfItems; STopBotResItem* pItems; } STopBotRes; @@ -2718,12 +2719,13 @@ int32_t topFunction(SqlFunctionCtx* pCtx) { } static void topTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput) { - for (int32_t i = 0; i < pInput->maxSize; i++) { + for (int32_t i = 0; i < pInput->numOfItems; i++) { addResult(pCtx, &pInput->pItems[i], pInput->type, true); } } int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); @@ -2736,7 +2738,7 @@ int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { pInfo->maxSize = pInputInfo->maxSize; pInfo->type = pInputInfo->type; topTransferInfo(pCtx, pInputInfo); - SET_VAL(GET_RES_INFO(pCtx), pInputInfo->maxSize, pInputInfo->maxSize); + SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); return TSDB_CODE_SUCCESS; } @@ -2812,6 +2814,8 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; + // accumulate number of items for each vgroup, this info is needed for merge + pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -2983,6 +2987,8 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, pItem->tuplePos.pageId = -1; replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); pEntryInfo->numOfRes++; + // accumulate number of items for each vgroup, this info is needed for merge + pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result