diff --git a/source/libs/sync/inc/sync_raft_node_map.h b/source/libs/sync/inc/sync_raft_node_map.h new file mode 100644 index 0000000000000000000000000000000000000000..bfb5f684890cc216286be58db456f583cc7105d8 --- /dev/null +++ b/source/libs/sync/inc/sync_raft_node_map.h @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_RAFT_NODE_MAP_H +#define _TD_LIBS_SYNC_RAFT_NODE_MAP_H + +#include "sync.h" +#include "sync_type.h" + +// TODO: is TSDB_MAX_REPLICA enough? +struct SSyncRaftNodeMap { + int32_t replica; + SyncNodeId nodeId[TSDB_MAX_REPLICA]; +}; + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); + +void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to); + +void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); + +#endif /* _TD_LIBS_SYNC_RAFT_NODE_MAP_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_proto.h b/source/libs/sync/inc/sync_raft_proto.h index c131e91139d786c7b2fa186f3e7a52091860ba18..dd153e8dadbd3342850b599aaf8d17436dc64f1a 100644 --- a/source/libs/sync/inc/sync_raft_proto.h +++ b/source/libs/sync/inc/sync_raft_proto.h @@ -17,6 +17,7 @@ #define TD_SYNC_RAFT_PROTO_H #include "sync_type.h" +#include "sync_raft_node_map.h" typedef enum ESyncRaftConfChangeType { SYNC_RAFT_Conf_AddNode = 0, diff --git a/source/libs/sync/inc/sync_raft_quorum_joint.h b/source/libs/sync/inc/sync_raft_quorum_joint.h index 0ef002fe1a325ffb4ee7100dbac8aeba123bd8a9..0637a9be9a5ea9fb090d8a59f2ac6649b27a6005 100644 --- a/source/libs/sync/inc/sync_raft_quorum_joint.h +++ b/source/libs/sync/inc/sync_raft_quorum_joint.h @@ -19,6 +19,7 @@ #include "taosdef.h" #include "sync.h" #include "sync_type.h" +#include "sync_raft_node_map.h" /** * SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly overlapping) @@ -36,8 +37,6 @@ typedef struct SSyncRaftQuorumJointConfig { **/ ESyncRaftVoteType syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const ESyncRaftVoteType* votes); -bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId); - static FORCE_INLINE bool syncRaftJointConfigInOutgoing(const SSyncRaftQuorumJointConfig* config, SyncNodeId id) { return syncRaftIsInNodeMap(&config->outgoing, id); } @@ -59,7 +58,9 @@ static FORCE_INLINE const SSyncRaftNodeMap* syncRaftJointConfigOutgoing(const SS } static FORCE_INLINE void syncRaftJointConfigClearOutgoing(SSyncRaftQuorumJointConfig* config) { - memset(&config->outgoing, 0, sizeof(SSyncCluster)); + memset(&config->outgoing, 0, sizeof(SSyncRaftNodeMap)); } +void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap); + #endif /* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */ diff --git a/source/libs/sync/inc/sync_type.h b/source/libs/sync/inc/sync_type.h index cb938c73191ec0d879e4b889d6e88710dacc6626..6d29e019ccf502f7ae652d0ef70ccaf99478b166 100644 --- a/source/libs/sync/inc/sync_type.h +++ b/source/libs/sync/inc/sync_type.h @@ -32,6 +32,8 @@ typedef struct SSyncRaftProgress SSyncRaftProgress; typedef struct SSyncRaftProgressMap SSyncRaftProgressMap; typedef struct SSyncRaftProgressTrackerConfig SSyncRaftProgressTrackerConfig; +typedef struct SSyncRaftNodeMap SSyncRaftNodeMap; + typedef struct SSyncRaftProgressTracker SSyncRaftProgressTracker; typedef struct SSyncRaftChanger SSyncRaftChanger; @@ -68,11 +70,6 @@ typedef struct SSyncClusterConfig { const SSyncCluster* cluster; } SSyncClusterConfig; -typedef struct { - int32_t replica; - SyncNodeId nodeId[TSDB_MAX_REPLICA]; -} SSyncRaftNodeMap; - typedef enum { SYNC_RAFT_CAMPAIGN_PRE_ELECTION = 0, SYNC_RAFT_CAMPAIGN_ELECTION = 1, diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 23351277c4bd983831cdb942ed9c934458f311bc..39e03775451979a48e22536091b04c72ce34799e 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -140,6 +140,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) { int32_t syncRaftTick(SSyncRaft* pRaft) { pRaft->currentTick += 1; + pRaft->tickFp(pRaft); return 0; } @@ -212,8 +213,11 @@ static void switchToConfig(SSyncRaft* pRaft, const SSyncRaftProgressTrackerConfi // If the the leadTransferee was removed or demoted, abort the leadership transfer. SyncNodeId leadTransferee = pRaft->leadTransferee; - if (leadTransferee != SYNC_NON_NODE_ID && !syncRaftIsInNodeMap(&pRaft->tracker->config.voters, leadTransferee)) { - abortLeaderTransfer(pRaft); + if (leadTransferee != SYNC_NON_NODE_ID) { + if (!syncRaftIsInNodeMap(&pRaft->tracker->config.voters.incoming, leadTransferee) && + !syncRaftIsInNodeMap(&pRaft->tracker->config.voters.outgoing, leadTransferee)) { + abortLeaderTransfer(pRaft); + } } } } diff --git a/source/libs/sync/src/raft_handle_election_message.c b/source/libs/sync/src/raft_handle_election_message.c index e536fc67c0df21b0ff2e23f4083fe3c57981ee6e..a58c8ba5cfa1d663c1486a2c2ccbf7d7c9b28708 100644 --- a/source/libs/sync/src/raft_handle_election_message.c +++ b/source/libs/sync/src/raft_handle_election_message.c @@ -19,24 +19,6 @@ #include "raft_message.h" int syncRaftHandleElectionMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { - if (pRaft->state == TAOS_SYNC_STATE_LEADER) { - syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); - return 0; - } - - if (!syncRaftIsPromotable(pRaft)) { - syncDebug("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId); - return 0; - } - // if there is pending uncommitted config,cannot start election - if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { - syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", - pRaft->selfGroupId, pRaft->selfId, pRaft->term); - return 0; - } - - syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); - if (pRaft->preVote) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_PRE_ELECTION); } else { diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index c19fcd1e68022f34abf8293a4975400a9257c60b..74f40179c6d7b0d353c0d976664ed1d47cf1b058 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -22,7 +22,7 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress); static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, - const SSyncRaftEntry *entries, int nEntry); + SSyncRaftEntry *entries, int nEntry); // syncRaftReplicate sends an append RPC with new entries to the given peer, // if necessary. Returns true if a message was sent. The sendIfEmpty @@ -68,7 +68,7 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, - const SSyncRaftEntry *entries, int nEntry) { + SSyncRaftEntry *entries, int nEntry) { SyncIndex lastIndex; SyncTerm logTerm = prevTerm; SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); @@ -87,7 +87,7 @@ static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, case PROGRESS_STATE_REPLICATE: lastIndex = entries[nEntry - 1].index; syncRaftProgressOptimisticNextIndex(progress, lastIndex); - syncRaftInflightAdd(&progress->inflights, lastIndex); + syncRaftInflightAdd(progress->inflights, lastIndex); break; case PROGRESS_STATE_PROBE: progress->probeSent = true; diff --git a/source/libs/sync/src/sync_raft_config_change.c b/source/libs/sync/src/sync_raft_config_change.c index 4e7f2190eaef22d2a6bbca5d835524894733bb56..288fdc465efc28d1618e5d09c271dcc205729e08 100644 --- a/source/libs/sync/src/sync_raft_config_change.c +++ b/source/libs/sync/src/sync_raft_config_change.c @@ -359,7 +359,7 @@ static void makeLearner(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConfi // be turned into a learner in LeaveJoint(). // // Otherwise, add a regular learner right away. - bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + bool inOutgoing = syncRaftIsInNodeMap(&config->voters.outgoing, id); if (inOutgoing) { nilAwareAdd(&config->learnersNext, id); } else { @@ -381,7 +381,7 @@ static void removeNodeId(SSyncRaftChanger* changer, SSyncRaftProgressTrackerConf nilAwareDelete(&config->learnersNext, id); // If the peer is still a voter in the outgoing config, keep the Progress. - bool inOutgoing = syncRaftJointConfigInCluster(&config->voters.outgoing, id); + bool inOutgoing = syncRaftIsInNodeMap(&config->voters.outgoing, id); if (!inOutgoing) { syncRaftRemoveFromProgressMap(progressMap, id); } diff --git a/source/libs/sync/src/raft_election.c b/source/libs/sync/src/sync_raft_election.c similarity index 56% rename from source/libs/sync/src/raft_election.c rename to source/libs/sync/src/sync_raft_election.c index eb310c31eceeca07e9736a7d175723bf07661c23..74c3e09dae364ce4a245a97ca065efdca26106c3 100644 --- a/source/libs/sync/src/raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -17,15 +17,42 @@ #include "raft.h" #include "raft_log.h" #include "raft_message.h" +#include "sync_raft_progress_tracker.h" + +static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType); void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { - SyncTerm term; + if (pRaft->state == TAOS_SYNC_STATE_LEADER) { + syncDebug("[%d:%d] ignoring RAFT_MSG_INTERNAL_ELECTION because already leader", pRaft->selfGroupId, pRaft->selfId); + return; + } + + if (!syncRaftIsPromotable(pRaft)) { + syncWarn("[%d:%d] is unpromotable and can not campaign", pRaft->selfGroupId, pRaft->selfId); + return; + } + + // if there is pending uncommitted config,cannot start election + if (syncRaftLogNumOfPendingConf(pRaft->log) > 0 && syncRaftHasUnappliedLog(pRaft->log)) { + syncWarn("[%d:%d] cannot syncRaftStartElection at term %" PRId64 " since there are still pending configuration changes to apply", + pRaft->selfGroupId, pRaft->selfId, pRaft->term); + return; + } + + syncInfo("[%d:%d] is starting a new election at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, pRaft->term); + + campaign(pRaft, cType); +} + +// campaign transitions the raft instance to candidate state. This must only be +// called after verifying that this is a legitimate transition. +static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { bool preVote; - ESyncRaftMessageType voteMsgType; + SyncTerm term; if (syncRaftIsPromotable(pRaft)) { syncDebug("[%d:%d] is unpromotable; campaign() should have been called", pRaft->selfGroupId, pRaft->selfId); - return 0; + return; } if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { @@ -35,7 +62,6 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { term = pRaft->term + 1; } else { syncRaftBecomeCandidate(pRaft); - voteMsgType = RAFT_MSG_VOTE; term = pRaft->term; preVote = false; } @@ -43,10 +69,8 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { int quorum = syncRaftQuorum(pRaft); ESyncRaftVoteResult result = syncRaftPollVote(pRaft, pRaft->selfId, preVote, true, NULL, NULL); if (result == SYNC_RAFT_VOTE_WON) { - /** - * We won the election after voting for ourselves (which must mean that - * this is a single-node cluster). Advance to the next state. - **/ + // We won the election after voting for ourselves (which must mean that + // this is a single-node cluster). Advance to the next state. if (cType == SYNC_RAFT_CAMPAIGN_PRE_ELECTION) { syncRaftStartElection(pRaft, SYNC_RAFT_CAMPAIGN_ELECTION); } else { @@ -59,12 +83,17 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { int i; SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); - for (i = 0; i < pRaft->cluster.replica; ++i) { - if (i == pRaft->cluster.selfIndex) { + SSyncRaftNodeMap nodeMap; + syncRaftJointConfigIDS(&pRaft->tracker->config.voters, &nodeMap); + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + SyncNodeId nodeId = nodeMap.nodeId[i]; + if (nodeId == SYNC_NON_NODE_ID) { continue; } - SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId; + if (nodeId == pRaft->selfId) { + continue; + } SSyncMessage* pMsg = syncNewVoteMsg(pRaft->selfGroupId, pRaft->selfId, term, cType, lastIndex, lastTerm); @@ -72,9 +101,9 @@ void syncRaftStartElection(SSyncRaft* pRaft, ESyncRaftElectionType cType) { continue; } - syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent %d request to %d at term %" PRId64 "", + syncInfo("[%d:%d] [logterm: %" PRId64 ", index: %" PRId64 "] sent vote request to %d at term %" PRId64 "", pRaft->selfGroupId, pRaft->selfId, lastTerm, - lastIndex, voteMsgType, nodeId, pRaft->term); + lastIndex, nodeId, pRaft->term); pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); } diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index d65e03c64f08416d467e829cd5faf2b7dc36e179..ab2db1023059457cd4ef6c109d7e0a175691373d 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -234,9 +234,7 @@ static int stepLeader(SSyncRaft* pRaft, const SSyncMessage* pMsg) { return 0; } -/** - * tickElection is run by followers and candidates per tick. - **/ +// tickElection is run by followers and candidates after r.electionTimeout. static void tickElection(SSyncRaft* pRaft) { pRaft->electionElapsed += 1; @@ -254,6 +252,7 @@ static void tickElection(SSyncRaft* pRaft) { syncRaftStep(pRaft, syncInitElectionMsg(&msg, pRaft->selfId)); } +// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout. static void tickHeartbeat(SSyncRaft* pRaft) { } diff --git a/source/libs/sync/src/sync_raft_node_map.c b/source/libs/sync/src/sync_raft_node_map.c new file mode 100644 index 0000000000000000000000000000000000000000..e13c8080751b997f80457aad05d1309270c59c5b --- /dev/null +++ b/source/libs/sync/src/sync_raft_node_map.c @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sync_raft_node_map.h" +#include "sync_type.h" + +bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + int i; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + if (nodeId == nodeMap->nodeId[i]) { + return true; + } + } + + return false; +} + +void syncRaftCopyNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { + memcpy(to, nodeMap, sizeof(SSyncRaftNodeMap)); +} + +void syncRaftUnionNodeMap(const SSyncRaftNodeMap* nodeMap, SSyncRaftNodeMap* to) { + int i, j, m; + + for (i = 0; i < TSDB_MAX_REPLICA; ++i) { + SyncNodeId id = nodeMap->nodeId[i]; + if (id == SYNC_NON_NODE_ID) { + continue; + } + + syncRaftAddToNodeMap(to, id); + } +} + +void syncRaftAddToNodeMap(SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { + assert(nodeMap->replica < TSDB_MAX_REPLICA); + + int i, j; + for (i = 0, j = -1; i < TSDB_MAX_REPLICA; ++i) { + SyncNodeId id = nodeMap->nodeId[i]; + if (id == SYNC_NON_NODE_ID) { + if (j == -1) j = i; + continue; + } + if (id == nodeId) { + return; + } + } + + assert(j != -1); + nodeMap->nodeId[j] = nodeId; + nodeMap->replica += 1; +} \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_progress_tracker.c b/source/libs/sync/src/sync_raft_progress_tracker.c index ea7f1ae4f5fed568a466ec3504e864b8377fcd2d..f43414127d34253ab9d4016996f6db8873ee0d25 100644 --- a/source/libs/sync/src/sync_raft_progress_tracker.c +++ b/source/libs/sync/src/sync_raft_progress_tracker.c @@ -49,6 +49,10 @@ void syncRaftCloneTrackerConfig(const SSyncRaftProgressTrackerConfig* from, SSyn } +int syncRaftCheckProgress(const SSyncRaftProgressTrackerConfig* config, SSyncRaftProgressMap* progressMap) { + return 0; +} + /** * syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the * election outcome is known. diff --git a/source/libs/sync/src/sync_raft_quorum_joint.c b/source/libs/sync/src/sync_raft_quorum_joint.c index fa663b6fc34166cc6d69824f5a2d7e85fcc5e717..8a99574d6876954497f653dca803196797b84123 100644 --- a/source/libs/sync/src/sync_raft_quorum_joint.c +++ b/source/libs/sync/src/sync_raft_quorum_joint.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "sync_raft_node_map.h" #include "sync_raft_quorum_majority.h" #include "sync_raft_quorum_joint.h" #include "sync_raft_quorum.h" @@ -71,15 +72,10 @@ void syncRaftJointConfigRemoveFromIncoming(SSyncRaftQuorumJointConfig* config, S assert(config->incoming.replica >= 0); } +void syncRaftJointConfigIDS(const SSyncRaftQuorumJointConfig* config, SSyncRaftNodeMap* nodeMap) { + int i, j, m; -bool syncRaftIsInNodeMap(const SSyncRaftNodeMap* nodeMap, SyncNodeId nodeId) { - int i; - - for (i = 0; i < TSDB_MAX_REPLICA; ++i) { - if (nodeId == nodeMap->nodeId[i]) { - return true; - } - } + syncRaftCopyNodeMap(&config->incoming, nodeMap); - return false; + syncRaftUnionNodeMap(&config->outgoing, nodeMap); } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_quorum_majority.c b/source/libs/sync/src/sync_raft_quorum_majority.c index 73eb378e09ac21827ee3fe3c18eae49c91dde2ed..8ff5752b97abe5959f68756758c95146193033e9 100644 --- a/source/libs/sync/src/sync_raft_quorum_majority.c +++ b/source/libs/sync/src/sync_raft_quorum_majority.c @@ -15,6 +15,7 @@ #include "sync_raft_quorum.h" #include "sync_raft_quorum_majority.h" +#include "sync_raft_node_map.h" /** * syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns