提交 74da3c05 编写于 作者: L liuyao

feat:set check point id

上级 a4ba5401
...@@ -92,7 +92,7 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); ...@@ -92,7 +92,7 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
// todo refactor // todo refactor
int64_t qGetCheckpointVersion(qTaskInfo_t tinfo); void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId);
/** /**
* Set multiple input data blocks for the stream scan. * Set multiple input data blocks for the stream scan.
......
...@@ -42,6 +42,7 @@ typedef struct STdbState { ...@@ -42,6 +42,7 @@ typedef struct STdbState {
typedef struct { typedef struct {
STdbState* pTdbState; STdbState* pTdbState;
int32_t number; int32_t number;
int64_t checkPointId;
} SStreamState; } SStreamState;
SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages); SStreamState* streamStateOpen(char* path, struct SStreamTask* pTask, bool specPath, int32_t szPage, int32_t pages);
......
...@@ -140,6 +140,7 @@ typedef struct { ...@@ -140,6 +140,7 @@ typedef struct {
int64_t fillHistoryVer2; int64_t fillHistoryVer2;
SStreamState* pState; SStreamState* pState;
int64_t dataVersion; int64_t dataVersion;
int64_t checkPointId;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct { typedef struct {
......
...@@ -198,9 +198,10 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { ...@@ -198,9 +198,10 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code; return code;
} }
int64_t qGetCheckpointVersion(qTaskInfo_t tinfo) { void qGetCheckpointVersion(qTaskInfo_t tinfo, int64_t* dataVer, int64_t* ckId) {
SExecTaskInfo* pTaskInfo = tinfo; SExecTaskInfo* pTaskInfo = tinfo;
return pTaskInfo->streamInfo.dataVersion; *dataVer = pTaskInfo->streamInfo.dataVersion;
*ckId = pTaskInfo->streamInfo.checkPointId;
} }
......
...@@ -2333,8 +2333,9 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN ...@@ -2333,8 +2333,9 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN
return startPos; return startPos;
} }
static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version) { static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) {
pTaskInfo->streamInfo.dataVersion = version; pTaskInfo->streamInfo.dataVersion = version;
pTaskInfo->streamInfo.checkPointId = ckId;
} }
static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId, static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBlock, uint64_t groupId,
...@@ -2506,7 +2507,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { ...@@ -2506,7 +2507,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
clearStreamIntervalOperator(pInfo); clearStreamIntervalOperator(pInfo);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
qDebug("===stream===clear semi operator"); qDebug("===stream===clear semi operator");
} else { } else {
deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark,
...@@ -4762,7 +4763,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -4762,7 +4763,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
&pInfo->delKey); &pInfo->delKey);
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
streamStateCommit(pTaskInfo->streamInfo.pState); streamStateCommit(pTaskInfo->streamInfo.pState);
setStreamDataVersion(pTaskInfo, pInfo->dataVersion); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId);
return NULL; return NULL;
} }
......
...@@ -285,10 +285,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { ...@@ -285,10 +285,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
streamTaskExecImpl(pTask, pInput, pRes); streamTaskExecImpl(pTask, pInput, pRes);
int64_t ckVer = qGetCheckpointVersion(pTask->exec.pExecutor); int64_t ckId = 0;
if (ckVer > pTask->startVer) { // save it since the checkpoint is updated int64_t dataVer = 0;
qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, ckVer); qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId);
pTask->startVer = ckVer; if (dataVer > pTask->startVer) { // save it since the checkpoint is updated
qDebug("s-task:%s exec end, checkpoint ver from %"PRId64" to %"PRId64, pTask->id.idStr, pTask->startVer, dataVer);
pTask->startVer = dataVer;
streamMetaSaveTask(pTask->pMeta, pTask); streamMetaSaveTask(pTask->pMeta, pTask);
if (streamMetaCommit(pTask->pMeta) < 0) { if (streamMetaCommit(pTask->pMeta) < 0) {
......
...@@ -193,6 +193,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int ...@@ -193,6 +193,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
} }
pState->pTdbState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
pState->checkPointId = 0;
return pState; return pState;
...@@ -243,6 +244,7 @@ int32_t streamStateCommit(SStreamState* pState) { ...@@ -243,6 +244,7 @@ int32_t streamStateCommit(SStreamState* pState) {
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
pState->checkPointId++;
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册