diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0544568c3b404b1c09211fda66072ff463cf5f49..0a0df58d4d58d4d78d47f56f17f42bacfdeb14fb 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -142,9 +142,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t fillHistoryVer1; int64_t fillHistoryVer2; - - // int8_t triggerSaved; - // int64_t deleteMarkSaved; + int64_t dataVersion; SStreamState* pState; } SStreamTaskInfo; @@ -575,6 +573,7 @@ typedef struct SStreamIntervalOperatorInfo { uint64_t numOfDatapack; SArray* pUpdated; SSHashObj* pUpdatedMap; + int64_t dataVersion; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { @@ -623,6 +622,7 @@ typedef struct SStreamSessionAggOperatorInfo { bool ignoreExpiredData; SArray* pUpdated; SSHashObj* pStUpdated; + int64_t dataVersion; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -640,6 +640,7 @@ typedef struct SStreamStateAggOperatorInfo { bool ignoreExpiredData; SArray* pUpdated; SSHashObj* pSeUpdated; + int64_t dataVersion; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 5d7caabb74cb63a9fcc7b7b2700d3ceb94bbce93..fcdc70788d92a13d46272bc00826411c1a8aaa1b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2409,9 +2409,14 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } +static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) { + pTaskInfo->streamInfo.dataVersion = version; +} + static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperatorInfo->info; + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; @@ -2599,6 +2604,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion); qDebug("===stream===clear semi operator"); } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, @@ -2883,6 +2889,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pUpdatedMap = NULL; pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); + pInfo->dataVersion = 0; pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -3233,6 +3240,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData int32_t rows = pSDataBlock->info.rows; int32_t winRows = 0; + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + SColumnInfoData* pStartTsCol = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = NULL; @@ -3695,6 +3704,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->pUpdated = NULL; pInfo->pStUpdated = NULL; + pInfo->dataVersion = 0; setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4005,6 +4015,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl TSKEY* tsCols = NULL; SResultRow* pResult = NULL; int32_t winRows = 0; + + pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); + if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; @@ -4220,6 +4233,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->pUpdated = NULL; pInfo->pSeUpdated = NULL; + pInfo->dataVersion = 0; setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4856,6 +4870,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } return NULL;