diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 4058d3bd1c6fcd717e193ccfae9c04a5d9321f8a..30f7c5d9f7d66b37fec79ab370961ea7b0dc8998 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -43,6 +43,12 @@ int32_t raftStorePersist(SRaftStore *pRaftStore); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); +bool raftStoreHasVoted(SRaftStore *pRaftStore); +void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); +void raftStoreClearVote(SRaftStore *pRaftStore); +void raftStoreNextTerm(SRaftStore *pRaftStore); +void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); + // for debug ------------------- void raftStorePrint(SRaftStore *pObj); void raftStorePrint2(char *s, SRaftStore *pObj); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index bc38acdfe6f51e719e5d0a423780f451144eaee5..1b702c252867dc8a909048d2bea2c960989f40b4 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -34,6 +34,7 @@ void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); +bool syncUtilEmptyId(const SRaftId* pId); // ---- SSyncBuffer ---- void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7f183021cf8688dd9417dd795cfd46a5b1d12da3..da1286bd32a84e6e792714bb67d1dd213590cd9e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -21,6 +21,7 @@ #include "syncEnv.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncMessage.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncReplication.h" @@ -52,6 +53,9 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); static void syncNodeLeader2Follower(SSyncNode* pSyncNode); static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); +// raft vote ---- +static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); +static void syncNodeVoteForSelf(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -602,13 +606,9 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { // raft state change ---- static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { - pSyncNode->pRaftStore->currentTerm = term; - raftStorePersist(pSyncNode->pRaftStore); - + raftStoreSetTerm(pSyncNode->pRaftStore, term); syncNodeBecomeFollower(pSyncNode); - - pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID; - raftStorePersist(pSyncNode->pRaftStore); + raftStoreClearVote(pSyncNode->pRaftStore); } } @@ -679,3 +679,25 @@ static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode); } + +// raft vote ---- +static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { + assert(term == pSyncNode->pRaftStore->currentTerm); + assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); + + raftStoreVote(pSyncNode->pRaftStore, pRaftId); +} + +static void syncNodeVoteForSelf(SSyncNode* pSyncNode) { + syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); + + SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = pSyncNode->myRaftId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->voteGranted = true; + + voteGrantedVote(pSyncNode->pVotesGranted, pMsg); + votesRespondAdd(pSyncNode->pVotesRespond, pMsg); + syncRequestVoteReplyDestroy(pMsg); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 3a26caa161aa888e4b3e5770ddffb12986bc7cba..5ad618b9c097e7c088133314be3067c5f8d09819 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -15,6 +15,8 @@ #include "syncRaftStore.h" #include "cJSON.h" +#include "syncEnv.h" +#include "syncUtil.h" // private function static int32_t raftStoreInit(SRaftStore *pRaftStore); @@ -135,6 +137,33 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; } +bool raftStoreHasVoted(SRaftStore *pRaftStore) { + bool b = syncUtilEmptyId(&(pRaftStore->voteFor)); + return b; +} + +void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { + assert(!raftStoreHasVoted(pRaftStore)); + assert(!syncUtilEmptyId(pRaftId)); + pRaftStore->voteFor = *pRaftId; + raftStorePersist(pRaftStore); +} + +void raftStoreClearVote(SRaftStore *pRaftStore) { + pRaftStore->voteFor = EMPTY_RAFT_ID; + raftStorePersist(pRaftStore); +} + +void raftStoreNextTerm(SRaftStore *pRaftStore) { + ++(pRaftStore->currentTerm); + raftStorePersist(pRaftStore); +} + +void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) { + pRaftStore->currentTerm = term; + raftStorePersist(pRaftStore); +} + // for debug ------------------- void raftStorePrint(SRaftStore *pObj) { char serialized[RAFT_STORE_BLOCK_SIZE]; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 4e56a9670f5c2408e08fbf34a3d2588a5bf24e4d..ba8a76c1901750ae9687e22e2bf12165a8fb9238 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -74,6 +74,8 @@ bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { return ret; } +bool syncUtilEmptyId(const SRaftId* pId) { return (pId->addr == 0 && pId->vgId == 0); } + // ---- SSyncBuffer ----- void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { syncBuf->len = len;