From ffe442301c532ca5d0139ac4e23d8097d2abf34e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 8 Mar 2022 14:19:50 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncInt.h | 4 ++- source/libs/sync/inc/syncReplication.h | 1 + source/libs/sync/src/syncElection.c | 8 ++++-- source/libs/sync/src/syncMain.c | 35 +++++++++++++++++++++++--- source/libs/sync/src/syncReplication.c | 11 +++++++- source/libs/sync/src/syncTimeout.c | 2 +- source/libs/sync/src/syncUtil.c | 2 +- 7 files changed, 54 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2c8e36fd08..874763cd45 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -155,7 +155,7 @@ typedef struct SSyncNode { SSyncLogStore* pLogStore; SyncIndex commitIndex; - // timer + // ping timer tmr_h pPingTimer; int32_t pingTimerMS; uint64_t pingTimerLogicClock; @@ -163,6 +163,7 @@ typedef struct SSyncNode { TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; + // elect timer tmr_h pElectTimer; int32_t electTimerMS; uint64_t electTimerLogicClock; @@ -170,6 +171,7 @@ typedef struct SSyncNode { TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; + // heartbeat timer tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; uint64_t heartbeatTimerLogicClock; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 467cfdde5c..aca6205b9d 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -53,6 +53,7 @@ extern "C" { // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); +int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); #ifdef __cplusplus diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 87017b718d..24f41a0e69 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -28,11 +28,15 @@ // mdest |-> j]) // /\ UNCHANGED <> // -int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + return ret; +} int32_t syncNodeElect(SSyncNode* pSyncNode) { // start election - syncNodeRequestVotePeers(pSyncNode); + int32_t ret = syncNodeRequestVotePeers(pSyncNode); + return ret; } int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 44d2e4c160..f2939df60c 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -37,7 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); -static void UpdateTerm(SyncTerm term); +static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term); static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode); static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); @@ -452,9 +452,38 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { } } -static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_64(&pSyncNode->heartbeatTimerLogicClockUser) <= + atomic_load_64(&pSyncNode->heartbeatTimerLogicClock)) { + SyncTimeout* pSyncMsg = + syncTimeoutBuild2(SYNC_TIMEOUT_HEARTBEAT, atomic_load_64(&pSyncNode->heartbeatTimerLogicClock), + pSyncNode->heartbeatTimerMS, pSyncNode); + + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); + + // reset timer ms + // pSyncNode->heartbeatTimerMS += 100; -static void UpdateTerm(SyncTerm term) {} + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + } else { + sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", + pSyncNode->heartbeatTimerLogicClock, pSyncNode->heartbeatTimerLogicClockUser); + } +} + +static void UpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { + if (term > pSyncNode->pRaftStore->currentTerm) { + pSyncNode->pRaftStore->currentTerm = term; + pSyncNode->pRaftStore->voteFor = EMPTY_RAFT_ID; + raftStorePersist(pSyncNode->pRaftStore); + syncNodeBecomeFollower(pSyncNode); + } +} static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 37e8959ff3..dfbe5db0ed 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -41,7 +41,16 @@ // mdest |-> j]) // /\ UNCHANGED <> // -int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} +int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + return ret; +} + +int32_t syncNodeReplicate(SSyncNode* pSyncNode) { + // start replicate + int32_t ret = syncNodeAppendEntriesPeers(pSyncNode); + return ret; +} int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) { sTrace("syncNodeAppendEntries pSyncNode:%p ", pSyncNode); diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index df9b9d27b4..7cbfd6d40a 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -44,7 +44,7 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { if (atomic_load_64(&ths->heartbeatTimerLogicClockUser) <= pMsg->logicClock) { ++(ths->heartbeatTimerCounter); - syncNodeAppendEntriesPeers(ths); + syncNodeReplicate(ths); } } else { sTrace("unknown timeoutType:%d", pMsg->timeoutType); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 04be3f33ac..fa6c245fd8 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -127,7 +127,7 @@ cJSON* syncUtilRaftId2Json(const SRaftId* p) { cJSON_AddNumberToObject(pRoot, "vgId", p->vgId); cJSON* pJson = cJSON_CreateObject(); - cJSON_AddItemToObject(pJson, "SNodeInfo", pRoot); + cJSON_AddItemToObject(pJson, "SRaftId", pRoot); return pJson; } -- GitLab