diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 5b6efb95e5f0eb53d536cddb13e408f1dac404d0..6fa6c6e346ea358506e195d2583c6bc4fd08b95b 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -18,6 +18,7 @@ #include "sync.h" #include "sync_type.h" +#include "thash.h" #include "raft_message.h" #include "sync_raft_impl.h" #include "sync_raft_quorum.h" @@ -43,7 +44,8 @@ struct SSyncRaft { // owner sync node SSyncNode* pNode; - SSyncCluster cluster; + // hash map nodeId -> SNodeInfo* + SHashObj* nodeInfoMap; int selfIndex; SyncNodeId selfId; diff --git a/source/libs/sync/inc/raft_configuration.h b/source/libs/sync/inc/raft_configuration.h deleted file mode 100644 index ac9bbb5e55d89f0f1c4952dad15c1c900828b631..0000000000000000000000000000000000000000 --- a/source/libs/sync/inc/raft_configuration.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H -#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H - -#include "sync.h" -#include "sync_type.h" - -// return -1 if cannot find this id -int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id); - -int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); - -#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index bd77978c28cb42e03b47ea035e9a5cc4122733da..a8615f17ebcff7cd371cfc07763510d9a6d48f1a 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -51,4 +51,6 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState); void syncRaftBroadcastAppend(SSyncRaft* pRaft); +SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id); + #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 39e03775451979a48e22536091b04c72ce34799e..d39e04749260a48d67118833092463f8cb78e5c2 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "sync_raft_restore.h" #include "raft_replication.h" @@ -59,6 +59,11 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { logStore = &(pRaft->logStore); fsm = &(pRaft->fsm); + pRaft->nodeInfoMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pRaft->nodeInfoMap == NULL) { + return -1; + } + // init progress tracker pRaft->tracker = syncRaftOpenProgressTracker(); if (pRaft->tracker == NULL) { @@ -290,8 +295,8 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) * but it will not receive MsgApp or MsgHeartbeat, so it will not create * disruptive term increases **/ - int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (peerIndex < 0) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return true; } SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term); @@ -299,7 +304,7 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) return true; } - pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex])); + pRaft->io.send(msg, pNode); } else { // ignore other cases syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]", diff --git a/source/libs/sync/src/raft_configuration.c b/source/libs/sync/src/raft_configuration.c deleted file mode 100644 index e16cb349892a78fef6ac58f1fb3d0bc9ecd66064..0000000000000000000000000000000000000000 --- a/source/libs/sync/src/raft_configuration.c +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "raft_configuration.h" -#include "raft.h" - -int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) { - return (int)(id); -} - -int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) { - return pRaft->cluster.replica; -} \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c index 4797b6ce030b7ee5fdafc4142426604c2c4397db..92ebfe75f5ef3ebb26bb1fd5f00e850206db9936 100644 --- a/source/libs/sync/src/raft_handle_append_entries_message.c +++ b/source/libs/sync/src/raft_handle_append_entries_message.c @@ -16,15 +16,14 @@ #include "syncInt.h" #include "raft.h" #include "raft_log.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_message.h" int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); - int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - - if (peerIndex < 0) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return 0; } @@ -44,6 +43,6 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index); out: - pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex])); + pRaft->io.send(pRespMsg, pNode); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index 709e319c3ee1c3d04b33ddc8042fedc239fc9d2c..9997c5226d7b30afdf3616d18eed8ad3aca64e74 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -15,7 +15,7 @@ #include "syncInt.h" #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "raft_message.h" @@ -23,10 +23,11 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { SSyncMessage* pRespMsg; - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (voteIndex == -1) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return 0; } + bool grant; SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); @@ -42,7 +43,7 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { grant ? "grant" : "reject", pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term); - pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex])); + pRaft->io.send(pRespMsg, pNode); return 0; } diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 1781205ec0bdfa361ce63d57e23e1ed1a7bc13ee..744d654cc5f4eeb6e377b48970b9278d9a8a1d5a 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -15,7 +15,7 @@ #include "syncInt.h" #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_message.h" int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { @@ -25,8 +25,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE); - voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (voterIndex == -1) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); return 0; } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 74f40179c6d7b0d353c0d976664ed1d47cf1b058..228d8195f63164f674cf3dfe6b946f802bdc6ed4 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -69,9 +69,12 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, SSyncRaftEntry *entries, int nEntry) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, progress->id); + if (pNode == NULL) { + return false; + } SyncIndex lastIndex; - SyncTerm logTerm = prevTerm; - SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); + SyncTerm logTerm = prevTerm; SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, prevIndex, prevTerm, pRaft->log->commitIndex, diff --git a/source/libs/sync/src/sync_raft_election.c b/source/libs/sync/src/sync_raft_election.c index 74c3e09dae364ce4a245a97ca065efdca26106c3..b5649d5c5ea56b82c394dc642c8dd08a89a4c4a0 100644 --- a/source/libs/sync/src/sync_raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -105,6 +105,6 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, nodeId, pRaft->term); - pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); + //pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); } } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index ab2db1023059457cd4ef6c109d7e0a175691373d..6ec0c6c0892c58de377df4adb401103167b0b657 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "raft_replication.h" #include "sync_raft_progress_tracker.h" @@ -123,15 +123,16 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { } int syncRaftQuorum(SSyncRaft* pRaft) { - return pRaft->cluster.replica / 2 + 1; + return 0; + //return pRaft->cluster.replica / 2 + 1; } ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool grant, int* rejected, int *granted) { - int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id); - if (voterIndex == -1) { - return SYNC_RAFT_VOTE_PENDING; + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, id); + if (pNode == NULL) { + return true; } if (grant) { @@ -142,7 +143,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); } - syncRaftRecordVote(pRaft->tracker, voterIndex, grant); + syncRaftRecordVote(pRaft->tracker, pNode->nodeId, grant); return syncRaftTallyVotes(pRaft->tracker, rejected, granted); } /* @@ -154,7 +155,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); } - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); + int voteIndex = syncRaftGetNodeById(pRaft, id); assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); @@ -198,6 +199,15 @@ void syncRaftBroadcastAppend(SSyncRaft* pRaft) { syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); } +SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) { + SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SNodeInfo)); + if (ppNode != NULL) { + return *ppNode; + } + + return NULL; +} + static int convertClear(SSyncRaft* pRaft) { } @@ -269,7 +279,7 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { syncRaftLogAppend(pRaft->log, entries, n); - SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); + SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->selfIndex]); syncRaftProgressMaybeUpdate(progress, lastIndex); // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. syncRaftMaybeCommit(pRaft);