From 49c19e13f271ee9d94a9a016e68aaaf2670c7205 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 18 Oct 2022 14:16:39 +0800 Subject: [PATCH] refactor(sync): rename function --- source/libs/sync/inc/syncElection.h | 7 +- source/libs/sync/inc/syncReplication.h | 12 +- source/libs/sync/src/syncAppendEntriesReply.c | 2 +- source/libs/sync/src/syncElection.c | 108 +++++------------- source/libs/sync/src/syncMain.c | 6 +- source/libs/sync/src/syncReplication.c | 11 +- 6 files changed, 43 insertions(+), 103 deletions(-) diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 09651edd44..9ccd9dd28f 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -37,13 +37,10 @@ extern "C" { // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> -// -int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); -int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode); -int32_t syncNodeDoRequestVote(SSyncNode* pSyncNode); int32_t syncNodeElect(SSyncNode* pSyncNode); -int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); +int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 90c11b2300..4f15a45cec 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -50,15 +50,15 @@ extern "C" { // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> -// -int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg); int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); +int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncHeartbeat* pMsg); -int32_t syncNodeDoReplicate(SSyncNode* pSyncNode); -int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId); -int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); -int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); +int32_t syncNodeReplicate(SSyncNode* pSyncNode); +int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId); + +int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg); +int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* pDestId, const SyncAppendEntries* pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 3c0b2bebfc..eb8777bf37 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -129,7 +129,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs ASSERT(pState != NULL); if (pMsg->lastSendIndex == pState->lastSendIndex) { - syncNodeDoAppendEntries(ths, &(pMsg->srcId)); + syncNodeReplicateOne(ths, &(pMsg->srcId)); } } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index af7ea9201b..b0ccb12a00 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -30,68 +30,6 @@ // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> -// -int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = pSyncNode->peersId[i]; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - pMsg->lastLogIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - pMsg->lastLogTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore); - - ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); - ASSERT(ret == 0); - syncRequestVoteDestroy(pMsg); - } - return ret; -} - -int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) { - ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = pSyncNode->peersId[i]; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - - ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm)); - ASSERT(ret == 0); - - ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); - ASSERT(ret == 0); - syncRequestVoteDestroy(pMsg); - } - return ret; -} - -int32_t syncNodeDoRequestVote(SSyncNode* pSyncNode) { - if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) { - syncNodeEventLog(pSyncNode, "not candidate, stop elect"); - return 0; - } - - int32_t ret = 0; - for (int i = 0; i < pSyncNode->peersNum; ++i) { - SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId); - pMsg->srcId = pSyncNode->myRaftId; - pMsg->destId = pSyncNode->peersId[i]; - pMsg->term = pSyncNode->pRaftStore->currentTerm; - - ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm)); - ASSERT(ret == 0); - - ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); - ASSERT(ret == 0); - syncRequestVoteDestroy(pMsg); - } - return ret; -} int32_t syncNodeElect(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "begin election"); @@ -121,32 +59,38 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { return ret; } - if (syncNodeIsMnode(pSyncNode)) { - switch (pSyncNode->pRaftCfg->snapshotStrategy) { - case SYNC_STRATEGY_NO_SNAPSHOT: - ret = syncNodeRequestVotePeers(pSyncNode); - break; - - case SYNC_STRATEGY_STANDARD_SNAPSHOT: - case SYNC_STRATEGY_WAL_FIRST: - ret = syncNodeRequestVotePeersSnapshot(pSyncNode); - break; - - default: - ret = syncNodeRequestVotePeers(pSyncNode); - break; - } - } else { - ret = syncNodeDoRequestVote(pSyncNode); - } - + ret = syncNodeRequestVotePeers(pSyncNode); ASSERT(ret == 0); + syncNodeResetElectTimer(pSyncNode); return ret; } -int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { +int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { + if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) { + syncNodeEventLog(pSyncNode, "not candidate, stop elect"); + return 0; + } + + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SyncRequestVote* pMsg = syncRequestVoteBuild(pSyncNode->vgId); + pMsg->srcId = pSyncNode->myRaftId; + pMsg->destId = pSyncNode->peersId[i]; + pMsg->term = pSyncNode->pRaftStore->currentTerm; + + ret = syncNodeGetLastIndexTerm(pSyncNode, &(pMsg->lastLogIndex), &(pMsg->lastLogTerm)); + ASSERT(ret == 0); + + ret = syncNodeSendRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); + ASSERT(ret == 0); + syncRequestVoteDestroy(pMsg); + } + return ret; +} + +int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) { int32_t ret = 0; syncLogSendRequestVote(pSyncNode, pMsg, ""); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f52a719f7f..dcd35c3e25 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2299,7 +2299,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { syncMaybeAdvanceCommitIndex(pSyncNode); if (pSyncNode->replicaNum > 1) { - syncNodeDoReplicate(pSyncNode); + syncNodeReplicate(pSyncNode); } } @@ -2696,7 +2696,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { #endif // send msg - syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); + syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); syncHeartbeatDestroy(pSyncMsg); @@ -2900,7 +2900,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd // if mulit replica, start replicate right now if (ths->replicaNum > 1) { - syncNodeDoReplicate(ths); + syncNodeReplicate(ths); } // if only myself, maybe commit right now diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 69c708c745..8bf0798f96 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -47,9 +47,8 @@ // msource |-> i, // mdest |-> j]) // /\ UNCHANGED <> -// -int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { +int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); @@ -118,7 +117,7 @@ int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) { return 0; } -int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) { +int32_t syncNodeReplicate(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "do replicate"); if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { @@ -128,7 +127,7 @@ int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); - ret = syncNodeDoAppendEntries(pSyncNode, pDestId); + ret = syncNodeReplicateOne(pSyncNode, pDestId); if (ret != 0) { char host[64]; int16_t port; @@ -175,7 +174,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c return ret; } -int32_t syncNodeHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) { +int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncHeartbeat* pMsg) { int32_t ret = 0; syncLogSendHeartbeat(pSyncNode, pMsg, ""); @@ -198,7 +197,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) { syncHeartbeat2RpcMsg(pSyncMsg, &rpcMsg); // send msg - syncNodeHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); + syncNodeSendHeartbeat(pSyncNode, &(pSyncMsg->destId), pSyncMsg); syncHeartbeatDestroy(pSyncMsg); } -- GitLab