未验证 提交 50ad7def 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21835 from taosdata/feat/TD-22336

stream session end window function
...@@ -103,9 +103,8 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo ...@@ -103,9 +103,8 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) { static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData; int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta; int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration ts[2] = duration; // set the duration
...@@ -642,7 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num ...@@ -642,7 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
pBlock->info.rows, numOfExprs); pBlock->info.rows, numOfExprs);
...@@ -917,7 +916,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -917,7 +916,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup); doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
...@@ -952,7 +951,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -952,7 +951,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId); addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
} }
#endif #endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult); doCloseWindow(pResultRowInfo, pInfo, pResult);
...@@ -1119,7 +1118,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1119,7 +1118,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
...@@ -1144,7 +1143,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI ...@@ -1144,7 +1143,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
...@@ -1751,7 +1750,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1751,7 +1750,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
} }
// pInfo->numOfRows data belong to the current session window // pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
...@@ -1769,7 +1768,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator ...@@ -1769,7 +1768,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput); pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
} }
...@@ -2421,7 +2420,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p ...@@ -2421,7 +2420,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput); pSDataBlock->info.rows, numOfOutput);
key.ts = nextWin.skey; key.ts = nextWin.skey;
...@@ -3093,14 +3092,14 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR ...@@ -3093,14 +3092,14 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult, static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput, int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
SOperatorInfo* pOperator) { SOperatorInfo* pOperator, int64_t winDelta) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset); int32_t code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset);
if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) { if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false); updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, winDelta);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -3160,7 +3159,11 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC ...@@ -3160,7 +3159,11 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
SResultRow* pWinResult = NULL; SResultRow* pWinResult = NULL;
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset); initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey); pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true); int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) {
winDelta = pAggSup->gap;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey)); tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey));
if (winInfo.isOutput && pStDeleted) { if (winInfo.isOutput && pStDeleted) {
...@@ -3217,8 +3220,12 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3217,8 +3220,12 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
} }
int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) {
winDelta = pAggSup->gap;
}
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator); pOperator, winDelta);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -3378,7 +3385,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS ...@@ -3378,7 +3385,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
} }
} }
num++; num++;
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput, initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset); pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
...@@ -4010,7 +4017,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl ...@@ -4010,7 +4017,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
continue; continue;
} }
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput, code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator); pOperator, 0);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -4283,7 +4290,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4283,7 +4290,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
continue; continue;
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs); currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
...@@ -4303,7 +4310,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR ...@@ -4303,7 +4310,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
miaInfo->curTs = currWin.skey; miaInfo->curTs = currWin.skey;
} }
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
pBlock->info.rows, pSup->numOfExprs); pBlock->info.rows, pSup->numOfExprs);
} }
...@@ -4624,7 +4631,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* ...@@ -4624,7 +4631,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput); pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult); doCloseWindow(pResultRowInfo, iaInfo, pResult);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册