提交 d1ca2ae2 编写于 作者: L liuyao

stream single session op end window function

上级 01c0bbb3
...@@ -3182,7 +3182,7 @@ int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pW ...@@ -3182,7 +3182,7 @@ int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pW
} }
static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated, static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pStUpdated,
SSHashObj* pStDeleted, bool hasEndTs) { SSHashObj* pStDeleted, bool hasEndTs, bool addGap) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
int32_t numOfOutput = pOperator->exprSupp.numOfExprs; int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
...@@ -3221,7 +3221,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData ...@@ -3221,7 +3221,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
} }
int64_t winDelta = 0; int64_t winDelta = 0;
if (IS_FINAL_OP(pInfo)) { if (addGap) {
winDelta = pAggSup->gap; winDelta = pAggSup->gap;
} }
code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput, code = doOneWindowAggImpl(&pInfo->twAggSup.timeWindowData, &winInfo, &pResult, i, winRows, rows, numOfOutput,
...@@ -3547,7 +3547,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3547,7 +3547,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo)); doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, pInfo->pStDeleted, IS_FINAL_OP(pInfo), true);
if (IS_FINAL_OP(pInfo)) { if (IS_FINAL_OP(pInfo)) {
int32_t chIndex = getChildIndex(pBlock); int32_t chIndex = getChildIndex(pBlock);
int32_t size = taosArrayGetSize(pInfo->pChildren); int32_t size = taosArrayGetSize(pInfo->pChildren);
...@@ -3562,7 +3562,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { ...@@ -3562,7 +3562,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
} }
SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex);
setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(&pChildOp->exprSupp, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true); doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true, false);
} }
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.watermark);
...@@ -3767,7 +3767,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { ...@@ -3767,7 +3767,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} }
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true); setInputDataBlock(pSup, pBlock, TSDB_ORDER_ASC, MAIN_SCAN, true);
doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false); doStreamSessionAggImpl(pOperator, pBlock, pInfo->pStUpdated, NULL, false, false);
maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册