提交 a8da3cfd 编写于 作者: 5 54liuyao

feat(stream): stream state support apercentile

上级 4935b146
...@@ -1227,7 +1227,10 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int ...@@ -1227,7 +1227,10 @@ void doClearWindow(SAggSupporter* pSup, SOptrBasicInfo* pBinfo, char* pData, int
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
SResultRowPosition* p1 = SResultRowPosition* p1 =
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
ASSERT(p1); if (!p1) {
// window has been closed
return;
}
doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput); doClearWindowImpl(p1, pSup->pResultBuf, pBinfo, numOfOutput);
} }
...@@ -2202,12 +2205,12 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -2202,12 +2205,12 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
} }
} }
int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, int32_t initBiasicInfo(SOptrBasicInfo* pBasicInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pResultBuf) { int32_t numOfCols, SSDataBlock* pResultBlock) {
pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset); pBasicInfo->pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pBasicInfo->rowCellInfoOffset);
pBasicInfo->pRes = pResultBlock; pBasicInfo->pRes = pResultBlock;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pBasicInfo->pCtx[i].pBuf = pResultBuf; pBasicInfo->pCtx[i].pBuf = NULL;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -2237,16 +2240,15 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx ...@@ -2237,16 +2240,15 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx
initResultSizeInfo(pOperator, 4096); initResultSizeInfo(pOperator, 4096);
code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo"); code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock, pInfo->streamAggSup.pResultBuf); code = initSessionAggSupporter(&pInfo->streamAggSup, "StreamSessionAggOperatorInfo");
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx)); pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx));
if (pInfo->pDummyCtx == NULL) { if (pInfo->pDummyCtx == NULL) {
...@@ -3101,6 +3103,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3101,6 +3103,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId;
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo)); SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
...@@ -3121,17 +3124,18 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys ...@@ -3121,17 +3124,18 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
.winMap = NULL, .winMap = NULL,
}; };
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
int32_t code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo");
code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
code = initBiasicInfo(&pInfo->binfo, pExprInfo, numOfCols, pResBlock, pInfo->streamAggSup.pResultBuf); pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
code = initStateAggSupporter(&pInfo->streamAggSup, "StreamStateAggOperatorInfo");
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pInfo->streamAggSup.resultRowSize = getResultRowSize(pInfo->binfo.pCtx, numOfCols);
pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx)); pInfo->pDummyCtx = (SqlFunctionCtx*)taosMemoryCalloc(numOfCols, sizeof(SqlFunctionCtx));
if (pInfo->pDummyCtx == NULL) { if (pInfo->pDummyCtx == NULL) {
goto _error; goto _error;
......
...@@ -85,6 +85,7 @@ int32_t apercentileFunction(SqlFunctionCtx *pCtx); ...@@ -85,6 +85,7 @@ int32_t apercentileFunction(SqlFunctionCtx *pCtx);
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx);
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
......
...@@ -1241,7 +1241,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1241,7 +1241,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getPercentileFuncEnv, .getEnvFunc = getPercentileFuncEnv,
.initFunc = percentileFunctionSetup, .initFunc = percentileFunctionSetup,
.processFunc = percentileFunction, .processFunc = percentileFunction,
.finalizeFunc = percentileFinalize .finalizeFunc = percentileFinalize,
.invertFunc = NULL,
.combineFunc = NULL,
}, },
{ {
.name = "apercentile", .name = "apercentile",
...@@ -1252,6 +1254,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -1252,6 +1254,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = apercentileFunctionSetup, .initFunc = apercentileFunctionSetup,
.processFunc = apercentileFunction, .processFunc = apercentileFunction,
.finalizeFunc = apercentileFinalize, .finalizeFunc = apercentileFinalize,
.combineFunc = apercentileCombine,
.pPartialFunc = "_apercentile_partial", .pPartialFunc = "_apercentile_partial",
.pMergeFunc = "_apercentile_merge" .pMergeFunc = "_apercentile_merge"
}, },
......
...@@ -2217,6 +2217,26 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -2217,6 +2217,26 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SAPercentileInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
int32_t type = pDestCtx->input.pData[0]->info.type;
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SAPercentileInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
ASSERT(pDBuf->algo == pSBuf->algo);
if (pDBuf->algo == APERCT_ALGO_TDIGEST) {
tdigestMerge(pDBuf->pTDigest, pSBuf->pTDigest);
} else {
SHistogramInfo* pTmp = tHistogramMerge(pDBuf->pHisto, pSBuf->pHisto, MAX_HISTOGRAM_BIN);
memcpy(pDBuf->pHisto, pTmp, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
pDBuf->pHisto->elems = (SHistBin*) ((char *)pDBuf->pHisto + sizeof(SHistogramInfo));
tHistogramDestroy(&pTmp);
}
pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes);
return TSDB_CODE_SUCCESS;
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0); SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t); pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册