提交 ffe44230 编写于 作者: M Minghao Li

sync refactor

上级 f556d981
...@@ -155,7 +155,7 @@ typedef struct SSyncNode { ...@@ -155,7 +155,7 @@ typedef struct SSyncNode {
SSyncLogStore* pLogStore; SSyncLogStore* pLogStore;
SyncIndex commitIndex; SyncIndex commitIndex;
// timer // ping timer
tmr_h pPingTimer; tmr_h pPingTimer;
int32_t pingTimerMS; int32_t pingTimerMS;
uint64_t pingTimerLogicClock; uint64_t pingTimerLogicClock;
...@@ -163,6 +163,7 @@ typedef struct SSyncNode { ...@@ -163,6 +163,7 @@ typedef struct SSyncNode {
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter; uint64_t pingTimerCounter;
// elect timer
tmr_h pElectTimer; tmr_h pElectTimer;
int32_t electTimerMS; int32_t electTimerMS;
uint64_t electTimerLogicClock; uint64_t electTimerLogicClock;
...@@ -170,6 +171,7 @@ typedef struct SSyncNode { ...@@ -170,6 +171,7 @@ typedef struct SSyncNode {
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
uint64_t electTimerCounter; uint64_t electTimerCounter;
// heartbeat timer
tmr_h pHeartbeatTimer; tmr_h pHeartbeatTimer;
int32_t heartbeatTimerMS; int32_t heartbeatTimerMS;
uint64_t heartbeatTimerLogicClock; uint64_t heartbeatTimerLogicClock;
......
...@@ -53,6 +53,7 @@ extern "C" { ...@@ -53,6 +53,7 @@ extern "C" {
// //
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -28,11 +28,15 @@ ...@@ -28,11 +28,15 @@
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
int32_t ret = 0;
return ret;
}
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
// start election // start election
syncNodeRequestVotePeers(pSyncNode); int32_t ret = syncNodeRequestVotePeers(pSyncNode);
return ret;
} }
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
......
...@@ -37,7 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); ...@@ -37,7 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static void UpdateTerm(SyncTerm term); static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
...@@ -452,9 +452,38 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -452,9 +452,38 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
} }
} }
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <=
atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) {
SyncTimeout* pSyncMsg =
syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock),
pSyncNode->heartbeatTimerMS, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
syncTimeoutDestroy(pSyncMsg);
// reset timer ms
// pSyncNode->heartbeatTimerMS += 100;
static void UpdateTerm(SyncTerm term) {} taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
&pSyncNode->pHeartbeatTimer);
} else {
sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu",
pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser);
}
}
static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
if (term > pSyncNode->pRaftStore->currentTerm) {
pSyncNode->pRaftStore->currentTerm = term;
pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID;
raftStorePersist(pSyncNode->pRaftStore);
syncNodeBecomeFollower(pSyncNode);
}
}
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
......
...@@ -41,7 +41,16 @@ ...@@ -41,7 +41,16 @@
// mdest |-> j]) // mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
int32_t ret = 0;
return ret;
}
int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
// start replicate
int32_t ret = syncNodeAppendEntriesPeers(pSyncNode);
return ret;
}
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode); sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode);
......
...@@ -44,7 +44,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { ...@@ -44,7 +44,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->heartbeatTimerCounter); ++(ths->heartbeatTimerCounter);
syncNodeAppendEntriesPeers(ths); syncNodeReplicate(ths);
} }
} else { } else {
sTrace("unknown timeoutType:%d", pMsg->timeoutType); sTrace("unknown timeoutType:%d", pMsg->timeoutType);
......
...@@ -127,7 +127,7 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) { ...@@ -127,7 +127,7 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) {
cJSON_AddNumberToObject(pRoot, "vgId", p->vgId); cJSON_AddNumberToObject(pRoot, "vgId", p->vgId);
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); cJSON_AddItemToObject(pJson, "SRaftId", pRoot);
return pJson; return pJson;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册