From 1d22e6ba52a920ae7a323b551f63a514375ee3e2 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 26 Jun 2023 10:40:25 +0800 Subject: [PATCH] stream session end window function --- source/libs/executor/src/timewindowoperator.c | 45 +++++++++++-------- 1 file changed, 26 insertions(+), 19 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f29ea5057e..e0abcd96c5 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -103,9 +103,8 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo return TSDB_CODE_SUCCESS; } -static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) { +static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) { int64_t* ts = (int64_t*)pColData->pData; - int32_t delta = includeEndpoint ? 1 : 0; int64_t duration = pWin->ekey - pWin->skey + delta; ts[2] = duration; // set the duration @@ -642,7 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); @@ -917,7 +916,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); @@ -952,7 +951,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); } #endif - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -1119,7 +1118,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); @@ -1144,7 +1143,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } @@ -1751,7 +1750,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } // pInfo->numOfRows data belong to the current session window - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); @@ -1769,7 +1768,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } @@ -2421,7 +2420,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); } - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); key.ts = nextWin.skey; @@ -3093,14 +3092,14 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput, - SOperatorInfo* pOperator) { + SOperatorInfo* pOperator, int64_t winDelta) { SExprSupp* pSup = &pOperator->exprSupp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; int32_t code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset); if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false); + updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, winDelta); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); return TSDB_CODE_SUCCESS; } @@ -3160,7 +3159,11 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC SResultRow* pWinResult = NULL; initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true); + int64_t winDelta = 0; + if (IS_FINAL_OP(pInfo)) { + winDelta = pAggSup->gap; + } + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta); compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey)); if (winInfo.isOutput && pStDeleted) { @@ -3217,8 +3220,12 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } + int64_t winDelta = 0; + if (IS_FINAL_OP(pInfo)) { + winDelta = pAggSup->gap; + } code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, - pOperator); + pOperator, winDelta); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -3378,7 +3385,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS } } num++; - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true); + updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap); initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput, pChild->exprSupp.rowEntryInfoOffset); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); @@ -4010,7 +4017,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl continue; } code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, - pOperator); + pOperator, 0); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -4283,7 +4290,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR continue; } - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, pBlock->info.rows, pSup->numOfExprs); @@ -4303,7 +4310,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR miaInfo->curTs = currWin.skey; } - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, pBlock->info.rows, pSup->numOfExprs); } @@ -4624,7 +4631,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* // window start(end) key interpolation doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); - updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); + updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1); applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); -- GitLab