From 0c65b84886ede5d64c434f5d106ceb0851f69856 Mon Sep 17 00:00:00 2001 From: lichuang Date: Wed, 17 Nov 2021 08:29:24 +0800 Subject: [PATCH] [TD-10645][raft]add node map --- source/libs/sync/inc/raft.h | 4 ++- source/libs/sync/inc/raft_configuration.h | 27 ------------------- source/libs/sync/inc/sync_raft_impl.h | 2 ++ source/libs/sync/src/raft.c | 13 ++++++--- source/libs/sync/src/raft_configuration.c | 25 ----------------- .../src/raft_handle_append_entries_message.c | 9 +++---- .../libs/sync/src/raft_handle_vote_message.c | 9 ++++--- .../sync/src/raft_handle_vote_resp_message.c | 6 ++--- source/libs/sync/src/raft_replication.c | 7 +++-- source/libs/sync/src/sync_raft_election.c | 2 +- source/libs/sync/src/sync_raft_impl.c | 26 ++++++++++++------ 11 files changed, 50 insertions(+), 80 deletions(-) delete mode 100644 source/libs/sync/inc/raft_configuration.h delete mode 100644 source/libs/sync/src/raft_configuration.c diff --git a/source/libs/sync/inc/raft.h b/source/libs/sync/inc/raft.h index 5b6efb95e5..6fa6c6e346 100644 --- a/source/libs/sync/inc/raft.h +++ b/source/libs/sync/inc/raft.h @@ -18,6 +18,7 @@ #include "sync.h" #include "sync_type.h" +#include "thash.h" #include "raft_message.h" #include "sync_raft_impl.h" #include "sync_raft_quorum.h" @@ -43,7 +44,8 @@ struct SSyncRaft { // owner sync node SSyncNode* pNode; - SSyncCluster cluster; + // hash map nodeId -> SNodeInfo* + SHashObj* nodeInfoMap; int selfIndex; SyncNodeId selfId; diff --git a/source/libs/sync/inc/raft_configuration.h b/source/libs/sync/inc/raft_configuration.h deleted file mode 100644 index ac9bbb5e55..0000000000 --- a/source/libs/sync/inc/raft_configuration.h +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H -#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H - -#include "sync.h" -#include "sync_type.h" - -// return -1 if cannot find this id -int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id); - -int syncRaftConfigurationVoterCount(SSyncRaft *pRaft); - -#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */ \ No newline at end of file diff --git a/source/libs/sync/inc/sync_raft_impl.h b/source/libs/sync/inc/sync_raft_impl.h index bd77978c28..a8615f17eb 100644 --- a/source/libs/sync/inc/sync_raft_impl.h +++ b/source/libs/sync/inc/sync_raft_impl.h @@ -51,4 +51,6 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState); void syncRaftBroadcastAppend(SSyncRaft* pRaft); +SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id); + #endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */ diff --git a/source/libs/sync/src/raft.c b/source/libs/sync/src/raft.c index 39e0377545..d39e047492 100644 --- a/source/libs/sync/src/raft.c +++ b/source/libs/sync/src/raft.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "sync_raft_restore.h" #include "raft_replication.h" @@ -59,6 +59,11 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) { logStore = &(pRaft->logStore); fsm = &(pRaft->fsm); + pRaft->nodeInfoMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); + if (pRaft->nodeInfoMap == NULL) { + return -1; + } + // init progress tracker pRaft->tracker = syncRaftOpenProgressTracker(); if (pRaft->tracker == NULL) { @@ -290,8 +295,8 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) * but it will not receive MsgApp or MsgHeartbeat, so it will not create * disruptive term increases **/ - int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (peerIndex < 0) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return true; } SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term); @@ -299,7 +304,7 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) return true; } - pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex])); + pRaft->io.send(msg, pNode); } else { // ignore other cases syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]", diff --git a/source/libs/sync/src/raft_configuration.c b/source/libs/sync/src/raft_configuration.c deleted file mode 100644 index e16cb34989..0000000000 --- a/source/libs/sync/src/raft_configuration.c +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "raft_configuration.h" -#include "raft.h" - -int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) { - return (int)(id); -} - -int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) { - return pRaft->cluster.replica; -} \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_append_entries_message.c b/source/libs/sync/src/raft_handle_append_entries_message.c index 4797b6ce03..92ebfe75f5 100644 --- a/source/libs/sync/src/raft_handle_append_entries_message.c +++ b/source/libs/sync/src/raft_handle_append_entries_message.c @@ -16,15 +16,14 @@ #include "syncInt.h" #include "raft.h" #include "raft_log.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_message.h" int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries); - int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - - if (peerIndex < 0) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return 0; } @@ -44,6 +43,6 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index); out: - pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex])); + pRaft->io.send(pRespMsg, pNode); return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/raft_handle_vote_message.c b/source/libs/sync/src/raft_handle_vote_message.c index 709e319c3e..9997c5226d 100644 --- a/source/libs/sync/src/raft_handle_vote_message.c +++ b/source/libs/sync/src/raft_handle_vote_message.c @@ -15,7 +15,7 @@ #include "syncInt.h" #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "raft_message.h" @@ -23,10 +23,11 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg); int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { SSyncMessage* pRespMsg; - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (voteIndex == -1) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { return 0; } + bool grant; SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log); SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log); @@ -42,7 +43,7 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { grant ? "grant" : "reject", pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term); - pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex])); + pRaft->io.send(pRespMsg, pNode); return 0; } diff --git a/source/libs/sync/src/raft_handle_vote_resp_message.c b/source/libs/sync/src/raft_handle_vote_resp_message.c index 1781205ec0..744d654cc5 100644 --- a/source/libs/sync/src/raft_handle_vote_resp_message.c +++ b/source/libs/sync/src/raft_handle_vote_resp_message.c @@ -15,7 +15,7 @@ #include "syncInt.h" #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_message.h" int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { @@ -25,8 +25,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) { assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE); - voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from); - if (voterIndex == -1) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from); + if (pNode == NULL) { syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from); return 0; } diff --git a/source/libs/sync/src/raft_replication.c b/source/libs/sync/src/raft_replication.c index 74f40179c6..228d8195f6 100644 --- a/source/libs/sync/src/raft_replication.c +++ b/source/libs/sync/src/raft_replication.c @@ -69,9 +69,12 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) { static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress, SyncIndex prevIndex, SyncTerm prevTerm, SSyncRaftEntry *entries, int nEntry) { + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, progress->id); + if (pNode == NULL) { + return false; + } SyncIndex lastIndex; - SyncTerm logTerm = prevTerm; - SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]); + SyncTerm logTerm = prevTerm; SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term, prevIndex, prevTerm, pRaft->log->commitIndex, diff --git a/source/libs/sync/src/sync_raft_election.c b/source/libs/sync/src/sync_raft_election.c index 74c3e09dae..b5649d5c5e 100644 --- a/source/libs/sync/src/sync_raft_election.c +++ b/source/libs/sync/src/sync_raft_election.c @@ -105,6 +105,6 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) { pRaft->selfGroupId, pRaft->selfId, lastTerm, lastIndex, nodeId, pRaft->term); - pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); + //pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i])); } } \ No newline at end of file diff --git a/source/libs/sync/src/sync_raft_impl.c b/source/libs/sync/src/sync_raft_impl.c index ab2db10230..6ec0c6c089 100644 --- a/source/libs/sync/src/sync_raft_impl.c +++ b/source/libs/sync/src/sync_raft_impl.c @@ -14,7 +14,7 @@ */ #include "raft.h" -#include "raft_configuration.h" +#include "sync_raft_impl.h" #include "raft_log.h" #include "raft_replication.h" #include "sync_raft_progress_tracker.h" @@ -123,15 +123,16 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) { } int syncRaftQuorum(SSyncRaft* pRaft) { - return pRaft->cluster.replica / 2 + 1; + return 0; + //return pRaft->cluster.replica / 2 + 1; } ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool grant, int* rejected, int *granted) { - int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id); - if (voterIndex == -1) { - return SYNC_RAFT_VOTE_PENDING; + SNodeInfo* pNode = syncRaftGetNodeById(pRaft, id); + if (pNode == NULL) { + return true; } if (grant) { @@ -142,7 +143,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term); } - syncRaftRecordVote(pRaft->tracker, voterIndex, grant); + syncRaftRecordVote(pRaft->tracker, pNode->nodeId, grant); return syncRaftTallyVotes(pRaft->tracker, rejected, granted); } /* @@ -154,7 +155,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id, pRaft->selfGroupId, pRaft->selfId, id, pRaft->term); } - int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id); + int voteIndex = syncRaftGetNodeById(pRaft, id); assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0); assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN); @@ -198,6 +199,15 @@ void syncRaftBroadcastAppend(SSyncRaft* pRaft) { syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft); } +SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) { + SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SNodeInfo)); + if (ppNode != NULL) { + return *ppNode; + } + + return NULL; +} + static int convertClear(SSyncRaft* pRaft) { } @@ -269,7 +279,7 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) { syncRaftLogAppend(pRaft->log, entries, n); - SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]); + SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->selfIndex]); syncRaftProgressMaybeUpdate(progress, lastIndex); // Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend. syncRaftMaybeCommit(pRaft); -- GitLab