diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 50f8281c513cf960ae0668041311655d64f75564..5ffcbb7a0919f6277414405e03e690f5115f3909 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -82,18 +82,29 @@ typedef struct SFsmCbMeta { SyncTerm currentTerm; } SFsmCbMeta; +typedef struct SReConfigCbMeta { + int32_t code; + SyncIndex index; + SyncTerm term; + SyncTerm currentTerm; +} SReConfigCbMeta; + typedef struct SSyncFSM { void* data; + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpRestoreFinish)(struct SSyncFSM* pFsm); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); + int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len); int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len); + void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); + + // int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + } SSyncFSM; // abstract definition of log store in raft diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index a4e6cfd5cac233c0acc626a9ab463ac7acef6f94..e9587606d6ff7f93a6b0ad3b77507d453ef0e6fe 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -49,15 +49,46 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { pMnode->syncMgmt.restored = true; } +void* mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len) { + /* + SMnode *pMnode = pFsm->data; + SSdbIter *pIter; + if (iter == NULL) { + pIter = sdbIterInit(pMnode->sdb) + } else { + pIter = iter; + } + pIter = sdbIterRead(pIter); + return pIter; + */ + + return NULL; +} + +int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) { + SMnode *pMnode = pFsm->data; + sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); + return 0; +} + +void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + +} + SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; + pFsm->FpCommitCb = mndSyncCommitMsg; pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; + pFsm->FpGetSnapshot = mndSyncGetSnapshot; - pFsm->FpRestoreFinish = mndRestoreFinish; - pFsm->FpRestoreSnapshot = NULL; + pFsm->FpRestoreFinishCb = mndRestoreFinish; + pFsm->FpSnapshotRead = mndSnapshotRead; + pFsm->FpSnapshotApply = mndSnapshotApply; + pFsm->FpReConfigCb = mndReConfig; + return pFsm; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 882ee912cde37414bc219efe75c113d0868c1810..d8f3110a16fbd118e966a34d2d8d8d8c58519f54 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -147,6 +147,10 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; - pFsm->FpRestoreFinish = NULL; + pFsm->FpRestoreFinishCb = NULL; + pFsm->FpSnapshotRead = NULL; + pFsm->FpSnapshotApply = NULL; + pFsm->FpReConfigCb = NULL; + return pFsm; } \ No newline at end of file diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index fa735e71c029e22d67e7b2681ff1fc7144527061..0411628c5c40a233e3c3f11b5ef455670914163a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -362,8 +362,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // restore finish if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { - if (ths->pFsm->FpRestoreFinish != NULL) { - ths->pFsm->FpRestoreFinish(ths->pFsm); + if (ths->pFsm->FpRestoreFinishCb != NULL) { + ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; sInfo("==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d", ths, ths->vgId); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 18c6f8930ac73f2bdc5d9e3d860f8b2f8dec0188..36713ceed5049bfb729e48bd420c0c990c241ec2 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -139,8 +139,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // restore finish if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { if (pSyncNode->restoreFinish == false) { - if (pSyncNode->pFsm->FpRestoreFinish != NULL) { - pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) { + pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm); } pSyncNode->restoreFinish = true; sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index c8a9fba9bf00490eac9f1af640e30ee687fa5e2f..f52fef0019de62e99ae1e9dca379e44b0a39c307 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -73,7 +73,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -void FpRestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==FpRestoreFinishCb=="); } +void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); @@ -81,7 +81,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; - pFsm->FpRestoreFinish = FpRestoreFinishCb; + pFsm->FpRestoreFinishCb = RestoreFinishCb; return pFsm; }