提交 6e59c13c 编写于 作者: M Minghao Li

refactor(sync): add sync strategy

上级 0c1a46c2
...@@ -643,7 +643,8 @@ typedef int32_t (*FpOnSnapshotSendCb)(SSyncNode* ths, SyncSnapshotSend* pMsg); ...@@ -643,7 +643,8 @@ typedef int32_t (*FpOnSnapshotSendCb)(SSyncNode* ths, SyncSnapshotSend* pMsg);
typedef int32_t (*FpOnSnapshotRspCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg); typedef int32_t (*FpOnSnapshotRspCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
// option ---------------------------------- // option ----------------------------------
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
// --------------------------------------------- // ---------------------------------------------
......
...@@ -427,7 +427,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -427,7 +427,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
} while (0); } while (0);
// ToDo: ugly! use function pointer // ToDo: ugly! use function pointer
if (syncNodeSnapshotEnable(pSyncNode)) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
......
...@@ -258,7 +258,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -258,7 +258,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// ToDo: ugly! use function pointer // ToDo: ugly! use function pointer
// use different strategy // use different strategy
if (syncNodeSnapshotEnable(pSyncNode)) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
......
...@@ -174,8 +174,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak); ...@@ -174,8 +174,9 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
// option // option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex); ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
// ping -------------- // ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
......
...@@ -1100,7 +1100,9 @@ void syncNodeClose(SSyncNode* pSyncNode) { ...@@ -1100,7 +1100,9 @@ void syncNodeClose(SSyncNode* pSyncNode) {
} }
// option // option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; } // bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
// ping -------------- // ping --------------
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册