提交 aa857815 编写于 作者: M Minghao Li

enh(sync): add mndSnapshotRead mndSnapshotApply

上级 800ef42a
......@@ -82,18 +82,29 @@ typedef struct SFsmCbMeta {
SyncTerm currentTerm;
} SFsmCbMeta;
typedef struct SReConfigCbMeta {
int32_t code;
SyncIndex index;
SyncTerm term;
SyncTerm currentTerm;
} SReConfigCbMeta;
typedef struct SSyncFSM {
void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void (*FpRestoreFinish)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len);
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
// int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
} SSyncFSM;
// abstract definition of log store in raft
......
......@@ -49,15 +49,46 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
pMnode->syncMgmt.restored = true;
}
void* mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len) {
/*
SMnode *pMnode = pFsm->data;
SSdbIter *pIter;
if (iter == NULL) {
pIter = sdbIterInit(pMnode->sdb)
} else {
pIter = iter;
}
pIter = sdbIterRead(pIter);
return pIter;
*/
return NULL;
}
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) {
SMnode *pMnode = pFsm->data;
sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf);
return 0;
}
void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
}
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
pFsm->data = pMnode;
pFsm->FpCommitCb = mndSyncCommitMsg;
pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL;
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
pFsm->FpRestoreFinish = mndRestoreFinish;
pFsm->FpRestoreSnapshot = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpSnapshotRead = mndSnapshotRead;
pFsm->FpSnapshotApply = mndSnapshotApply;
pFsm->FpReConfigCb = mndReConfig;
return pFsm;
}
......
......@@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinish = NULL;
pFsm->FpRestoreFinishCb = NULL;
pFsm->FpSnapshotRead = NULL;
pFsm->FpSnapshotApply = NULL;
pFsm->FpReConfigCb = NULL;
return pFsm;
}
\ No newline at end of file
......@@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// restore finish
if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) {
if (ths->restoreFinish == false) {
if (ths->pFsm->FpRestoreFinish != NULL) {
ths->pFsm->FpRestoreFinish(ths->pFsm);
if (ths->pFsm->FpRestoreFinishCb != NULL) {
ths->pFsm->FpRestoreFinishCb(ths->pFsm);
}
ths->restoreFinish = true;
sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId);
......
......@@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
// restore finish
if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) {
if (pSyncNode->restoreFinish == false) {
if (pSyncNode->pFsm->FpRestoreFinish != NULL) {
pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm);
if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) {
pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm);
}
pSyncNode->restoreFinish = true;
sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId);
......
......@@ -73,7 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0;
}
void FpRestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==FpRestoreFinishCb=="); }
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
SSyncFSM* createFsm() {
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
......@@ -81,7 +81,7 @@ SSyncFSM* createFsm() {
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshot = GetSnapshotCb;
pFsm->FpRestoreFinish = FpRestoreFinishCb;
pFsm->FpRestoreFinishCb = RestoreFinishCb;
return pFsm;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册