diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 15ed5503ebe7fe2e927826711a384deec611fb3c..8d85ac8ccb14d4f1a1da54498d3f6544592d478a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -131,6 +131,7 @@ typedef struct SSyncNode { // raft algorithm SSyncFSM* pFsm; int32_t quorum; + SRaftId leaderCache; // life cycle int32_t refCount; @@ -193,18 +194,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); -int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); -int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); - -void syncNodeBecomeFollower(SSyncNode* pSyncNode); -void syncNodeBecomeLeader(SSyncNode* pSyncNode); -void syncNodeFollower2Candidate(SSyncNode* pSyncNode); -void syncNodeCandidate2Leader(SSyncNode* pSyncNode); -void syncNodeLeader2Follower(SSyncNode* pSyncNode); -void syncNodeCandidate2Follower(SSyncNode* pSyncNode); + +int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index b022044528ccd241b53b6e1d37c629b843ce412e..54a29004d6ee60a17a942a416ae67f1ae9b9b081 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,7 +30,7 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { - SYNC_UNKNOWN = 77, + SYNC_UNKNOWN = 9999, SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index c2eca55151e24aceae235e7232faf03bb5784894..4fb21930105a82f2a8718ad817816cb9e0c83262 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -28,10 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg); - -void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index 38068dd0e2db38750a352b4546cf19c49e5eb09b..21fb61f85fafb2d7cfcf7f083c5e00d25d9bb5b0 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -28,8 +28,6 @@ extern "C" { #include "syncRaft.h" #include "taosdef.h" -void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 2b9c59ec92a3b368426af4000b93eb6db1858727..f3045c31801c6d03f5c3f6c4fac6cbae06d9d8d0 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -15,7 +15,7 @@ #include "syncAppendEntries.h" -void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { +int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { // TLA+ Spec // AppendEntries(i, j) == // /\ i /= j @@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { // /\ UNCHANGED <> } -void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { +int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // TLA+ Spec // HandleAppendEntriesRequest(i, j, m) == // LET logOk == \/ m.mprevLogIndex = 0 diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 05734237b93de6ec43e85116e0d65640f0ccd942..81c9ea233b03f26b40bc2e1269a6d1536f8dc9af 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -15,7 +15,7 @@ #include "syncAppendEntriesReply.h" -void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { +int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { // TLA+ Spec // HandleAppendEntriesResponse(i, j, m) == // /\ m.mterm = currentTerm[i] diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 25775bec7634a4ec93317e3a6f038b6fc3a3c735..354240cc6fa8fe79f846ca93f7005a19f3ad1924 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + if (io->FpOnSyncRequestVote) { + SyncRequestVote *pSyncMsg; + pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen); + syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + if (io->FpOnSyncRequestVoteReply) { + SyncRequestVoteReply *pSyncMsg; + pSyncMsg = SyncRequestVoteReplyBuild(); + syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + if (io->FpOnSyncAppendEntries) { + SyncAppendEntries *pSyncMsg; + pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen); + syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + if (io->FpOnSyncAppendEntriesReply) { + SyncAppendEntriesReply *pSyncMsg; + pSyncMsg = syncAppendEntriesReplyBuild(); + syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + } + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 6a0663dd579cb14dbabf20fcaa1bdce89ddeda2f..c9f8372a824783c92818391b6a4c27169ef6029e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -31,16 +31,16 @@ static void syncNodeEqElectTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); -static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); -static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); - static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); -static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); -static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); -static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); + +static void syncNodeBecomeFollower(SSyncNode* pSyncNode); +static void syncNodeBecomeLeader(SSyncNode* pSyncNode); +static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +static void syncNodeLeader2Follower(SSyncNode* pSyncNode); +static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -188,8 +188,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } - int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { if (pSyncNode->pHeartbeatTimer == NULL) { pSyncNode->pHeartbeatTimer = @@ -209,20 +207,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } -int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } - -void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} - -void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} - -void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} - -void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} - -void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} - -void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} - // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -252,16 +236,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn return ret; } -static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { - int32_t ret = 0; - return ret; -} - static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); @@ -311,26 +285,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { return ret; } -static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = 0; - return ret; -} - -static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = 0; - return ret; -} - static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { int32_t ret = 0; sTrace("<-- syncNodeOnTimeoutCb -->"); @@ -377,4 +331,38 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { static void syncNodeEqElectTimer(void* param, void* tmrId) {} -static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} + +static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + pSyncNode->leaderCache.addr = 0; + pSyncNode->leaderCache.vgId = 0; + } + + syncNodeStopHeartbeatTimer(pSyncNode); + syncNodeStartElectTimer(pSyncNode); +} + +static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { + pSyncNode->state = TAOS_SYNC_STATE_LEADER; + pSyncNode->leaderCache = pSyncNode->raftId; + + // next Index +=1 + // match Index = 0; + + syncNodeStopElectTimer(pSyncNode); + syncNodeStartHeartbeatTimer(pSyncNode); + + // appendEntries; +} + +static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); + pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; +} + +static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 7aee47b8e42ced91522c177d887785a6193328af..38eaea26ac0c28b2ff3e6646ed9e5abbbab5537b 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -15,7 +15,7 @@ #include "syncRequestVote.h" -void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { +int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { // TLA+ Spec // RequestVote(i, j) == // /\ state[i] = Candidate @@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { // /\ UNCHANGED <> } -void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { +int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { // TLA+ Spec // HandleRequestVoteRequest(i, j, m) == // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index a9c88a797545cc207d94c338ef9bc0f86f77e5c4..63bba7c4804ddb39d32604e7e218351d71a1cad8 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -15,7 +15,7 @@ #include "syncRequestVoteReply.h" -void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { +int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { // TLA+ Spec // HandleRequestVoteResponse(i, j, m) == // \* This tallies votes even when the current state is not Candidate, but