diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 95b16589eeb3f511130c31698c33de65db3e3507..66900fb7aa3a8b8bf740c8b4e5df3b7256cbcdce 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1246,6 +1246,9 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS pCtx[i].fpSet.init(&pCtx[i], pResInfo); } } + SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId); + setBufPageDirty(bufPage, true); + releaseBufPage(pResultBuf, bufPage); } bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId, @@ -3171,6 +3174,10 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL); ASSERT(isInWindow(pCurWin, tsCols[i], gap)); + if (pCurWin->pos.pageId == -1) { + // window has been closed. + continue; + } doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput); if (result) { taosArrayPush(result, pCurWin); @@ -3246,12 +3253,12 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput, pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo); compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo); - SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); - setBufPageDirty(bufPage, true); - releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage); + SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId); + releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage); continue; + } else if (!pChWin->isClosed) { + break; } - break; } } SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId); @@ -3265,7 +3272,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } -int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) { +int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, + SArray* pClosed, __get_win_info_ fn, bool delete) { // Todo(liuyao) save window to tdb void** pIte = NULL; size_t keyLen = 0; @@ -3279,10 +3287,18 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra if (isCloseWindow(&pSeWin->win, pTwSup)) { if (!pSeWin->isClosed) { pSeWin->isClosed = true; - if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { + if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) { int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pSeWin->isOutput = true; } + if (delete) { + taosArrayRemove(pWins, i); + i--; + size = taosArrayGetSize(pWins); + } } continue; } @@ -3292,6 +3308,16 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra return TSDB_CODE_SUCCESS; } +static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) { + int32_t size = taosArrayGetSize(pChildren); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i); + SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info; + pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); + closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete); + } +} + int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) { void** pIte = NULL; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { @@ -3339,6 +3365,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } + printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv"); if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); @@ -3385,7 +3412,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, + getResWinForSession, pInfo->ignoreCloseWindow); + closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow); copyUpdateResult(pStUpdated, pUpdated); taosHashCleanup(pStUpdated); @@ -3437,10 +3466,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "Semi Session"); return pInfo->pDelRes; } doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0) { + if (pBInfo->pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; if (pInfo->pUpdateRes->info.rows == 0) { // semi interval operator clear disk buffer @@ -3449,9 +3479,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } // process the rest of the data pOperator->status = OP_OPENED; + printDataBlock(pInfo->pUpdateRes, "Semi Session"); return pInfo->pUpdateRes; } - return pInfo->binfo.pRes; + printDataBlock(pBInfo->pRes, "Semi Session"); + return pBInfo->pRes; } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -3495,21 +3527,24 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); - blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0) { + printDataBlock(pInfo->pDelRes, "Semi Session"); return pInfo->pDelRes; } - doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0) { + doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); + if (pBInfo->pRes->info.rows == 0) { pOperator->status = OP_EXEC_DONE; if (pInfo->pUpdateRes->info.rows == 0) { return NULL; } // process the rest of the data pOperator->status = OP_OPENED; + printDataBlock(pInfo->pUpdateRes, "Semi Session"); return pInfo->pUpdateRes; } + printDataBlock(pBInfo->pRes, "Semi Session"); return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -3867,7 +3902,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, + getResWinForState, pInfo->ignoreCloseWindow); + closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow); copyUpdateResult(pSeUpdated, pUpdated); taosHashCleanup(pSeUpdated);