提交 0c1a46c2 编写于 作者: M Minghao Li

refactor(sync): add sync strategy

上级 7a3e9c9a
...@@ -624,6 +624,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl ...@@ -624,6 +624,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg);
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg); int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
......
...@@ -256,70 +256,133 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -256,70 +256,133 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SRpcMsg *pRpcMsg = pMsg; SRpcMsg *pRpcMsg = pMsg;
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { // ToDo: ugly! use function pointer
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); // use different strategy
assert(pSyncMsg != NULL); if (syncNodeSnapshotEnable(pSyncNode)) {
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
syncTimeoutDestroy(pSyncMsg); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) { syncTimeoutDestroy(pSyncMsg);
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); } else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg); ASSERT(pSyncMsg != NULL);
syncPingDestroy(pSyncMsg); ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
assert(pSyncMsg != NULL); SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); syncClientRequestDestroy(pSyncMsg);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); ASSERT(pSyncMsg != NULL);
assert(pSyncMsg != NULL); ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg); } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { ASSERT(pSyncMsg != NULL);
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
assert(pSyncMsg != NULL); syncRequestVoteReplyDestroy(pSyncMsg);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg); } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
syncRequestVoteReplyDestroy(pSyncMsg); SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); syncAppendEntriesDestroy(pSyncMsg);
assert(pSyncMsg != NULL);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg); SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
syncAppendEntriesDestroy(pSyncMsg); ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { syncAppendEntriesReplyDestroy(pSyncMsg);
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); } else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = vnodeSetStandBy(pVnode);
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg); if (ret != 0 && terrno != 0) ret = terrno;
syncAppendEntriesReplyDestroy(pSyncMsg); SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = -1;
}
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = vnodeSetStandBy(pVnode);
if (ret != 0 && terrno != 0) ret = terrno;
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else { } else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType); // use wal first strategy
ret = -1;
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
syncTimeoutDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
syncPingReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
syncClientRequestBatchDestroyDeep(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesBatchDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL);
ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
ret = vnodeSetStandBy(pVnode);
if (ret != 0 && terrno != 0) ret = terrno;
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
tmsgSendRsp(&rsp);
} else {
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
ret = -1;
}
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
......
...@@ -96,7 +96,7 @@ typedef void* queue[2]; ...@@ -96,7 +96,7 @@ typedef void* queue[2];
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 200 // ms retry interval #define TRANS_RETRY_INTERVAL 30 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout #define TRANS_CONN_TIMEOUT 3 // connect timeout
typedef SRpcMsg STransMsg; typedef SRpcMsg STransMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册