From 8ae9fb6ae133f33efaf659e8732864b3c89c542b Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Mar 2022 18:22:39 +0800 Subject: [PATCH] sync index --- source/libs/sync/inc/syncRaftStore.h | 6 +++++ source/libs/sync/inc/syncUtil.h | 1 + source/libs/sync/src/syncMain.c | 34 +++++++++++++++++++++++----- source/libs/sync/src/syncRaftStore.c | 29 ++++++++++++++++++++++++ source/libs/sync/src/syncUtil.c | 2 ++ 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 4058d3bd1c..30f7c5d9f7 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -43,6 +43,12 @@ int32_t raftStorePersist(SRaftStore *pRaftStore); int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len); int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len); +bool raftStoreHasVoted(SRaftStore *pRaftStore); +void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId); +void raftStoreClearVote(SRaftStore *pRaftStore); +void raftStoreNextTerm(SRaftStore *pRaftStore); +void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term); + // for debug ------------------- void raftStorePrint(SRaftStore *pObj); void raftStorePrint2(char *s, SRaftStore *pObj); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index bc38acdfe6..1b702c2528 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -34,6 +34,7 @@ void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); +bool syncUtilEmptyId(const SRaftId* pId); // ---- SSyncBuffer ---- void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7f183021cf..da1286bd32 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -21,6 +21,7 @@ #include "syncEnv.h" #include "syncIndexMgr.h" #include "syncInt.h" +#include "syncMessage.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncReplication.h" @@ -52,6 +53,9 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); static void syncNodeLeader2Follower(SSyncNode* pSyncNode); static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); +// raft vote ---- +static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); +static void syncNodeVoteForSelf(SSyncNode* pSyncNode); // --------------------------------- int32_t syncInit() { @@ -602,13 +606,9 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { // raft state change ---- static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { if (term > pSyncNode->pRaftStore->currentTerm) { - pSyncNode->pRaftStore->currentTerm = term; - raftStorePersist(pSyncNode->pRaftStore); - + raftStoreSetTerm(pSyncNode->pRaftStore, term); syncNodeBecomeFollower(pSyncNode); - - pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID; - raftStorePersist(pSyncNode->pRaftStore); + raftStoreClearVote(pSyncNode->pRaftStore); } } @@ -679,3 +679,25 @@ static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode); } + +// raft vote ---- +static void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { + assert(term == pSyncNode->pRaftStore->currentTerm); + assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); + + raftStoreVote(pSyncNode->pRaftStore, pRaftId); +} + +static void syncNodeVoteForSelf(SSyncNode* pSyncNode) { + syncNodeVoteForTerm(pSyncNode, pSyncNode->pRaftStore->currentTerm, &(pSyncNode->myRaftId)); + + SyncRequestVoteReply* pMsg = syncRequestVoteReplyBuild(); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = pSyncNode->myRaftId; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + pMsg->voteGranted = true; + + voteGrantedVote(pSyncNode->pVotesGranted, pMsg); + votesRespondAdd(pSyncNode->pVotesRespond, pMsg); + syncRequestVoteReplyDestroy(pMsg); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 3a26caa161..5ad618b9c0 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -15,6 +15,8 @@ #include "syncRaftStore.h" #include "cJSON.h" +#include "syncEnv.h" +#include "syncUtil.h" // private function static int32_t raftStoreInit(SRaftStore *pRaftStore); @@ -135,6 +137,33 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0; } +bool raftStoreHasVoted(SRaftStore *pRaftStore) { + bool b = syncUtilEmptyId(&(pRaftStore->voteFor)); + return b; +} + +void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { + assert(!raftStoreHasVoted(pRaftStore)); + assert(!syncUtilEmptyId(pRaftId)); + pRaftStore->voteFor = *pRaftId; + raftStorePersist(pRaftStore); +} + +void raftStoreClearVote(SRaftStore *pRaftStore) { + pRaftStore->voteFor = EMPTY_RAFT_ID; + raftStorePersist(pRaftStore); +} + +void raftStoreNextTerm(SRaftStore *pRaftStore) { + ++(pRaftStore->currentTerm); + raftStorePersist(pRaftStore); +} + +void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) { + pRaftStore->currentTerm = term; + raftStorePersist(pRaftStore); +} + // for debug ------------------- void raftStorePrint(SRaftStore *pObj) { char serialized[RAFT_STORE_BLOCK_SIZE]; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 4e56a9670f..ba8a76c190 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -74,6 +74,8 @@ bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { return ret; } +bool syncUtilEmptyId(const SRaftId* pId) { return (pId->addr == 0 && pId->vgId == 0); } + // ---- SSyncBuffer ----- void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { syncBuf->len = len; -- GitLab