提交 349a6a47 编写于 作者: L lichuang

[TD-10645][raft]<feature>add raft message handle

上级 24a0966d
...@@ -133,9 +133,9 @@ typedef struct SStateManager { ...@@ -133,9 +133,9 @@ typedef struct SStateManager {
int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state); int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state);
// void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster); void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
// const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng); const SSyncClusterConfig* (*readCluster)(struct SStateManager* stateMng);
} SStateManager; } SStateManager;
typedef struct { typedef struct {
......
...@@ -20,6 +20,8 @@ ...@@ -20,6 +20,8 @@
#include "sync_type.h" #include "sync_type.h"
#include "raft_message.h" #include "raft_message.h"
#define SYNC_NON_NODE_ID -1
typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct SSyncRaftProgress SSyncRaftProgress;
typedef struct RaftLeaderState { typedef struct RaftLeaderState {
...@@ -28,38 +30,84 @@ typedef struct RaftLeaderState { ...@@ -28,38 +30,84 @@ typedef struct RaftLeaderState {
} RaftLeaderState; } RaftLeaderState;
typedef struct SSyncRaftIOMethods { typedef struct SSyncRaftIOMethods {
SyncTime (*time)(SSyncRaft*);
} SSyncRaftIOMethods; } SSyncRaftIOMethods;
typedef int (*SyncRaftStepFp)(SSyncRaft* pRaft, const SSyncMessage* pMsg);
typedef void (*SyncRaftTickFp)(SSyncRaft* pRaft);
struct SSyncRaft { struct SSyncRaft {
// owner sync node // owner sync node
SSyncNode* pNode; SSyncNode* pNode;
SSyncInfo info; SSyncInfo info;
// election timeout tick(random in [3:6] tick) SSyncTerm term;
uint16_t electionTick; SyncNodeId voteFor;
// heartbeat timeout tick(default: 1 tick) SyncNodeId selfId;
uint16_t heartbeatTick;
/**
* the leader id
**/
SyncNodeId leaderId;
/**
* leadTransferee is id of the leader transfer target when its value is not zero.
* Follow the procedure defined in raft thesis 3.10.
**/
SyncNodeId leadTransferee;
/**
* New configuration is ignored if there exists unapplied configuration.
**/
bool pendingConf;
ESyncRole state;
int installSnapShotTimeoutMS; /**
* number of ticks since it reached last electionTimeout when it is leader
* or candidate.
* number of ticks since it reached last electionTimeout or received a
* valid message from current leader when it is a follower.
**/
uint16_t electionElapsed;
// /**
int heartbeatTimeoutMS; * number of ticks since it reached last heartbeatTimeout.
* only leader keeps heartbeatElapsed.
**/
uint16_t heartbeatElapsed;
// election timeout tick(random in [3:6] tick)
uint16_t electionTimeoutTick;
// heartbeat timeout tick(default: 1 tick)
uint16_t heartbeatTimeoutTick;
bool preVote; bool preVote;
bool checkQuorum;
SSyncRaftIOMethods io; SSyncRaftIOMethods io;
RaftLeaderState leaderState; RaftLeaderState leaderState;
SSyncRaftUnstableLog *log; SSyncRaftUnstableLog *log;
SyncRaftStepFp stepFp;
SyncRaftTickFp tickFp;
}; };
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo); int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo);
int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg); int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t syncRaftTick(SSyncRaft* pRaft); int32_t syncRaftTick(SSyncRaft* pRaft);
void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId);
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft);
bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_H */ #endif /* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
...@@ -28,15 +28,14 @@ typedef enum RaftMessageType { ...@@ -28,15 +28,14 @@ typedef enum RaftMessageType {
// client propose a cmd // client propose a cmd
RAFT_MSG_INTERNAL_PROP = 1, RAFT_MSG_INTERNAL_PROP = 1,
RAFT_MSG_APPEND, // node election timeout
RAFT_MSG_APPEND_RESP, RAFT_MSG_INTERNAL_ELECTION = 2,
RAFT_MSG_VOTE, RAFT_MSG_VOTE = 3,
RAFT_MSG_VOTE_RESP, RAFT_MSG_VOTE_RESP = 4,
RAFT_MSG_PRE_VOTE,
RAFT_MSG_PRE_VOTE_RESP,
RAFT_MSG_PRE_VOTE = 5,
RAFT_MSG_PRE_VOTE_RESP = 6,
} RaftMessageType; } RaftMessageType;
typedef struct RaftMsgInternal_Prop { typedef struct RaftMsgInternal_Prop {
...@@ -45,7 +44,15 @@ typedef struct RaftMsgInternal_Prop { ...@@ -45,7 +44,15 @@ typedef struct RaftMsgInternal_Prop {
void* pData; void* pData;
} RaftMsgInternal_Prop; } RaftMsgInternal_Prop;
typedef struct RaftMessage { typedef struct RaftMsgInternal_Election {
} RaftMsgInternal_Election;
typedef struct RaftMsg_PreVoteResp {
bool reject;
} RaftMsg_PreVoteResp;
typedef struct SSyncMessage {
RaftMessageType msgType; RaftMessageType msgType;
SSyncTerm term; SSyncTerm term;
SyncNodeId from; SyncNodeId from;
...@@ -53,12 +60,17 @@ typedef struct RaftMessage { ...@@ -53,12 +60,17 @@ typedef struct RaftMessage {
union { union {
RaftMsgInternal_Prop propose; RaftMsgInternal_Prop propose;
RaftMsgInternal_Election election;
RaftMsg_PreVoteResp preVoteResp;
}; };
} RaftMessage; } SSyncMessage;
static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) { static FORCE_INLINE SSyncMessage* syncInitPropMsg(SSyncMessage* pMsg, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
*pMsg = (RaftMessage) { *pMsg = (SSyncMessage) {
.msgType = RAFT_MSG_INTERNAL_PROP, .msgType = RAFT_MSG_INTERNAL_PROP,
.term = 0,
.propose = (RaftMsgInternal_Prop) { .propose = (RaftMsgInternal_Prop) {
.isWeak = isWeak, .isWeak = isWeak,
.pBuf = pBuf, .pBuf = pBuf,
...@@ -69,10 +81,24 @@ static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncB ...@@ -69,10 +81,24 @@ static FORCE_INLINE RaftMessage* syncInitPropMsg(RaftMessage* pMsg, const SSyncB
return pMsg; return pMsg;
} }
static FORCE_INLINE bool syncIsInternalMsg(const RaftMessage* pMsg) { static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNodeId from) {
return pMsg->msgType == RAFT_MSG_INTERNAL_PROP; *pMsg = (SSyncMessage) {
.msgType = RAFT_MSG_INTERNAL_ELECTION,
.term = 0,
.from = from,
.election = (RaftMsgInternal_Election) {
},
};
return pMsg;
}
static FORCE_INLINE bool syncIsInternalMsg(const SSyncMessage* pMsg) {
return pMsg->msgType == RAFT_MSG_INTERNAL_PROP ||
pMsg->msgType == RAFT_MSG_INTERNAL_ELECTION;
} }
void syncFreeMessage(const RaftMessage* pMsg); void syncFreeMessage(const SSyncMessage* pMsg);
#endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */ #endif /* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
...@@ -148,6 +148,7 @@ void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i); ...@@ -148,6 +148,7 @@ void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i);
void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex); void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotIndex);
/* inflights APIs */
int syncRaftInflightReset(SSyncRaftInflights* inflights); int syncRaftInflightReset(SSyncRaftInflights* inflights);
bool syncRaftInflightFull(SSyncRaftInflights* inflights); bool syncRaftInflightFull(SSyncRaftInflights* inflights);
void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex); void syncRaftInflightAdd(SSyncRaftInflights* inflights, SyncIndex inflightIndex);
......
...@@ -18,7 +18,20 @@ ...@@ -18,7 +18,20 @@
#define RAFT_READ_LOG_MAX_NUM 100 #define RAFT_READ_LOG_MAX_NUM 100
static void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term); static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static void tickElection(SSyncRaft* pRaft);
static void tickHeartbeat(SSyncRaft* pRaft);
static void abortLeaderTransfer(SSyncRaft* pRaft);
static void resetRaft(SSyncRaft* pRaft, SSyncTerm term);
int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SSyncNode* pNode = pRaft->pNode; SSyncNode* pNode = pRaft->pNode;
...@@ -30,6 +43,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -30,6 +43,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
int nBuf, limit, i; int nBuf, limit, i;
memset(pRaft, 0, sizeof(SSyncRaft));
memcpy(&pRaft->info, pInfo, sizeof(SSyncInfo)); memcpy(&pRaft->info, pInfo, sizeof(SSyncInfo));
stateManager = &(pRaft->info.stateManager); stateManager = &(pRaft->info.stateManager);
logStore = &(pRaft->info.logStore); logStore = &(pRaft->info.logStore);
...@@ -60,15 +75,30 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -60,15 +75,30 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
} }
assert(initIndex == serverState.commitIndex); assert(initIndex == serverState.commitIndex);
pRaft->heartbeatTick = 1; pRaft->heartbeatTimeoutTick = 1;
syncRaftBecomeFollower(pRaft, 1); syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
syncInfo("restore vgid %d state: snapshot index success", pInfo->vgId); syncInfo("restore vgid %d state: snapshot index success", pInfo->vgId);
return 0; return 0;
} }
int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) { int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
syncDebug("from ");
if (preHandleMessage(pRaft, pMsg)) {
syncFreeMessage(pMsg);
return 0;
}
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_INTERNAL_ELECTION) {
} else if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) {
} else {
pRaft->stepFp(pRaft, pMsg);
}
syncFreeMessage(pMsg); syncFreeMessage(pMsg);
return 0; return 0;
} }
...@@ -77,7 +107,131 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { ...@@ -77,7 +107,131 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
return 0; return 0;
} }
static void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term) { void syncRaftBecomeFollower(SSyncRaft* pRaft, SSyncTerm term, SyncNodeId leaderId) {
pRaft->electionTick = taosRand() % 3 + 3; pRaft->stepFp = stepFollower;
return; resetRaft(pRaft, term);
pRaft->tickFp = tickElection;
pRaft->leaderId = leaderId;
pRaft->state = TAOS_SYNC_ROLE_FOLLOWER;
}
void syncRaftRandomizedElectionTimeout(SSyncRaft* pRaft) {
// electionTimeoutTick in [3,6] tick
pRaft->electionTimeoutTick = taosRand() % 4 + 3;
}
bool syncRaftIsPromotable(SSyncRaft* pRaft) {
return pRaft->info.syncCfg.selfIndex >= 0 &&
pRaft->info.syncCfg.selfIndex < pRaft->info.syncCfg.replica &&
pRaft->selfId != SYNC_NON_NODE_ID;
}
bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
return pRaft->electionElapsed >= pRaft->electionTimeoutTick;
}
/**
* pre-handle message, return true is no need to continue
* Handle the message term, which may result in our stepping down to a follower.
**/
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
// local message?
if (pMsg->term == 0) {
return false;
}
if (pMsg->term > pRaft->term) {
return preHandleNewTermMessage(pRaft, pMsg);
}
return preHandleOldTermMessage(pRaft, pMsg);;
}
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SyncNodeId leaderId = pMsg->from;
RaftMessageType msgType = pMsg->msgType;
if (msgType == RAFT_MSG_VOTE || msgType == RAFT_MSG_PRE_VOTE) {
leaderId = SYNC_NON_NODE_ID;
}
if (msgType == RAFT_MSG_PRE_VOTE) {
// Never change our term in response to a PreVote
} else if (msgType == RAFT_MSG_PRE_VOTE_RESP && !pMsg->preVoteResp.reject) {
/**
* We send pre-vote requests with a term in our future. If the
* pre-vote is granted, we will increment our term when we get a
* quorum. If it is not, the term comes from the node that
* rejected our vote so we should become a follower at the new
* term.
**/
} else {
syncRaftBecomeFollower(pRaft, pMsg->term, leaderId);
}
return false;
}
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
// if receive old term message, no need to continue
return true;
}
static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return 0;
}
/**
* tickElection is run by followers and candidates per tick.
**/
static void tickElection(SSyncRaft* pRaft) {
pRaft->electionElapsed += 1;
if (!syncRaftIsPromotable(pRaft)) {
return;
}
if (!syncRaftIsPastElectionTimeout(pRaft)) {
return;
}
// election timeout
pRaft->electionElapsed = 0;
SSyncMessage msg;
syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId));
}
static void tickHeartbeat(SSyncRaft* pRaft) {
}
static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID;
}
static void resetRaft(SSyncRaft* pRaft, SSyncTerm term) {
if (pRaft->term != term) {
pRaft->term = term;
pRaft->voteFor = SYNC_NON_NODE_ID;
}
pRaft->leaderId = SYNC_NON_NODE_ID;
pRaft->electionElapsed = 0;
pRaft->heartbeatElapsed = 0;
syncRaftRandomizedElectionTimeout(pRaft);
abortLeaderTransfer(pRaft);
pRaft->pendingConf = false;
} }
\ No newline at end of file
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
#include "raft_message.h" #include "raft_message.h"
void syncFreeMessage(const RaftMessage* pMsg) { void syncFreeMessage(const SSyncMessage* pMsg) {
if (!syncIsInternalMsg(pMsg)) { if (!syncIsInternalMsg(pMsg)) {
free((RaftMessage*)pMsg); free((SSyncMessage*)pMsg);
} }
} }
\ No newline at end of file
...@@ -177,14 +177,6 @@ void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotI ...@@ -177,14 +177,6 @@ void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i, SyncIndex snapshotI
progress->pendingSnapshotIndex = snapshotIndex; progress->pendingSnapshotIndex = snapshotIndex;
} }
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
progress->paused = false;
progress->pendingSnapshotIndex = 0;
progress->state = state;
syncRaftInflightReset(&(progress->inflights));
}
int syncRaftInflightReset(SSyncRaftInflights* inflights) { int syncRaftInflightReset(SSyncRaftInflights* inflights) {
inflights->count = 0; inflights->count = 0;
inflights->start = 0; inflights->start = 0;
...@@ -240,7 +232,12 @@ void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) { ...@@ -240,7 +232,12 @@ void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights) {
syncRaftInflightFreeTo(inflights, inflights->buffer[inflights->start]); syncRaftInflightFreeTo(inflights, inflights->buffer[inflights->start]);
} }
static void resetProgressState(SSyncRaftProgress* progress, RaftProgressState state) {
progress->paused = false;
progress->pendingSnapshotIndex = 0;
progress->state = state;
syncRaftInflightReset(&(progress->inflights));
}
......
...@@ -157,7 +157,7 @@ void syncStop(const SSyncNode* pNode) { ...@@ -157,7 +157,7 @@ void syncStop(const SSyncNode* pNode) {
} }
int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) { int32_t syncPropose(SSyncNode* syncNode, const SSyncBuffer* pBuf, void* pData, bool isWeak) {
RaftMessage msg; SSyncMessage msg;
pthread_mutex_lock(&syncNode->mutex); pthread_mutex_lock(&syncNode->mutex);
int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak)); int32_t ret = syncRaftStep(&syncNode->raft, syncInitPropMsg(&msg, pBuf, pData, isWeak));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册