From b8a1a7f17588a1ae13421dfc45e9d57b65eeb5ad Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 23 Aug 2022 11:33:52 +0800 Subject: [PATCH] fix(stream): state window update --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/scanoperator.c | 15 ++++-- source/libs/executor/src/timewindowoperator.c | 48 +++++++++---------- 3 files changed, 35 insertions(+), 30 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fb4eac991f..601c22a3ba 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -1016,7 +1016,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID); void printDataBlock(SSDataBlock* pBlock, const char* flag); int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7b13aa8ad8..1377b42b72 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1086,7 +1086,10 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SColumnInfoData* pDestStartCol = taosArrayGet(pDestBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestEndCol = taosArrayGet(pDestBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pDestUidCol = taosArrayGet(pDestBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pDestGpCol = taosArrayGet(pDestBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pDestCalStartTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + SColumnInfoData* pDestCalEndTsCol = taosArrayGet(pDestBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); int32_t dummy = 0; for (int32_t i = 0; i < pSrcBlock->info.rows; i++) { uint64_t groupId = getGroupId(pInfo->pTableScanOp, uidCol[i]); @@ -1100,9 +1103,13 @@ static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSr SResultWindowInfo* pEndWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy); ASSERT(pEndWin); + TSKEY ts = INT64_MIN; colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false); colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false); + colDataAppendNULL(pDestUidCol, i); colDataAppend(pDestGpCol, i, (const char*)&groupId, false); + colDataAppendNULL(pDestCalStartTsCol, i); + colDataAppendNULL(pDestCalEndTsCol, i); pDestBlock->info.rows++; } return TSDB_CODE_SUCCESS; @@ -1157,13 +1164,13 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid) { +void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, int32_t uidCol, uint64_t* pID) { SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, uidCol); colDataAppend(pStartTsCol, pBlock->info.rows, (const char*)pStartTs, false); colDataAppend(pEndTsCol, pBlock->info.rows, (const char*)pEndTs, false); - colDataAppend(pUidCol, pBlock->info.rows, (const char*)pUid, false); + colDataAppend(pUidCol, pBlock->info.rows, (const char*)pID, false); pBlock->info.rows++; } @@ -1190,7 +1197,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup); if ((update || closedWin) && out) { - appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); + appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, UID_COLUMN_INDEX, &pBlock->info.uid); } } if (out) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0594a727fc..e9298487e7 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3951,11 +3951,13 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup, int32_t numOfOutput, int64_t gap, SArray* result) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); TSKEY* tsCols = (TSKEY*)pColDataInfo->pData; + SColumnInfoData* pGpDataInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* gpCols = (uint64_t*)pGpDataInfo->pData; int32_t step = 0; for (int32_t i = 0; i < pBlock->info.rows; i += step) { int32_t winIndex = 0; SResultWindowInfo* pCurWin = - getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex); + getCurSessionWindow(pAggSup, tsCols[i], INT64_MIN, gpCols[i], gap, &winIndex); if (!pCurWin || pCurWin->pos.pageId == -1) { // window has been closed. step = 1; @@ -4168,13 +4170,13 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); - doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, 0, pOperator->exprSupp.numOfExprs, 0, + doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX, pOperator->exprSupp.numOfExprs, 0, pWins); if (IS_FINAL_OP(pInfo)) { int32_t childIndex = getChildIndex(pBlock); SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); SStreamSessionAggOperatorInfo* pChildInfo = pChildOp->info; - doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, 0, pChildOp->exprSupp.numOfExprs, + doClearSessionWindows(&pChildInfo->streamAggSup, &pChildOp->exprSupp, pBlock, START_TS_COLUMN_INDEX, pChildOp->exprSupp.numOfExprs, 0, NULL); rebuildTimeWindow(pInfo, pWins, pBlock->info.groupId, pOperator->exprSupp.numOfExprs, pOperator); } @@ -4285,21 +4287,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "sems session"); + printDataBlock(pBInfo->pRes, "semi session"); return pBInfo->pRes; } // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { pInfo->returnDelete = true; - printDataBlock(pInfo->pDelRes, "sems session"); + printDataBlock(pInfo->pDelRes, "semi session"); return pInfo->pDelRes; } if (pInfo->pUpdateRes->info.rows > 0) { // process the rest of the data pOperator->status = OP_OPENED; - printDataBlock(pInfo->pUpdateRes, "sems session"); + printDataBlock(pInfo->pUpdateRes, "semi session"); return pInfo->pUpdateRes; } // semi interval operator clear disk buffer @@ -4318,13 +4320,14 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { clearSpecialDataBlock(pInfo->pUpdateRes); break; } + printDataBlock(pBlock, "semi session recv"); if (pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); - doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, 0, pSup->numOfExprs, 0, pWins); + doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins); removeSessionResults(pStUpdated, pWins); taosArrayDestroy(pWins); - copyUpdateDataBlock(pInfo->pUpdateRes, pBlock, pInfo->primaryTsIndex); + copyDataBlock(pInfo->pUpdateRes, pBlock); break; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { // gap must be 0 @@ -4364,21 +4367,21 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf); if (pBInfo->pRes->info.rows > 0) { - printDataBlock(pBInfo->pRes, "sems session"); + printDataBlock(pBInfo->pRes, "semi session"); return pBInfo->pRes; } // doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); if (pInfo->pDelRes->info.rows > 0 && !pInfo->returnDelete) { pInfo->returnDelete = true; - printDataBlock(pInfo->pDelRes, "sems session"); + printDataBlock(pInfo->pDelRes, "semi session"); return pInfo->pDelRes; } if (pInfo->pUpdateRes->info.rows > 0) { // process the rest of the data pOperator->status = OP_OPENED; - printDataBlock(pInfo->pUpdateRes, "sems session"); + printDataBlock(pInfo->pUpdateRes, "semi session"); return pInfo->pUpdateRes; } @@ -4400,8 +4403,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pOperator->name = "StreamSessionFinalAggOperator"; } else { pInfo->isFinal = false; - pInfo->pUpdateRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->pUpdateRes->info.type = STREAM_CLEAR; + pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->name = "StreamSessionSemiAggOperator"; pOperator->fpSet = @@ -4616,23 +4618,20 @@ int32_t updateStateWindowInfo(SArray* pWinInfos, int32_t winIndex, TSKEY* pTs, u } static void doClearStateWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, - int32_t tsIndex, SColumn* pCol, int32_t keyIndex, SHashObj* pSeUpdated, SHashObj* pSeDeleted) { - SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, tsIndex); - SColumnInfoData* pKeyColInfo = taosArrayGet(pBlock->pDataBlock, keyIndex); + SHashObj* pSeUpdated, SHashObj* pSeDeleted) { + SColumnInfoData* pTsColInfo = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pGroupColInfo = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); TSKEY* tsCol = (TSKEY*)pTsColInfo->pData; bool allEqual = false; int32_t step = 1; - uint64_t groupId = pBlock->info.groupId; + uint64_t* gpCol = (uint64_t*) pGroupColInfo->pData; for (int32_t i = 0; i < pBlock->info.rows; i += step) { - char* pKeyData = colDataGetData(pKeyColInfo, i); int32_t winIndex = 0; - SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], groupId, &winIndex); + SStateWindowInfo* pCurWin = getStateWindowByTs(pAggSup, tsCol[i], gpCol[i], &winIndex); if (!pCurWin) { continue; } - step = updateStateWindowInfo(pAggSup->pCurWins, winIndex, tsCol, groupId, pKeyColInfo, - pBlock->info.rows, i, &allEqual, pSeDeleted); - ASSERT(isTsInWindow(pCurWin, tsCol[i]) || isEqualStateKey(pCurWin, pKeyData)); + updateSessionWindowInfo(&pCurWin->winInfo, tsCol, NULL, 0, pBlock->info.rows, i, 0, NULL); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); } @@ -4675,7 +4674,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl pSDataBlock->info.rows, i, &allEqual, pStDeleted); if (!allEqual) { appendOneRow(pAggSup->pScanBlock, &pCurWin->winInfo.win.skey, &pCurWin->winInfo.win.ekey, - &groupId); + GROUPID_COLUMN_INDEX, &groupId); taosHashRemove(pSeUpdated, &pCurWin->winInfo.pos, sizeof(SResultRowPosition)); deleteWindow(pAggSup->pCurWins, winIndex, destroyStateWinInfo); continue; @@ -4730,8 +4729,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { printDataBlock(pBlock, "single state recv"); if (pBlock->info.type == STREAM_CLEAR) { - doClearStateWindows(&pInfo->streamAggSup, pBlock, pInfo->primaryTsIndex, &pInfo->stateCol, pInfo->stateCol.slotId, - pSeUpdated, pInfo->pSeDeleted); + doClearStateWindows(&pInfo->streamAggSup, pBlock, pSeUpdated, pInfo->pSeDeleted); continue; } else if (pBlock->info.type == STREAM_DELETE_DATA) { SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo)); -- GitLab