提交 551d7cfa 编写于 作者: M Minghao Li

sync refactor

上级 29fab2e3
...@@ -131,6 +131,7 @@ typedef struct SSyncNode { ...@@ -131,6 +131,7 @@ typedef struct SSyncNode {
// raft algorithm // raft algorithm
SSyncFSM* pFsm; SSyncFSM* pFsm;
int32_t quorum; int32_t quorum;
SRaftId leaderCache;
// life cycle // life cycle
int32_t refCount; int32_t refCount;
...@@ -193,18 +194,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); ...@@ -193,18 +194,16 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
void syncNodeBecomeFollower(SSyncNode* pSyncNode); int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
void syncNodeBecomeLeader(SSyncNode* pSyncNode); int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode); int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
void syncNodeLeader2Follower(SSyncNode* pSyncNode); int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -30,7 +30,7 @@ extern "C" { ...@@ -30,7 +30,7 @@ extern "C" {
// encode as uint32 // encode as uint32
typedef enum ESyncMessageType { typedef enum ESyncMessageType {
SYNC_UNKNOWN = 77, SYNC_UNKNOWN = 9999,
SYNC_TIMEOUT = 99, SYNC_TIMEOUT = 99,
SYNC_PING = 101, SYNC_PING = 101,
SYNC_PING_REPLY = 103, SYNC_PING_REPLY = 103,
......
...@@ -28,10 +28,6 @@ extern "C" { ...@@ -28,10 +28,6 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg);
void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -28,8 +28,6 @@ extern "C" { ...@@ -28,8 +28,6 @@ extern "C" {
#include "syncRaft.h" #include "syncRaft.h"
#include "taosdef.h" #include "taosdef.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncAppendEntries.h" #include "syncAppendEntries.h"
void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
// TLA+ Spec // TLA+ Spec
// AppendEntries(i, j) == // AppendEntries(i, j) ==
// /\ i /= j // /\ i /= j
...@@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { ...@@ -42,7 +42,7 @@ void appendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onAppendEntries(SRaft *pRaft, const SyncAppendEntries *pMsg) { int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) == // HandleAppendEntriesRequest(i, j, m) ==
// LET logOk == \/ m.mprevLogIndex = 0 // LET logOk == \/ m.mprevLogIndex = 0
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncAppendEntriesReply.h" #include "syncAppendEntriesReply.h"
void onAppendEntriesReply(SRaft *pRaft, const SyncAppendEntriesReply *pMsg) { int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) == // HandleAppendEntriesResponse(i, j, m) ==
// /\ m.mterm = currentTerm[i] // /\ m.mterm = currentTerm[i]
......
...@@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) { ...@@ -245,6 +245,42 @@ static void *syncIOConsumerFunc(void *param) {
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} }
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
if (io->FpOnSyncRequestVote) {
SyncRequestVote *pSyncMsg;
pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen);
syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg);
syncRequestVoteDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
if (io->FpOnSyncRequestVoteReply) {
SyncRequestVoteReply *pSyncMsg;
pSyncMsg = SyncRequestVoteReplyBuild();
syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg);
syncRequestVoteReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
if (io->FpOnSyncAppendEntries) {
SyncAppendEntries *pSyncMsg;
pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen);
syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg);
syncAppendEntriesDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
if (io->FpOnSyncAppendEntriesReply) {
SyncAppendEntriesReply *pSyncMsg;
pSyncMsg = syncAppendEntriesReplyBuild();
syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg);
}
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) { } else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) { if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg; SyncTimeout *pSyncMsg;
......
...@@ -31,16 +31,16 @@ static void syncNodeEqElectTimer(void* param, void* tmrId); ...@@ -31,16 +31,16 @@ static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
static void syncNodeLeader2Follower(SSyncNode* pSyncNode);
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
// --------------------------------- // ---------------------------------
int32_t syncInit() { int32_t syncInit() {
...@@ -188,8 +188,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { ...@@ -188,8 +188,6 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
return 0; return 0;
} }
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pHeartbeatTimer == NULL) { if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer = pSyncNode->pHeartbeatTimer =
...@@ -209,20 +207,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { ...@@ -209,20 +207,6 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
return 0; return 0;
} }
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {}
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode); sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
...@@ -252,16 +236,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn ...@@ -252,16 +236,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn
return ret; return ret;
} }
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
...@@ -311,26 +285,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { ...@@ -311,26 +285,6 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
return ret; return ret;
} }
static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
return ret;
}
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0; int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->"); sTrace("<-- syncNodeOnTimeoutCb -->");
...@@ -378,3 +332,37 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -378,3 +332,37 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
static void syncNodeEqElectTimer(void* param, void* tmrId) {} static void syncNodeEqElectTimer(void* param, void* tmrId) {}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache.addr = 0;
pSyncNode->leaderCache.vgId = 0;
}
syncNodeStopHeartbeatTimer(pSyncNode);
syncNodeStartElectTimer(pSyncNode);
}
static void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
pSyncNode->state = TAOS_SYNC_STATE_LEADER;
pSyncNode->leaderCache = pSyncNode->raftId;
// next Index +=1
// match Index = 0;
syncNodeStopElectTimer(pSyncNode);
syncNodeStartHeartbeatTimer(pSyncNode);
// appendEntries;
}
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
}
static void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
static void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
static void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
\ No newline at end of file
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncRequestVote.h" #include "syncRequestVote.h"
void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) {
// TLA+ Spec // TLA+ Spec
// RequestVote(i, j) == // RequestVote(i, j) ==
// /\ state[i] = Candidate // /\ state[i] = Candidate
...@@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { ...@@ -29,7 +29,7 @@ void requestVote(SRaft *pRaft, const SyncRequestVote *pMsg) {
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
} }
void onRequestVote(SRaft *pRaft, const SyncRequestVote *pMsg) { int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteRequest(i, j, m) == // HandleRequestVoteRequest(i, j, m) ==
// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i]) // LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "syncRequestVoteReply.h" #include "syncRequestVoteReply.h"
void onRequestVoteReply(SRaft *pRaft, const SyncRequestVoteReply *pMsg) { int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
// TLA+ Spec // TLA+ Spec
// HandleRequestVoteResponse(i, j, m) == // HandleRequestVoteResponse(i, j, m) ==
// \* This tallies votes even when the current state is not Candidate, but // \* This tallies votes even when the current state is not Candidate, but
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册