提交 1392711e 编写于 作者: B Benguang Zhao

enh: add fsm FpAppliedIndexCb to get the current applied index

上级 9d107399
......@@ -143,6 +143,7 @@ typedef struct SSyncFSM {
void* data;
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
SyncIndex (*FpAppliedIndexCb)(const struct SSyncFSM* pFsm);
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
......
......@@ -113,6 +113,7 @@ typedef struct SMnode {
bool deploy;
char *path;
int64_t checkTime;
SyncIndex applied;
SSdb *pSdb;
SArray *pSteps;
SQHandle *pQuery;
......
......@@ -129,9 +129,13 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
int32_t code = 0;
SMnode *pMnode = pFsm->data;
atomic_store_64(&pMnode->applied, pMsg->info.conn.applyIndex);
if (!syncUtilUserCommit(pMsg->msgType)) {
goto _out;
}
code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
_out:
......@@ -140,6 +144,11 @@ _out:
return code;
}
SyncIndex mndSyncAppliedIndex(const SSyncFSM *pFSM) {
SMnode *pMnode = pFSM->data;
return atomic_load_64(&pMnode->applied);
}
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data;
......@@ -253,6 +262,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpAppliedIndexCb = mndSyncAppliedIndex;
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
......@@ -321,6 +331,10 @@ int32_t mndInitSync(SMnode *pMnode) {
}
pMnode->pSdb->sync = pMgmt->sync;
SSnapshot snap = {0};
sdbGetCommitInfo(pMnode->pSdb, &snap.lastApplyIndex, &snap.lastApplyTerm, &snap.lastConfigIndex);
atomic_store_64(&pMnode->applied, snap.lastApplyIndex);
mInfo("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
return 0;
}
......
......@@ -320,8 +320,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
ASSERT(pVnode->state.applyTerm <= pMsg->info.conn.applyTerm);
ASSERT(pVnode->state.applied + 1 == version);
pVnode->state.applied = version;
pVnode->state.applyTerm = pMsg->info.conn.applyTerm;
atomic_store_64(&pVnode->state.applied, version);
atomic_store_64(&pVnode->state.applyTerm, pMsg->info.conn.applyTerm);
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
......
......@@ -454,6 +454,11 @@ static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const
return 0;
}
static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) {
SVnode *pVnode = pFSM->data;
return atomic_load_64(&pVnode->state.applied);
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
......@@ -580,6 +585,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pVnode;
pFsm->FpCommitCb = vnodeSyncCommitMsg;
pFsm->FpAppliedIndexCb = vnodeSyncAppliedIndex;
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshotInfo;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册