From cd2e8fe730c3ced82bd265b978e04e46a16192df Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 29 May 2022 07:15:29 +0800 Subject: [PATCH] enh(sync): add SSnapshot callback: reader, writer --- include/libs/sync/sync.h | 34 ++++++++----------- source/dnode/mnode/impl/src/mndSync.c | 4 +-- source/dnode/vnode/src/vnd/vnodeSync.c | 3 +- source/libs/sync/inc/syncSnapshot.h | 11 +++--- source/libs/sync/src/syncRaftCfg.c | 10 +++--- .../libs/sync/test/syncConfigChangeTest.cpp | 5 +-- source/libs/sync/test/syncSnapshotTest.cpp | 1 + 7 files changed, 33 insertions(+), 35 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 2e04afdbdc..a587ad6ef2 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -66,12 +66,6 @@ typedef struct SSyncCfg { SNodeInfo nodeInfo[TSDB_MAX_REPLICA]; } SSyncCfg; -typedef struct SSnapshot { - void* data; - SyncIndex lastApplyIndex; - SyncTerm lastApplyTerm; -} SSnapshot; - typedef struct SFsmCbMeta { SyncIndex index; bool isWeak; @@ -93,6 +87,12 @@ typedef struct SReConfigCbMeta { uint64_t flag; } SReConfigCbMeta; +typedef struct SSnapshot { + void *data; + SyncIndex lastApplyIndex; + SyncTerm lastApplyTerm; +} SSnapshot; + typedef struct SSyncFSM { void* data; @@ -101,23 +101,17 @@ typedef struct SSyncFSM { void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - - // if (*ppIter == NULL) - // *ppIter = new iter; - // else - // *ppIter.next(); - // - // if success, return 0. else return error code - int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, - int32_t* len); + void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); - // apply data into fsm - int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len); + int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); + int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); + int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); + int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); - // int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); + int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void** ppWriter); + int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply); + int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); } SSyncFSM; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d5fcf9b1eb..ecf7cabc13 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -105,6 +105,8 @@ void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); + memset(pFsm, 0, sizeof(*pFsm)); + pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; @@ -113,8 +115,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpRestoreFinishCb = mndRestoreFinish; - pFsm->FpSnapshotRead = mndSnapshotRead; - pFsm->FpSnapshotApply = mndSnapshotApply; pFsm->FpReConfigCb = mndReConfig; return pFsm; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index d8f3110a16..240b478ac0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -142,14 +142,13 @@ void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); + memset(pFsm, 0, sizeof(*pFsm)); pFsm->data = pVnode; pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = NULL; - pFsm->FpSnapshotRead = NULL; - pFsm->FpSnapshotApply = NULL; pFsm->FpReConfigCb = NULL; return pFsm; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index b3174a4b36..43d1c0c0c3 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -28,10 +28,12 @@ extern "C" { #include "taosdef.h" typedef struct SSyncSnapshotSender { - bool isStart; - int32_t progressIndex; + int32_t sending; + int32_t received; + bool finish; void * pCurrentBlock; - int32_t len; + int32_t blockLen; + int64_t sendingMS; SSnapshot *pSnapshot; SSyncNode *pSyncNode; } SSyncSnapshotSender; @@ -43,7 +45,8 @@ cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); char * snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { - bool isStart; + bool start; + int32_t received; int32_t progressIndex; void * pCurrentBlock; int32_t len; diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 70481b853e..86a07ca4f8 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -58,8 +58,8 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf)); assert(ret == sizeof(buf)); - //int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1); - //assert(ret == strlen(s) + 1); + // int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1); + // assert(ret == strlen(s) + 1); taosMemoryFree(s); taosFsyncFile(pRaftCfg->pFile); @@ -170,7 +170,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) { SRaftCfg raftCfg; raftCfg.cfg = *pCfg; raftCfg.isStandBy = isStandBy; - char * s = raftCfg2Str(&raftCfg); + char *s = raftCfg2Str(&raftCfg); char buf[CONFIG_FILE_LEN]; memset(buf, 0, sizeof(buf)); @@ -179,8 +179,8 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) { int64_t ret = taosWriteFile(pFile, buf, sizeof(buf)); assert(ret == sizeof(buf)); - //int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); - //assert(ret == strlen(s) + 1); + // int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); + // assert(ret == strlen(s) + 1); taosMemoryFree(s); taosCloseFile(&pFile); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 1755b7a8fd..2880d89514 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -84,14 +84,15 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); + memset(pFsm, 0, sizeof(*pFsm)); + pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; - pFsm->FpSnapshotApply = NULL; - pFsm->FpSnapshotRead = NULL; + pFsm->FpReConfigCb = ReConfigCb; diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 8ccd698907..820500e2d8 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -75,6 +75,7 @@ int32_t GetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { void initFsm() { pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM)); + memset(pFsm, 0, sizeof(*pFsm)); pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; -- GitLab