From 264c30e5db9b348d06135bc9638d7e1cd9b66069 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 8 Mar 2022 10:52:18 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncInt.h | 9 ++++--- source/libs/sync/inc/syncUtil.h | 1 + source/libs/sync/src/syncMain.c | 45 ++++++++++++++++++++++++++++++--- source/libs/sync/src/syncUtil.c | 4 ++- 4 files changed, 50 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b8c7eb60e7..79412febd9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -99,8 +99,8 @@ typedef struct SRaftStore SRaftStore; struct SVotesGranted; typedef struct SVotesGranted SVotesGranted; -struct SVotesResponded; -typedef struct SVotesResponded SVotesResponded; +struct SVotesRespond; +typedef struct SVotesRespond SVotesRespond; typedef struct SRaftId { SyncNodeId addr; // typedef uint64_t SyncNodeId; @@ -112,6 +112,7 @@ typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + char walPath[TSDB_FILENAME_LEN]; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); void* queue; @@ -142,8 +143,8 @@ typedef struct SSyncNode { SRaftStore* pRaftStore; // tla+ candidate vars - SVotesGranted* pVotesGranted; - SVotesResponded* pVotesResponded; + SVotesGranted* pVotesGranted; + SVotesRespond* pVotesRespond; // tla+ leader vars SHashObj* pNextIndex; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 9b481e82d9..eb1e888254 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -44,6 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); // ---- misc ---- int32_t syncUtilRand(int32_t max); int32_t syncUtilElectRandomMS(); +int32_t syncUtilQuorum(int32_t replicaNum); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index db34d16690..836f1b0e83 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -20,10 +20,12 @@ #include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncRaftStore.h" #include "syncRequestVote.h" #include "syncRequestVoteReply.h" #include "syncTimeout.h" #include "syncUtil.h" +#include "syncVoteMgr.h" static int32_t tsNodeRefId = -1; @@ -35,6 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +static void UpdateTerm(SyncTerm term); static void syncNodeBecomeFollower(SSyncNode* pSyncNode); static void syncNodeBecomeLeader(SSyncNode* pSyncNode); static void syncNodeFollower2Candidate(SSyncNode* pSyncNode); @@ -71,19 +74,22 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); + // init by SSyncInfo pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->syncCfg = pSyncInfo->syncCfg; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); - pSyncNode->pFsm = pSyncInfo->pFsm; - + memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath)); pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->queue = pSyncInfo->queue; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; + // init internal pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; - pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; + syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncInfo->vgId, &pSyncNode->raftId); + // init peersNum, peers, peersId + pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; int j = 0; for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) { if (i != pSyncInfo->syncCfg.myIndex) { @@ -91,9 +97,37 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { j++; } } + for (int i = 0; i < pSyncNode->peersNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncInfo->vgId, &pSyncNode->peersId[i]); + } + // init replicaNum, replicasId + pSyncNode->replicaNum = pSyncInfo->syncCfg.replicaNum; + for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) { + syncUtilnodeInfo2raftId(&pSyncInfo->syncCfg.nodeInfo[i], pSyncInfo->vgId, &pSyncNode->replicasId[i]); + } + + // raft algorithm + pSyncNode->pFsm = pSyncInfo->pFsm; + pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum); + pSyncNode->leaderCache = EMPTY_RAFT_ID; + + // life cycle + + // init server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); + pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath); + assert(pSyncNode->pRaftStore != NULL); + + // init candidate vars + pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); + assert(pSyncNode->pVotesGranted != NULL); + pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); + assert(pSyncNode->pVotesRespond != NULL); + + // init leader vars + pSyncNode->pNextIndex = NULL; + pSyncNode->pMatchIndex = NULL; // init ping timer pSyncNode->pPingTimer = NULL; @@ -119,6 +153,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer; pSyncNode->heartbeatTimerCounter = 0; + // init callback pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; @@ -353,6 +388,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} +static void UpdateTerm(SyncTerm term) {} + static void syncNodeBecomeFollower(SSyncNode* pSyncNode) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { pSyncNode->leaderCache = EMPTY_RAFT_ID; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index c70e490025..1e62301814 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -97,4 +97,6 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { int32_t syncUtilRand(int32_t max) { return rand() % max; } -int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } \ No newline at end of file +int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); } + +int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; } \ No newline at end of file -- GitLab