diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index fd66194143083cc40709152fbe9e681692850055..9d05d1fdc948253085c2a4e398f047f48637d641 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -92,7 +92,7 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); // todo refactor -int64_t qGetCheckpointVersion(qTaskInfo_t tinfo); +void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId); /** * Set multiple input data blocks for the stream scan. diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index fd5cec293167604795961f49ca7b048102d068e8..42a7261f3889f41befa4545471bbffd40d355f4e 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -42,6 +42,7 @@ typedef struct STdbState { typedef struct { STdbState* pTdbState; int32_t number; + int64_t checkPointId; } SStreamState; SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 759502e40f454d2fb9dd935b38a9e1f7da9ed687..7fee4f9b83767e42a813a775924a026760a20925 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -140,6 +140,7 @@ typedef struct { int64_t fillHistoryVer2; SStreamState* pState; int64_t dataVersion; + int64_t checkPointId; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index caaeaa76c2e9f296aee221a7d192bf53cfdba7ed..2244847faa386ade1e321a3aa42ffe14f5ec0e3b 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -198,9 +198,10 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { return code; } -int64_t qGetCheckpointVersion(qTaskInfo_t tinfo) { +void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) { SExecTaskInfo* pTaskInfo = tinfo; - return pTaskInfo->streamInfo.dataVersion; + *dataVer = pTaskInfo->streamInfo.dataVersion; + *ckId = pTaskInfo->streamInfo.checkPointId; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f122323109298d7552c1f14177a88d7fceac1222..658dd5197885e2256076c04a711ddc03cdf70020 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2333,8 +2333,9 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) { +static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { pTaskInfo->streamInfo.dataVersion = version; + pTaskInfo->streamInfo.checkPointId = ckId; } static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, @@ -2506,7 +2507,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); } else { deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, @@ -4762,7 +4763,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { &pInfo->delKey); setOperatorCompleted(pOperator); streamStateCommit(pTaskInfo->streamInfo.pState); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); return NULL; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f2db8113d35c108d1597c99101ab8952f23ec033..8dafafcc5fa7e30dd8a402842c6ec9c39f55e51b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -285,10 +285,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamTaskExecImpl(pTask, pInput, pRes); - int64_t ckVer = qGetCheckpointVersion(pTask->exec.pExecutor); - if (ckVer > pTask->startVer) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, ckVer); - pTask->startVer = ckVer; + int64_t ckId = 0; + int64_t dataVer = 0; + qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); + if (dataVer > pTask->startVer) { // save it since the checkpoint is updated + qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, dataVer); + pTask->startVer = dataVer; streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 04a141443843403d61cb5cdb1e95638da6677eff..7bea989e3a63be5110deb9e8dbb5b00ac6eed643 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -193,6 +193,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int } pState->pTdbState->pOwner = pTask; + pState->checkPointId = 0; return pState; @@ -243,6 +244,7 @@ int32_t streamStateCommit(SStreamState* pState) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } + pState->checkPointId++; return 0; }