提交 ffed8636 编写于 作者: L lichuang

[TD-10645][raft]<feature>fix compile error

上级 30321c34
...@@ -112,4 +112,6 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r ...@@ -112,4 +112,6 @@ ESyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* r
void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs); void syncRaftConfigState(const SSyncRaftProgressTracker* tracker, SSyncConfigState* cs);
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
#endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */ #endif /* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
...@@ -36,34 +36,25 @@ typedef struct SSyncRaftQuorumJointConfig { ...@@ -36,34 +36,25 @@ typedef struct SSyncRaftQuorumJointConfig {
**/ **/
ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes);
static FORCE_INLINE bool syncRaftJointConfigInCluster(const SSyncCluster* cluster, SyncNodeId id) { bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId);
int i;
for (i = 0; i < cluster->replica; ++i) {
if (cluster->nodeInfo[i].nodeId == id) {
return true;
}
}
return false;
}
static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
return syncRaftJointConfigInCluster(&config->outgoing, id); return syncRaftIsInNodeMap(&config->outgoing, id);
} }
static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { static FORCE_INLINE bool syncRaftJointConfigInIncoming(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) {
return syncRaftJointConfigInCluster(&config->incoming, id); return syncRaftIsInNodeMap(&config->incoming, id);
} }
void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id);
void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id); void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, SyncNodeId id);
static FORCE_INLINE const SSyncCluster* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) { static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigIncoming(const SSyncRaftQuorumJointConfig* config) {
return &config->incoming; return &config->incoming;
} }
static FORCE_INLINE const SSyncCluster* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) { static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SSyncRaftQuorumJointConfig* config) {
return &config->outgoing; return &config->outgoing;
} }
......
...@@ -26,6 +26,6 @@ ...@@ -26,6 +26,6 @@
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a * yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached). * quorum of no has been reached).
**/ **/
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes); ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes);
#endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */ #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
...@@ -30,6 +30,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch ...@@ -30,6 +30,8 @@ static int deserializeClusterStateFromBuffer(SSyncConfigState* cluster, const ch
static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config, static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfig* config,
const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs); const SSyncRaftProgressMap* progressMap, SSyncConfigState* cs);
static void abortLeaderTransfer(SSyncRaft* pRaft);
static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
...@@ -109,38 +111,6 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { ...@@ -109,38 +111,6 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID); syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
#if 0
// restore fsm state from snapshot index + 1 until commitIndex
++initIndex;
while (initIndex <= serverState.commitIndex) {
limit = MIN(RAFT_READ_LOG_MAX_NUM, serverState.commitIndex - initIndex + 1);
if (logStore->logRead(logStore, initIndex, limit, buffer, &nBuf) != 0) {
return -1;
}
assert(limit == nBuf);
for (i = 0; i < limit; ++i) {
fsm->applyLog(fsm, initIndex + i, &(buffer[i]), NULL);
free(buffer[i].data);
}
initIndex += nBuf;
}
assert(initIndex == serverState.commitIndex);
//pRaft->heartbeatTimeoutTick = 1;
syncRaftBecomeFollower(pRaft, pRaft->term, SYNC_NON_NODE_ID);
pRaft->selfIndex = pRaft->cluster.selfIndex;
#endif
syncInfo("[%d:%d] restore vgid %d state: snapshot index success", syncInfo("[%d:%d] restore vgid %d state: snapshot index success",
pRaft->selfGroupId, pRaft->selfId, pInfo->vgId); pRaft->selfGroupId, pRaft->selfId, pInfo->vgId);
return 0; return 0;
...@@ -242,12 +212,16 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi ...@@ -242,12 +212,16 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi
// If the the leadTransferee was removed or demoted, abort the leadership transfer. // If the the leadTransferee was removed or demoted, abort the leadership transfer.
SyncNodeId leadTransferee = pRaft->leadTransferee; SyncNodeId leadTransferee = pRaft->leadTransferee;
if (leadTransferee != SYNC_NON_NODE_ID) { if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) {
abortLeaderTransfer(pRaft);
} }
} }
} }
static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft->leadTransferee = SYNC_NON_NODE_ID;
}
/** /**
* pre-handle message, return true means no need to continue * pre-handle message, return true means no need to continue
* Handle the message term, which may result in our stepping down to a follower. * Handle the message term, which may result in our stepping down to a follower.
......
...@@ -26,8 +26,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg ...@@ -26,8 +26,8 @@ static int checkInvariants(SSyncRaftProgressTrackerConfig* config, SSyncRaftProg
static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config); static bool hasJointConfig(const SSyncRaftProgressTrackerConfig* config);
static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css); SSyncRaftProgressMap* progressMap, const SSyncConfChangeSingleArray* css);
static int symDiff(const SSyncCluster* l, const SSyncCluster* r);
static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r);
static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config, static void initProgress(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig* config,
SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner); SSyncRaftProgressMap* progressMap, SyncNodeId id, bool isLearner);
...@@ -237,27 +237,27 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig ...@@ -237,27 +237,27 @@ static int applyConfig(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfig
// symdiff returns the count of the symmetric difference between the sets of // symdiff returns the count of the symmetric difference between the sets of
// uint64s, i.e. len( (l - r) \union (r - l)). // uint64s, i.e. len( (l - r) \union (r - l)).
static int symDiff(const SSyncCluster* l, const SSyncCluster* r) { static int symDiff(const SSyncRaftNodeMap* l, const SSyncRaftNodeMap* r) {
int n; int n;
int i; int i;
int j0, j1; int j0, j1;
const SSyncCluster* pairs[2][2] = { const SSyncRaftNodeMap* pairs[2][2] = {
{l, r}, // count elems in l but not in r {l, r}, // count elems in l but not in r
{r, l}, // count elems in r but not in l {r, l}, // count elems in r but not in l
}; };
for (n = 0, i = 0; i < 2; ++i) { for (n = 0, i = 0; i < 2; ++i) {
const SSyncCluster** pp = pairs[i]; const SSyncRaftNodeMap** pp = pairs[i];
const SSyncCluster* p0 = pp[0]; const SSyncRaftNodeMap* p0 = pp[0];
const SSyncCluster* p1 = pp[1]; const SSyncRaftNodeMap* p1 = pp[1];
for (j0 = 0; j0 < p0->replica; ++j0) { for (j0 = 0; j0 < TSDB_MAX_REPLICA; ++j0) {
SyncNodeId id = p0->nodeInfo[j0].nodeId; SyncNodeId id = p0->nodeId[j0];
if (id == SYNC_NON_NODE_ID) { if (id == SYNC_NON_NODE_ID) {
continue; continue;
} }
for (j1 = 0; j1 < p1->replica; ++j1) { for (j1 = 0; j1 < p1->replica; ++j1) {
if (p1->nodeInfo[j1].nodeId != SYNC_NON_NODE_ID && p1->nodeInfo[j1].nodeId != id) { if (p1->nodeId[j1] != SYNC_NON_NODE_ID && p1->nodeId[j1] != id) {
n+=1; n+=1;
} }
} }
......
...@@ -185,8 +185,17 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) { ...@@ -185,8 +185,17 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState) {
pRaft->voteFor = serverState->voteFor; pRaft->voteFor = serverState->voteFor;
} }
void syncRaftBroadcastAppend(SSyncRaft* pRaft) { static void visitProgressSendAppend(int i, SSyncRaftProgress* progress, void* arg) {
SSyncRaft* pRaft = (SSyncRaft*)arg;
if (pRaft->selfId == progress->id) {
return;
}
syncRaftReplicate(arg, progress, true);
}
void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft);
} }
static int convertClear(SSyncRaft* pRaft) { static int convertClear(SSyncRaft* pRaft) {
...@@ -279,6 +288,7 @@ bool syncRaftMaybeCommit(SSyncRaft* pRaft) { ...@@ -279,6 +288,7 @@ bool syncRaftMaybeCommit(SSyncRaft* pRaft) {
* trigger I/O requests for newly appended log entries or heartbeats. * trigger I/O requests for newly appended log entries or heartbeats.
**/ **/
static int triggerAll(SSyncRaft* pRaft) { static int triggerAll(SSyncRaft* pRaft) {
#if 0
assert(pRaft->state == TAOS_SYNC_STATE_LEADER); assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
int i; int i;
...@@ -287,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) { ...@@ -287,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) {
continue; continue;
} }
syncRaftReplicate(pRaft, i); syncRaftReplicate(pRaft, pRaft->tracker->progressMap.progress[i], true);
} }
#endif
return 0;
} }
static void abortLeaderTransfer(SSyncRaft* pRaft) { static void abortLeaderTransfer(SSyncRaft* pRaft) {
......
...@@ -44,16 +44,16 @@ void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNo ...@@ -44,16 +44,16 @@ void syncRaftJointConfigAddToIncoming(SSyncRaftQuorumJointConfig* config, SyncNo
int i, min; int i, min;
for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) { for (i = 0, min = -1; i < TSDB_MAX_REPLICA; ++i) {
if (config->incoming.nodeInfo[i].nodeId == id) { if (config->incoming.nodeId[i] == id) {
return; return;
} }
if (min == -1 && config->incoming.nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { if (min == -1 && config->incoming.nodeId[i] == SYNC_NON_NODE_ID) {
min = i; min = i;
} }
} }
assert(min != -1); assert(min != -1);
config->incoming.nodeInfo[min].nodeId = id; config->incoming.nodeId[min] = id;
config->incoming.replica += 1; config->incoming.replica += 1;
} }
...@@ -61,12 +61,25 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S ...@@ -61,12 +61,25 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S
int i; int i;
for (i = 0; i < TSDB_MAX_REPLICA; ++i) { for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (config->incoming.nodeInfo[i].nodeId == id) { if (config->incoming.nodeId[i] == id) {
config->incoming.replica -= 1; config->incoming.replica -= 1;
config->incoming.nodeInfo[i].nodeId = SYNC_NON_NODE_ID; config->incoming.nodeId[i] = SYNC_NON_NODE_ID;
break; break;
} }
} }
assert(config->incoming.replica >= 0); assert(config->incoming.replica >= 0);
}
bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) {
int i;
for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (nodeId == nodeMap->nodeId[i]) {
return true;
}
}
return false;
} }
\ No newline at end of file
...@@ -22,14 +22,14 @@ ...@@ -22,14 +22,14 @@
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a * yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached). * quorum of no has been reached).
**/ **/
ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncCluster* config, const ESyncRaftVoteType* votes) { ESyncRaftVoteResult syncRaftMajorityVoteResult(SSyncRaftNodeMap* config, const ESyncRaftVoteType* votes) {
if (config->replica == 0) { if (config->replica == 0) {
return SYNC_RAFT_VOTE_WON; return SYNC_RAFT_VOTE_WON;
} }
int i, g, r, missing; int i, g, r, missing;
for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) { for (i = g = r = missing = 0; i < TSDB_MAX_REPLICA; ++i) {
if (config->nodeInfo[i].nodeId == SYNC_NON_NODE_ID) { if (config->nodeId[i] == SYNC_NON_NODE_ID) {
continue; continue;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册