提交 9734b9b0 编写于 作者: M Minghao Li

sync index

上级 a471152b
...@@ -17,11 +17,13 @@ ...@@ -17,11 +17,13 @@
#include "sync.h" #include "sync.h"
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
#include "syncElection.h"
#include "syncEnv.h" #include "syncEnv.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncInt.h" #include "syncInt.h"
#include "syncRaftLog.h" #include "syncRaftLog.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncRequestVote.h" #include "syncRequestVote.h"
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
#include "syncTimeout.h" #include "syncTimeout.h"
...@@ -41,13 +43,15 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); ...@@ -41,13 +43,15 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
// raft state change ---- // raft state change ----
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term); static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode); static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode); static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode); static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
...@@ -596,12 +600,15 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { ...@@ -596,12 +600,15 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
} }
// raft state change ---- // raft state change ----
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { static void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) { if (term > pSyncNode->pRaftStore->currentTerm) {
pSyncNode->pRaftStore->currentTerm = term; pSyncNode->pRaftStore->currentTerm = term;
pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID;
raftStorePersist(pSyncNode->pRaftStore); raftStorePersist(pSyncNode->pRaftStore);
syncNodeBecomeFollower(pSyncNode); syncNodeBecomeFollower(pSyncNode);
pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID;
raftStorePersist(pSyncNode->pRaftStore);
} }
} }
...@@ -610,9 +617,11 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { ...@@ -610,9 +617,11 @@ static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
pSyncNode->leaderCache = EMPTY_RAFT_ID; pSyncNode->leaderCache = EMPTY_RAFT_ID;
} }
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncNodeStopHeartbeatTimer(pSyncNode); syncNodeStopHeartbeatTimer(pSyncNode);
int32_t electMS = syncUtilElectRandomMS(); int32_t electMS = syncUtilElectRandomMS();
syncNodeStartElectTimer(pSyncNode, electMS); syncNodeRestartElectTimer(pSyncNode, electMS);
} }
// TLA+ Spec // TLA+ Spec
...@@ -637,13 +646,23 @@ static void syncNodeBecomeLeader(SSyncNode* pSyncNode) { ...@@ -637,13 +646,23 @@ static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->leaderCache = pSyncNode->myRaftId; pSyncNode->leaderCache = pSyncNode->myRaftId;
// next Index +=1 for (int i = 0; i < pSyncNode->pNextIndex->replicaNum; ++i) {
// match Index = 0; pSyncNode->pNextIndex->index[i] = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1;
}
for (int i = 0; i < pSyncNode->pMatchIndex->replicaNum; ++i) {
pSyncNode->pMatchIndex->index[i] = SYNC_INDEX_INVALID;
}
syncNodeStopElectTimer(pSyncNode); syncNodeStopElectTimer(pSyncNode);
syncNodeStartHeartbeatTimer(pSyncNode); syncNodeStartHeartbeatTimer(pSyncNode);
syncNodeReplicate(pSyncNode);
}
// appendEntries; static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
assert(voteGrantedMajority(pSyncNode->pVotesGranted));
syncNodeBecomeLeader(pSyncNode);
} }
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
...@@ -651,8 +670,12 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ...@@ -651,8 +670,12 @@ static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
} }
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} syncNodeBecomeFollower(pSyncNode);
}
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册