提交 1d22e6ba 编写于 作者: L liuyao

stream session end window function

上级 05c7ec61
......@@ -103,9 +103,8 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
return TSDB_CODE_SUCCESS;
}
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, bool includeEndpoint) {
static void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t delta) {
int64_t* ts = (int64_t*)pColData->pData;
int32_t delta = includeEndpoint ? 1 : 0;
int64_t duration = pWin->ekey - pWin->skey + delta;
ts[2] = duration; // set the duration
......@@ -642,7 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0,
pBlock->info.rows, numOfExprs);
......@@ -917,7 +916,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
doWindowBorderInterpolation(pInfo, pBlock, pResult, &win, startPos, forwardRows, pSup);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
......@@ -952,7 +951,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
addToOpenWindowList(pResultRowInfo, pResult, tableGroupId);
}
#endif
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, pInfo, pResult);
......@@ -1119,7 +1118,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
......@@ -1144,7 +1143,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}
......@@ -1751,7 +1750,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
}
// pInfo->numOfRows data belong to the current session window
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
......@@ -1769,7 +1768,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_APP_ERROR);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, 0);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
}
......@@ -2421,7 +2420,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pResPos, POINTER_BYTES);
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
pSDataBlock->info.rows, numOfOutput);
key.ts = nextWin.skey;
......@@ -3093,14 +3092,14 @@ static int32_t initSessionOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pR
static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindowInfo* pCurWin, SResultRow** pResult,
int32_t startIndex, int32_t winRows, int32_t rows, int32_t numOutput,
SOperatorInfo* pOperator) {
SOperatorInfo* pOperator, int64_t winDelta) {
SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
int32_t code = initSessionOutputBuf(pCurWin, pResult, pSup->pCtx, numOutput, pSup->rowEntryInfoOffset);
if (code != TSDB_CODE_SUCCESS || (*pResult) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false);
updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, winDelta);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput);
return TSDB_CODE_SUCCESS;
}
......@@ -3160,7 +3159,11 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
SResultRow* pWinResult = NULL;
initSessionOutputBuf(&winInfo, &pWinResult, pAggSup->pDummyCtx, numOfOutput, pSup->rowEntryInfoOffset);
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, true);
int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) {
winDelta = pAggSup->gap;
}
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pCurWin->sessionWin.win, winDelta);
compactFunctions(pSup->pCtx, pAggSup->pDummyCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
tSimpleHashRemove(pStUpdated, &winInfo.sessionWin, sizeof(SSessionKey));
if (winInfo.isOutput && pStDeleted) {
......@@ -3217,8 +3220,12 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) {
winDelta = pAggSup->gap;
}
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator);
pOperator, winDelta);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -3378,7 +3385,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
}
}
num++;
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, true);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin.sessionWin.win, pAggSup->gap);
initSessionOutputBuf(&childWin, &pChResult, pChild->exprSupp.pCtx, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
......@@ -4010,7 +4017,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
continue;
}
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &curWin.winInfo, &pResult, i, winRows, rows, numOfOutput,
pOperator);
pOperator, 0);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
......@@ -4283,7 +4290,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
continue;
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos,
currPos - startPos, pBlock->info.rows, pSup->numOfExprs);
......@@ -4303,7 +4310,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
miaInfo->curTs = currWin.skey;
}
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
pBlock->info.rows, pSup->numOfExprs);
}
......@@ -4624,7 +4631,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
// window start(end) key interpolation
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, 1);
applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
pBlock->info.rows, numOfOutput);
doCloseWindow(pResultRowInfo, iaInfo, pResult);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册