提交 25219949 编写于 作者: L lichuang

[TD-10645][raft]<feature>refactor sync interface

上级 83a40a7f
...@@ -36,7 +36,7 @@ typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSi ...@@ -36,7 +36,7 @@ typedef int (*configChangeFp)(SSyncRaftChanger* changer, const SSyncConfChangeSi
int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
#endif /* TD_SYNC_RAFT_CONFIG_CHANGE_H */ #endif /* TD_SYNC_RAFT_CONFIG_CHANGE_H */
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "sync_type.h" #include "sync_type.h"
#include "sync_raft_proto.h" #include "sync_raft_proto.h"
// Restore takes a Changer (which must represent an empty configuration), and // syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and
// runs a sequence of changes enacting the configuration described in the // runs a sequence of changes enacting the configuration described in the
// ConfState. // ConfState.
// //
......
...@@ -22,6 +22,9 @@ ...@@ -22,6 +22,9 @@
#define RAFT_READ_LOG_MAX_NUM 100 #define RAFT_READ_LOG_MAX_NUM 100
static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n);
static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n);
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
...@@ -35,7 +38,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -35,7 +38,9 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
SyncIndex initIndex = pInfo->snapshotIndex; SyncIndex initIndex = pInfo->snapshotIndex;
SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM]; SSyncBuffer buffer[RAFT_READ_LOG_MAX_NUM];
int nBuf, limit, i; int nBuf, limit, i;
char* buf;
int n;
memset(pRaft, 0, sizeof(SSyncRaft)); memset(pRaft, 0, sizeof(SSyncRaft));
memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM)); memcpy(&pRaft->fsm, &pInfo->fsm, sizeof(SSyncFSM));
...@@ -57,10 +62,15 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -57,10 +62,15 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
return -1; return -1;
} }
// read server state // read server state
if (stateManager->readServerState(stateManager, &serverState) != 0) { if (stateManager->readServerState(stateManager, &buf, &n) != 0) {
syncError("readServerState for vgid %d fail", pInfo->vgId); syncError("readServerState for vgid %d fail", pInfo->vgId);
return -1; return -1;
} }
if (deserializeServerStateFromBuffer(&serverState, buf, n) != 0) {
syncError("deserializeServerStateFromBuffer for vgid %d fail", pInfo->vgId);
return -1;
}
assert(initIndex <= serverState.commitIndex); assert(initIndex <= serverState.commitIndex);
// restore fsm state from snapshot index + 1 until commitIndex // restore fsm state from snapshot index + 1 until commitIndex
...@@ -119,6 +129,14 @@ int32_t syncRaftTick(SSyncRaft* pRaft) { ...@@ -119,6 +129,14 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
return 0; return 0;
} }
static int deserializeServerStateFromBuffer(SSyncServerState* server, const char* buffer, int n) {
return 0;
}
static int deserializeClusterConfigFromBuffer(SSyncClusterConfig* cluster, const char* buffer, int n) {
return 0;
}
/** /**
* pre-handle message, return true means no need to continue * pre-handle message, return true means no need to continue
* Handle the message term, which may result in our stepping down to a follower. * Handle the message term, which may result in our stepping down to a follower.
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "sync_raft_config_change.h" #include "sync_raft_config_change.h"
#include "sync_raft_progress.h" #include "sync_raft_progress.h"
#include "sync_raft_progress_tracker.h" #include "sync_raft_progress_tracker.h"
#include "sync_raft_quorum_joint.h"
static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static int checkAndCopy(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap); static int checkAndReturn(SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap);
...@@ -38,8 +39,8 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* ...@@ -38,8 +39,8 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig*
SSyncRaftProgressMap* progressMap, SyncNodeId id); SSyncRaftProgressMap* progressMap, SyncNodeId id);
static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id); SSyncRaftProgressMap* progressMap, SyncNodeId id);
static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id); SSyncRaftProgressMap* progressMap, SyncNodeId id);
// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate) // syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method // mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is // will return an error if that is not the case, if the resulting quorum is
...@@ -87,7 +88,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange ...@@ -87,7 +88,7 @@ int syncRaftChangerSimpleConfig(SSyncRaftChanger* changer, const SSyncConfChange
// (Section 4.3) corresponds to `C_{new,old}`. // (Section 4.3) corresponds to `C_{new,old}`.
// //
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf // [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSingleArray* css, int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, bool autoLeave, const SSyncConfChangeSingleArray* css,
SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) {
int ret; int ret;
...@@ -108,9 +109,18 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSi ...@@ -108,9 +109,18 @@ int syncRaftChangerEnterJoint(SSyncRaftChanger* changer, const SSyncConfChangeSi
} }
// Clear the outgoing config. // Clear the outgoing config.
syncRaftJointConfigClearOutgoing(config); syncRaftJointConfigClearOutgoing(&config->voters);
// Copy incoming to outgoing. // Copy incoming to outgoing.
memcpy(&config->voters.outgoing, &config->voters.incoming, sizeof(SSyncCluster));
ret = applyConfig(changer, config, progressMap, css);
if (ret != 0) {
return ret;
}
config->autoLeave = autoLeave;
return checkAndReturn(config, progressMap);
} }
// checkAndCopy copies the tracker's config and progress map (deeply enough for // checkAndCopy copies the tracker's config and progress map (deeply enough for
...@@ -210,7 +220,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig ...@@ -210,7 +220,7 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
makeLearner(changer, config, progressMap, cs->nodeId); makeLearner(changer, config, progressMap, cs->nodeId);
break; break;
case SYNC_RAFT_Conf_RemoveNode: case SYNC_RAFT_Conf_RemoveNode:
remove(changer, config, progressMap, cs->nodeId); removeNodeId(changer, config, progressMap, cs->nodeId);
break; break;
case SYNC_RAFT_Conf_UpdateNode: case SYNC_RAFT_Conf_UpdateNode:
break; break;
...@@ -309,7 +319,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* ...@@ -309,7 +319,7 @@ static void makeVoter(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig*
progress->isLearner = false; progress->isLearner = false;
nilAwareDelete(&config->learners, id); nilAwareDelete(&config->learners, id);
nilAwareDelete(&config->learnersNext, id); nilAwareDelete(&config->learnersNext, id);
syncRaftJointConfigAddToIncoming(config, id); syncRaftJointConfigAddToIncoming(&config->voters, id);
} }
// makeLearner makes the given ID a learner or stages it to be a learner once // makeLearner makes the given ID a learner or stages it to be a learner once
...@@ -339,7 +349,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi ...@@ -339,7 +349,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi
return; return;
} }
// Remove any existing voter in the incoming config... // Remove any existing voter in the incoming config...
remove(changer, config, progressMap, id); removeNodeId(changer, config, progressMap, id);
// ... but save the Progress. // ... but save the Progress.
syncRaftAddToProgressMap(progressMap, id); syncRaftAddToProgressMap(progressMap, id);
...@@ -358,8 +368,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi ...@@ -358,8 +368,8 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi
} }
} }
// remove this peer as a voter or learner from the incoming config. // removeNodeId this peer as a voter or learner from the incoming config.
static void remove(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id) { SSyncRaftProgressMap* progressMap, SyncNodeId id) {
int i = syncRaftFindProgressIndexByNodeId(progressMap, id); int i = syncRaftFindProgressIndexByNodeId(progressMap, id);
if (i == -1) { if (i == -1) {
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in); static int toConfChangeSingle(const SSyncConfigState* cs, SSyncConfChangeSingleArray* out, SSyncConfChangeSingleArray* in);
// Restore takes a Changer (which must represent an empty configuration), and // syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and
// runs a sequence of changes enacting the configuration described in the // runs a sequence of changes enacting the configuration described in the
// ConfState. // ConfState.
// //
...@@ -70,7 +70,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs) ...@@ -70,7 +70,7 @@ int syncRaftRestoreConfig(SSyncRaftChanger* changer, const SSyncConfigState* cs)
} }
} }
ret = syncRaftChangerEnterJoint(changer, &incoming, config, progressMap); ret = syncRaftChangerEnterJoint(changer, cs->autoLeave, &incoming, config, progressMap);
if (ret != 0) { if (ret != 0) {
goto out; goto out;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册