From 0ffc6c46f328f3c886159def06ef7721833a4630 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 1 Jul 2022 14:47:12 +0800 Subject: [PATCH] refactor(sync): add snapshot2 interface --- include/libs/sync/sync.h | 2 +- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 10 +++++----- source/libs/sync/src/syncAppendEntriesReply.c | 2 +- source/libs/sync/test/syncConfigChangeSnapshotTest.cpp | 2 +- source/libs/sync/test/syncSnapshotSenderTest.cpp | 2 +- source/libs/sync/test/syncTestTool.cpp | 2 +- 7 files changed, 11 insertions(+), 11 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index bd148fd158..d07cf0adf1 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -121,7 +121,7 @@ typedef struct SSyncFSM { int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); + int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index c58ce97588..693eed5222 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -117,7 +117,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM } } -int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { +int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { mDebug("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f9e811fc32..80ba80e61a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { } void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode * pVnode = pInfo->ahandle; + SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { } void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode * pVnode = pInfo->ahandle; + SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; @@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { STraceId *trace = &pMsg->info.traceId; do { - char * syncNodeStr = sync2SimpleStr(pVnode->sync); + char *syncNodeStr = sync2SimpleStr(pVnode->sync); static int64_t vndTick = 0; if (++vndTick % 10 == 1) { vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); @@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon } static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SVnode * pVnode = pFsm->data; + SVnode *pVnode = pFsm->data; SSnapshot snapshot = {0}; SyncIndex beginIndex = SYNC_INDEX_INVALID; char logBuf[256] = {0}; @@ -409,7 +409,7 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); } -static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; } +static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; } static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 827b2a59a5..629d83eb51 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -120,7 +120,7 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde void* pReader = NULL; SReaderParam readerParam = {.start = beginIndex, .end = endIndex}; - ths->pFsm->FpSnapshotStartRead(ths->pFsm, &pReader); + ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { ASSERT(pReader != NULL); snapshotSenderStart(pSender, snapshot, pReader); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index b4f173ff02..2908dd5907 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { *ppReader = (void*)0xABCD; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index 04a02460af..dc38cd3df5 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; } +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 91a16cc033..4e84ff0f7e 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } -int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { +int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { *ppReader = (void*)0xABCD; char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader); -- GitLab