From 3d34f7850ee8ca88c15eb7a68ff98bda847529b5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 20 May 2022 13:59:20 +0800 Subject: [PATCH] fix(query): change unique function type to non-standard sql function --- source/libs/function/inc/builtinsimpl.h | 13 +- source/libs/function/src/builtins.c | 20 +-- source/libs/function/src/builtinsimpl.c | 182 ++++++++++++------------ 3 files changed, 106 insertions(+), 109 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index b75b52f5b3..3e2ccbc6b8 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -76,11 +76,6 @@ int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t uniqueFunction(SqlFunctionCtx *pCtx); -int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); - bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); @@ -125,7 +120,13 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx); bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t tailFunction(SqlFunctionCtx* pCtx); -int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +//int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + +bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t uniqueFunction(SqlFunctionCtx *pCtx); +//int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); + bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 85c69d028b..ab56e0546b 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -892,16 +892,6 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = lastFunction, .finalizeFunc = lastFinalize }, - { - .name = "unique", - .type = FUNCTION_TYPE_UNIQUE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC, - .translateFunc = translateUnique, - .getEnvFunc = getUniqueFuncEnv, - .initFunc = uniqueFunctionSetup, - .processFunc = uniqueFunction, - .finalizeFunc = uniqueFinalize - }, { .name = "histogram", .type = FUNCTION_TYPE_HISTOGRAM, @@ -992,6 +982,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = tailFunction, .finalizeFunc = tailFinalize }, + { + .name = "unique", + .type = FUNCTION_TYPE_UNIQUE, + .classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC, + .translateFunc = translateUnique, + .getEnvFunc = getUniqueFuncEnv, + .initFunc = uniqueFunctionSetup, + .processFunc = uniqueFunction, + .finalizeFunc = uniqueFinalize + }, { .name = "abs", .type = FUNCTION_TYPE_ABS, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 79d4e4f225..020df6cc3d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1979,99 +1979,6 @@ int32_t lastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } -bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE; - return true; -} - -bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { - if (!functionSetup(pCtx, pResInfo)) { - return false; - } - - SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - pInfo->numOfPoints = 0; - pInfo->colType = pCtx->resDataInfo.type; - pInfo->colBytes = pCtx->resDataInfo.bytes; - if (pInfo->pHash != NULL) { - taosHashClear(pInfo->pHash); - } else { - pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - } - return true; -} - -static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) { - int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; - - SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); - if (pHashItem == NULL) { - int32_t size = sizeof(SUniqueItem) + pInfo->colBytes; - SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size); - pItem->timestamp = ts; - memcpy(pItem->data, data, pInfo->colBytes); - - taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*)); - pInfo->numOfPoints++; - } else if (pHashItem->timestamp > ts) { - pHashItem->timestamp = ts; - } - -} - -int32_t uniqueFunction(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - SInputColumnInfoData* pInput = &pCtx->input; - TSKEY* tsList = (int64_t*)pInput->pPTS->pData; - - SColumnInfoData* pInputCol = pInput->pData[0]; - SColumnInfoData* pTsOutput = pCtx->pTsOutput; - SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; - - int32_t startOffset = pCtx->offset; - for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { - char* data = colDataGetData(pInputCol, i); - doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i)); - - if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) { - taosHashCleanup(pInfo->pHash); - return 0; - } - } - - //taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn); - - //for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { - // int32_t pos = startOffset + i; - // STailItem *pItem = pInfo->pItems[i]; - // if (pItem->isNull) { - // colDataAppendNULL(pOutput, pos); - // } else { - // colDataAppend(pOutput, pos, pItem->data, false); - // } - //} - - pResInfo->numOfRes = pInfo->numOfPoints; - return TSDB_CODE_SUCCESS; -} - -int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - - for (int32_t i = 0; i < pResInfo->numOfRes; ++i) { - SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes)); - colDataAppend(pCol, i, pItem->data, false); - //TODO: handle ts output - } - - return pResInfo->numOfRes; -} - bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SDiffInfo); return true; @@ -3659,3 +3566,92 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pEntryInfo->numOfRes; } + +bool getUniqueFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(SUniqueInfo) + UNIQUE_MAX_RESULT_SIZE; + return true; +} + +bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { + if (!functionSetup(pCtx, pResInfo)) { + return false; + } + + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + pInfo->numOfPoints = 0; + pInfo->colType = pCtx->resDataInfo.type; + pInfo->colBytes = pCtx->resDataInfo.bytes; + if (pInfo->pHash != NULL) { + taosHashClear(pInfo->pHash); + } else { + pInfo->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + } + return true; +} + +static void doUniqueAdd(SUniqueInfo* pInfo, char *data, TSKEY ts, bool isNull) { + int32_t hashKeyBytes = IS_VAR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; + + SUniqueItem *pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); + if (pHashItem == NULL) { + int32_t size = sizeof(SUniqueItem) + pInfo->colBytes; + SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + pInfo->numOfPoints * size); + pItem->timestamp = ts; + memcpy(pItem->data, data, pInfo->colBytes); + + taosHashPut(pInfo->pHash, data, hashKeyBytes, (char *)pItem, sizeof(SUniqueItem*)); + pInfo->numOfPoints++; + } else if (pHashItem->timestamp > ts) { + pHashItem->timestamp = ts; + } + +} + +int32_t uniqueFunction(SqlFunctionCtx* pCtx) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); + + SInputColumnInfoData* pInput = &pCtx->input; + TSKEY* tsList = (int64_t*)pInput->pPTS->pData; + + SColumnInfoData* pInputCol = pInput->pData[0]; + SColumnInfoData* pTsOutput = pCtx->pTsOutput; + SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput; + + int32_t startOffset = pCtx->offset; + for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { + char* data = colDataGetData(pInputCol, i); + doUniqueAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i)); + + if (sizeof(SUniqueInfo) + pInfo->numOfPoints * (sizeof(SUniqueItem) + pInfo->colBytes) >= UNIQUE_MAX_RESULT_SIZE) { + taosHashCleanup(pInfo->pHash); + return 0; + } + } + + for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { + SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes)); + colDataAppend(pOutput, i, pItem->data, false); + if (pTsOutput != NULL) { + colDataAppendInt64(pTsOutput, i, &pItem->timestamp); + } + } + + return pInfo->numOfPoints; +} + +int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SUniqueInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + for (int32_t i = 0; i < pResInfo->numOfRes; ++i) { + SUniqueItem *pItem = (SUniqueItem *)(pInfo->pItems + i * (sizeof(SUniqueItem) + pInfo->colBytes)); + colDataAppend(pCol, i, pItem->data, false); + //TODO: handle ts output + } + + return pResInfo->numOfRes; +} + -- GitLab