diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fbc60041b242ed039e873bd5877e3c5c8a4f3828..4760358e0d350cf06aeb23bef86fd710733e6b5b 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -32,9 +32,6 @@ #define TAIL_MAX_POINTS_NUM 100 #define TAIL_MAX_OFFSET 100 -#define UNIQUE_MAX_RESULT_SIZE (1024 * 1024 * 10) -#define MODE_MAX_RESULT_SIZE UNIQUE_MAX_RESULT_SIZE - #define HLL_BUCKET_BITS 14 // The bits of the bucket #define HLL_DATA_BITS (64 - HLL_BUCKET_BITS) #define HLL_BUCKETS (1 << HLL_BUCKET_BITS) @@ -244,12 +241,11 @@ typedef struct SUniqueInfo { typedef struct SModeItem { int64_t count; + STuplePos dataPos; STuplePos tuplePos; - char data[]; } SModeItem; typedef struct SModeInfo { - int32_t numOfPoints; uint8_t colType; int16_t colBytes; SHashObj* pHash; @@ -257,7 +253,7 @@ typedef struct SModeInfo { STuplePos nullTuplePos; bool nullTupleSaved; - char pItems[]; + char* buf; // serialize data buffer } SModeInfo; typedef struct SDerivInfo { @@ -3113,7 +3109,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key, +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key, STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { @@ -3149,8 +3145,8 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, releaseBufPage(pHandle->pBuf, pPage); } else { // other tuple save policy - if (streamStateFuncPut(pHandle->pState, &key, pBuf, length) >= 0) { - p.streamTupleKey = key; + if (streamStateFuncPut(pHandle->pState, key, pBuf, length) >= 0) { + p.streamTupleKey = *key; } } @@ -3174,7 +3170,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); - return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key, pPos); + return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, &key, pPos); } static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { @@ -4949,7 +4945,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) { } bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE; + pEnv->calcMemSize = sizeof(SModeInfo); return true; } @@ -4959,7 +4955,6 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { } SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - pInfo->numOfPoints = 0; pInfo->colType = pCtx->resDataInfo.type; pInfo->colBytes = pCtx->resDataInfo.bytes; if (pInfo->pHash != NULL) { @@ -4970,38 +4965,60 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->nullTupleSaved = false; pInfo->nullTuplePos.pageId = -1; + pInfo->buf = taosMemoryMalloc(pInfo->colBytes); + return true; } +static void modeFunctionCleanup(SModeInfo * pInfo) { + taosHashCleanup(pInfo->pHash); + taosMemoryFreeClear(pInfo->buf); +} + +static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) { + if (IS_VAR_DATA_TYPE(pInfo->colType)) { + memcpy(pInfo->buf, data, varDataTLen(data)); + } else { + memcpy(pInfo->buf, data, pInfo->colBytes); + } + + return doSaveTupleData(&pCtx->saveHandle, pInfo->buf, pInfo->colBytes, NULL, pPos); +} + static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { - int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; - SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); + int32_t code = TSDB_CODE_SUCCESS; + int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; + + SModeItem* pHashItem = (SModeItem *)taosHashGet(pInfo->pHash, data, hashKeyBytes); if (pHashItem == NULL) { - int32_t size = sizeof(SModeItem) + pInfo->colBytes; - SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size); - memcpy(pItem->data, data, hashKeyBytes); - pItem->count += 1; + int32_t size = sizeof(SModeItem); + SModeItem item = {0}; + + item.count += 1; + code = saveModeTupleData(pCtx, data, pInfo, &item.dataPos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pCtx->subsidiaries.num > 0) { - int32_t code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pItem->tuplePos); + code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &item.tuplePos); if (code != TSDB_CODE_SUCCESS) { return code; } } - taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); - pInfo->numOfPoints++; + taosHashPut(pInfo->pHash, data, hashKeyBytes, &item, sizeof(SModeItem)); } else { - (*pHashItem)->count += 1; + pHashItem->count += 1; if (pCtx->subsidiaries.num > 0) { - int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos)); + int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pHashItem->tuplePos); if (code != TSDB_CODE_SUCCESS) { return code; } } } - return TSDB_CODE_SUCCESS; + return code; } int32_t modeFunction(SqlFunctionCtx* pCtx) { @@ -5024,18 +5041,15 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); int32_t code = doModeAdd(pInfo, i, pCtx, data); if (code != TSDB_CODE_SUCCESS) { + modeFunctionCleanup(pInfo); return code; } - - if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) { - taosHashCleanup(pInfo->pHash); - return TSDB_CODE_OUT_OF_MEMORY; - } } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); if (code != TSDB_CODE_SUCCESS) { + modeFunctionCleanup(pInfo); return code; } pInfo->nullTupleSaved = true; @@ -5054,26 +5068,37 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); int32_t currentRow = pBlock->info.rows; - int32_t resIndex = -1; + STuplePos resDataPos, resTuplePos; int32_t maxCount = 0; - for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { - SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes)); + + void *pIter = taosHashIterate(pInfo->pHash, NULL); + while (pIter != NULL) { + SModeItem *pItem = (SModeItem *)pIter; if (pItem->count >= maxCount) { maxCount = pItem->count; - resIndex = i; + resDataPos = pItem->dataPos; + resTuplePos = pItem->tuplePos; } + + pIter = taosHashIterate(pInfo->pHash, pIter); } if (maxCount != 0) { - SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); - colDataSetVal(pCol, currentRow, pResItem->data, false); - code = setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow); + const char* pData = loadTupleData(pCtx, &resDataPos); + if (pData == NULL) { + code = TSDB_CODE_NO_AVAIL_DISK; + modeFunctionCleanup(pInfo); + return code; + } + + colDataSetVal(pCol, currentRow, pData, false); + code = setSelectivityValue(pCtx, pBlock, &resTuplePos, currentRow); } else { colDataSetNULL(pCol, currentRow); code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); } - taosHashCleanup(pInfo->pHash); + modeFunctionCleanup(pInfo); return code; } diff --git a/tests/system-test/2-query/mode.py b/tests/system-test/2-query/mode.py index 807a1ac394aee9477fdf88120f554685a465f219..8321ef167585ac31d7d8086bfa24ccb34926ecb8 100644 --- a/tests/system-test/2-query/mode.py +++ b/tests/system-test/2-query/mode.py @@ -53,6 +53,7 @@ class TDTestCase: ( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a ) ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" ) ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" ) + ( '2022-12-31 01:01:32.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" ) ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) '''