From 551d7cfa6d35fb498ca81650bbf6fb68a044b31d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Sun, 6 Mar 2022 12:25:12 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncInt.h | 17 ++-- source/libs/sync/inc/syncMessage.h | 2 +- source/libs/sync/inc/syncRequestVote.h | 4 - source/libs/sync/inc/syncRequestVoteReply.h | 2 - source/libs/sync/src/syncAppendEntries.c | 4 +- source/libs/sync/src/syncAppendEntriesReply.c | 2 +- source/libs/sync/src/syncIO.c | 36 +++++++ source/libs/sync/src/syncMain.c | 96 ++++++++----------- source/libs/sync/src/syncRequestVote.c | 4 +- source/libs/sync/src/syncRequestVoteReply.c | 2 +- 10 files changed, 93 insertions(+), 76 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 15ed5503eb..8d85ac8ccb 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -131,6 +131,7 @@ typedef struct SSyncNode { // raft algorithm SSyncFSM* pFsm; int32_t quorum; + SRaftId leaderCache; // life cycle int32_t refCount; @@ -193,18 +194,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); -int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); - -void syncNodeBecomeFollower(SSyncNode* pSyncNode); -void syncNodeBecomeLeader(SSyncNode* pSyncNode); -void syncNodeFollower2Candidate(SSyncNode* pSyncNode); -void syncNodeCandidate2Leader(SSyncNode* pSyncNode); -void syncNodeLeader2Follower(SSyncNode* pSyncNode); -void syncNodeCandidate2Follower(SSyncNode* pSyncNode); + +int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index b022044528..54a29004d6 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,7 +30,7 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { - SYNC_UNKNOWN = 77, + SYNC_UNKNOWN = 9999, SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index c2eca55151..4fb2193010 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,10 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg); - -void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 38068dd0e2..21fb61f85f 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,8 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 2b9c59ec92..f3045c3180 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,7 +15,7 @@ #include "syncAppendEntries.h" -void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { +int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { // TLA+ Spec // AppendEntries(i, j) == // /\ i /= j @@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { // /\ UNCHANGED <> } -void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0 diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 05734237b9..81c9ea233b 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,7 +15,7 @@ #include "syncAppendEntriesReply.h" -void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i] diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 25775bec76..354240cc6f 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + if (io->FpOnSyncRequestVote) { + SyncRequestVote *pSyncMsg; + pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen); + syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + if (io->FpOnSyncRequestVoteReply) { + SyncRequestVoteReply *pSyncMsg; + pSyncMsg = SyncRequestVoteReplyBuild(); + syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + if (io->FpOnSyncAppendEntries) { + SyncAppendEntries *pSyncMsg; + pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen); + syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + if (io->FpOnSyncAppendEntriesReply) { + SyncAppendEntriesReply *pSyncMsg; + pSyncMsg = syncAppendEntriesReplyBuild(); + syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6a0663dd57..c9f8372a82 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -31,16 +31,16 @@ static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); -static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); -static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); - static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); + +static void syncNodeBecomeFollower(SSyncNode* pSyncNode); +static void syncNodeBecomeLeader(SSyncNode* pSyncNode); +static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +static void syncNodeLeader2Follower(SSyncNode* pSyncNode); +static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -188,8 +188,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { if (pSyncNode->pHeartbeatTimer == NULL) { pSyncNode->pHeartbeatTimer = @@ -209,20 +207,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } - -void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} - -void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} - -void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} - -void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} - -void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} - // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -252,16 +236,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn return ret; } -static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { - int32_t ret = 0; - return ret; -} - static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); @@ -311,26 +285,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } -static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - return ret; -} - static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; sTrace("<-- syncNodeOnTimeoutCb -->"); @@ -377,4 +331,38 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) {} -static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} + +static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + pSyncNode->leaderCache.addr = 0; + pSyncNode->leaderCache.vgId = 0; + } + + syncNodeStopHeartbeatTimer(pSyncNode); + syncNodeStartElectTimer(pSyncNode); +} + +static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { + pSyncNode->state = TAOS_SYNC_STATE_LEADER; + pSyncNode->leaderCache = pSyncNode->raftId; + + // next Index +=1 + // match Index = 0; + + syncNodeStopElectTimer(pSyncNode); + syncNodeStartHeartbeatTimer(pSyncNode); + + // appendEntries; +} + +static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); + pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; +} + +static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 7aee47b8e4..38eaea26ac 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,7 +15,7 @@ #include "syncRequestVote.h" -void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { +int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { // TLA+ Spec // RequestVote(i, j) == // /\ state[i] = Candidate @@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { // /\ UNCHANGED <> } -void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a9c88a7975..63bba7c480 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,7 +15,7 @@ #include "syncRequestVoteReply.h" -void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { // TLA+ Spec // HandleRequestVoteResponse(i, j, m) == // \* This tallies votes even when the current state is not Candidate, but -- GitLab