From a8da3cfd143caabd0d1b56a83a732c92aa846167 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 8 Jun 2022 18:15:48 +0800 Subject: [PATCH] feat(stream): stream state support apercentile --- source/libs/executor/src/timewindowoperator.c | 26 +++++++++++-------- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 5 +++- source/libs/function/src/builtinsimpl.c | 20 ++++++++++++++ 4 files changed, 40 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d5e7d5c515..b309478556 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1227,7 +1227,10 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - ASSERT(p1); + if (!p1) { + // window has been closed + return; + } doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput); } @@ -2202,12 +2205,12 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { } } -int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SDiskbasedBuf* pResultBuf) { +int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, + int32_t numOfCols, SSDataBlock* pResultBlock) { pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pRes = pResultBlock; for (int32_t i = 0; i < numOfCols; ++i) { - pBasicInfo->pCtx[i].pBuf = pResultBuf; + pBasicInfo->pCtx[i].pBuf = NULL; } return TSDB_CODE_SUCCESS; } @@ -2237,16 +2240,15 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx initResultSizeInfo(pOperator, 4096); - code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); + code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; } - - code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock, pInfo->streamAggSup.pResultBuf); + pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols); + code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols); pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx)); if (pInfo->pDummyCtx == NULL) { @@ -3101,6 +3103,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; + int32_t code = TSDB_CODE_OUT_OF_MEMORY; SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -3121,17 +3124,18 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys .winMap = NULL, }; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - int32_t code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo"); + + code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; } - code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock, pInfo->streamAggSup.pResultBuf); + pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols); + code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo"); if (code != TSDB_CODE_SUCCESS) { goto _error; } - pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols); pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx)); if (pInfo->pDummyCtx == NULL) { goto _error; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 981b1eec88..56379f0e1f 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -85,6 +85,7 @@ int32_t apercentileFunction(SqlFunctionCtx *pCtx); int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 946c37bfd2..a7d6fd7371 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -1241,7 +1241,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getPercentileFuncEnv, .initFunc = percentileFunctionSetup, .processFunc = percentileFunction, - .finalizeFunc = percentileFinalize + .finalizeFunc = percentileFinalize, + .invertFunc = NULL, + .combineFunc = NULL, }, { .name = "apercentile", @@ -1252,6 +1254,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = apercentileFunctionSetup, .processFunc = apercentileFunction, .finalizeFunc = apercentileFinalize, + .combineFunc = apercentileCombine, .pPartialFunc = "_apercentile_partial", .pMergeFunc = "_apercentile_merge" }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 34adf11032..0852779791 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2217,6 +2217,26 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { + SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); + SAPercentileInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); + int32_t type = pDestCtx->input.pData[0]->info.type; + + SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); + SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); + ASSERT(pDBuf->algo == pSBuf->algo); + if (pDBuf->algo == APERCT_ALGO_TDIGEST) { + tdigestMerge(pDBuf->pTDigest, pSBuf->pTDigest); + } else { + SHistogramInfo* pTmp = tHistogramMerge(pDBuf->pHisto, pSBuf->pHisto, MAX_HISTOGRAM_BIN); + memcpy(pDBuf->pHisto, pTmp, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); + pDBuf->pHisto->elems = (SHistBin*) ((char *)pDBuf->pHisto + sizeof(SHistogramInfo)); + tHistogramDestroy(&pTmp); + } + pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); + return TSDB_CODE_SUCCESS; +} + bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); -- GitLab