diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c4f6f0cc455cc60bcca8ce5dc414be84e8ee96ae..65d2457e5a95816903b51ccf032ceba8f7f16e4b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -114,7 +114,9 @@ int32_t getSpreadInfoSize(); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t elapsedFunction(SqlFunctionCtx* pCtx); +int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx); int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getElapsedInfoSize(); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 04cfa9d360fa5857d609502c0c28a3ff4af81b76..299e45b4b25770dba24a09d75a28e0d896c82404 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1488,7 +1488,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getElapsedFuncEnv, .initFunc = elapsedFunctionSetup, .processFunc = elapsedFunction, - .finalizeFunc = elapsedFinalize + .finalizeFunc = elapsedPartialFinalize }, { .name = "_elapsed_merge", @@ -1498,7 +1498,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .translateFunc = translateElapsedMerge, .getEnvFunc = getElapsedFuncEnv, .initFunc = elapsedFunctionSetup, - .processFunc = elapsedFunction, + .processFunc = elapsedFunctionMerge, .finalizeFunc = elapsedFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d202c8f5651c5e7710b9d11f0f8dca79fb3b76eb..bd8b6d05828f5c19f33b02c4cb336fd58681ba40 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3132,6 +3132,30 @@ _elapsed_over: return TSDB_CODE_SUCCESS; } +int32_t elapsedFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SElapsedInfo* pInputInfo = (SElapsedInfo *)varDataVal(data); + + pInfo->timeUnit = pInputInfo->timeUnit; + if (pInfo->min > pInputInfo->min) { + pInfo->min = pInputInfo->min; + } + + if (pInfo->max < pInputInfo->max) { + pInfo->max = pInputInfo->max; + } + + SET_VAL(GET_RES_INFO(pCtx), 1, 1); + return TSDB_CODE_SUCCESS; +} + int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); double result = (double)pInfo->max - (double)pInfo->min; @@ -3140,6 +3164,24 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SElapsedInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getElapsedInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return pResInfo->numOfRes; +} + int32_t getHistogramInfoSize() { return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); }