提交 264c30e5 编写于 作者: M Minghao Li

sync refactor

上级 8f108bfb
......@@ -99,8 +99,8 @@ typedef struct SRaftStore SRaftStore;
struct SVotesGranted;
typedef struct SVotesGranted SVotesGranted;
struct SVotesResponded;
typedef struct SVotesResponded SVotesResponded;
struct SVotesRespond;
typedef struct SVotesRespond SVotesRespond;
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
......@@ -112,6 +112,7 @@ typedef struct SSyncNode {
SyncGroupId vgId;
SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN];
char walPath[TSDB_FILENAME_LEN];
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
......@@ -142,8 +143,8 @@ typedef struct SSyncNode {
SRaftStore* pRaftStore;
// tla+ candidate vars
SVotesGranted* pVotesGranted;
SVotesResponded* pVotesResponded;
SVotesGranted* pVotesGranted;
SVotesRespond* pVotesRespond;
// tla+ leader vars
SHashObj* pNextIndex;
......
......@@ -44,6 +44,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
// ---- misc ----
int32_t syncUtilRand(int32_t max);
int32_t syncUtilElectRandomMS();
int32_t syncUtilQuorum(int32_t replicaNum);
#ifdef __cplusplus
}
......
......@@ -20,10 +20,12 @@
#include "syncEnv.h"
#include "syncInt.h"
#include "syncRaft.h"
#include "syncRaftStore.h"
#include "syncRequestVote.h"
#include "syncRequestVoteReply.h"
#include "syncTimeout.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
static int32_t tsNodeRefId = -1;
......@@ -35,6 +37,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
static void UpdateTerm(SyncTerm term);
static void syncNodeBecomeFollower(SSyncNode* pSyncNode);
static void syncNodeBecomeLeader(SSyncNode* pSyncNode);
static void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
......@@ -71,19 +74,22 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode));
// init by SSyncInfo
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->syncCfg = pSyncInfo->syncCfg;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
pSyncNode->pFsm = pSyncInfo->pFsm;
memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath));
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
// init internal
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncInfo->vgId, &pSyncNode->raftId);
// init peersNum, peers, peersId
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
int j = 0;
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
if (i != pSyncInfo->syncCfg.myIndex) {
......@@ -91,9 +97,37 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
j++;
}
}
for (int i = 0; i < pSyncNode->peersNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncInfo->vgId, &pSyncNode->peersId[i]);
}
// init replicaNum, replicasId
pSyncNode->replicaNum = pSyncInfo->syncCfg.replicaNum;
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
syncUtilnodeInfo2raftId(&pSyncInfo->syncCfg.nodeInfo[i], pSyncInfo->vgId, &pSyncNode->replicasId[i]);
}
// raft algorithm
pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncNode->quorum = syncUtilQuorum(pSyncInfo->syncCfg.replicaNum);
pSyncNode->leaderCache = EMPTY_RAFT_ID;
// life cycle
// init server vars
pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath);
assert(pSyncNode->pRaftStore != NULL);
// init candidate vars
pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode);
assert(pSyncNode->pVotesGranted != NULL);
pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode);
assert(pSyncNode->pVotesRespond != NULL);
// init leader vars
pSyncNode->pNextIndex = NULL;
pSyncNode->pMatchIndex = NULL;
// init ping timer
pSyncNode->pPingTimer = NULL;
......@@ -119,6 +153,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpHeartbeatTimer = syncNodeEqHeartbeatTimer;
pSyncNode->heartbeatTimerCounter = 0;
// init callback
pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
......@@ -353,6 +388,8 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
static void UpdateTerm(SyncTerm term) {}
static void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
pSyncNode->leaderCache = EMPTY_RAFT_ID;
......
......@@ -97,4 +97,6 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
int32_t syncUtilRand(int32_t max) { return rand() % max; }
int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
\ No newline at end of file
int32_t syncUtilElectRandomMS() { ELECT_TIMER_MS_MIN + syncUtilRand(ELECT_TIMER_MS_RANGE); }
int32_t syncUtilQuorum(int32_t replicaNum) { return replicaNum / 2 + 1; }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册