提交 0c65b848 编写于 作者: L lichuang

[TD-10645][raft]<feature>add node map

上级 a936a7a5
......@@ -18,6 +18,7 @@
#include "sync.h"
#include "sync_type.h"
#include "thash.h"
#include "raft_message.h"
#include "sync_raft_impl.h"
#include "sync_raft_quorum.h"
......@@ -43,7 +44,8 @@ struct SSyncRaft {
// owner sync node
SSyncNode* pNode;
SSyncCluster cluster;
// hash map nodeId -> SNodeInfo*
SHashObj* nodeInfoMap;
int selfIndex;
SyncNodeId selfId;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_CONFIGURATION_H
#define _TD_LIBS_SYNC_RAFT_CONFIGURATION_H
#include "sync.h"
#include "sync_type.h"
// return -1 if cannot find this id
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id);
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft);
#endif /* _TD_LIBS_SYNC_RAFT_CONFIGURATION_H */
\ No newline at end of file
......@@ -51,4 +51,6 @@ void syncRaftLoadState(SSyncRaft* pRaft, const SSyncServerState* serverState);
void syncRaftBroadcastAppend(SSyncRaft* pRaft);
SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id);
#endif /* _TD_LIBS_SYNC_RAFT_IMPL_H */
......@@ -14,7 +14,7 @@
*/
#include "raft.h"
#include "raft_configuration.h"
#include "sync_raft_impl.h"
#include "raft_log.h"
#include "sync_raft_restore.h"
#include "raft_replication.h"
......@@ -59,6 +59,11 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
logStore = &(pRaft->logStore);
fsm = &(pRaft->fsm);
pRaft->nodeInfoMap = taosHashInit(TSDB_MAX_REPLICA, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (pRaft->nodeInfoMap == NULL) {
return -1;
}
// init progress tracker
pRaft->tracker = syncRaftOpenProgressTracker();
if (pRaft->tracker == NULL) {
......@@ -290,8 +295,8 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
* but it will not receive MsgApp or MsgHeartbeat, so it will not create
* disruptive term increases
**/
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (peerIndex < 0) {
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (pNode == NULL) {
return true;
}
SSyncMessage* msg = syncNewEmptyAppendRespMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term);
......@@ -299,7 +304,7 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
return true;
}
pRaft->io.send(msg, &(pRaft->cluster.nodeInfo[peerIndex]));
pRaft->io.send(msg, pNode);
} else {
// ignore other cases
syncInfo("[%d:%d] [term:%" PRId64 "] ignored a %d message with lower term from %d [term:%" PRId64 "]",
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft_configuration.h"
#include "raft.h"
int syncRaftConfigurationIndexOfNode(SSyncRaft *pRaft, SyncNodeId id) {
return (int)(id);
}
int syncRaftConfigurationVoterCount(SSyncRaft *pRaft) {
return pRaft->cluster.replica;
}
\ No newline at end of file
......@@ -16,15 +16,14 @@
#include "syncInt.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_configuration.h"
#include "sync_raft_impl.h"
#include "raft_message.h"
int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
const RaftMsg_Append_Entries *appendEntries = &(pMsg->appendEntries);
int peerIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (peerIndex < 0) {
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (pNode == NULL) {
return 0;
}
......@@ -44,6 +43,6 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs
pRaft->selfGroupId, pRaft->selfId, pMsg->from, appendEntries->index);
out:
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[peerIndex]));
pRaft->io.send(pRespMsg, pNode);
return 0;
}
\ No newline at end of file
......@@ -15,7 +15,7 @@
#include "syncInt.h"
#include "raft.h"
#include "raft_configuration.h"
#include "sync_raft_impl.h"
#include "raft_log.h"
#include "raft_message.h"
......@@ -23,10 +23,11 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SSyncMessage* pRespMsg;
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voteIndex == -1) {
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (pNode == NULL) {
return 0;
}
bool grant;
SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
SyncTerm lastTerm = syncRaftLogLastTerm(pRaft->log);
......@@ -42,7 +43,7 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
grant ? "grant" : "reject",
pMsg->from, pMsg->vote.lastTerm, pMsg->vote.lastIndex, pRaft->term);
pRaft->io.send(pRespMsg, &(pRaft->cluster.nodeInfo[voteIndex]));
pRaft->io.send(pRespMsg, pNode);
return 0;
}
......
......@@ -15,7 +15,7 @@
#include "syncInt.h"
#include "raft.h"
#include "raft_configuration.h"
#include "sync_raft_impl.h"
#include "raft_message.h"
int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
......@@ -25,8 +25,8 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
assert(pRaft->state == TAOS_SYNC_STATE_CANDIDATE);
voterIndex = syncRaftConfigurationIndexOfNode(pRaft, pMsg->from);
if (voterIndex == -1) {
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, pMsg->from);
if (pNode == NULL) {
syncError("[%d:%d] recv vote resp from unknown server %d", pRaft->selfGroupId, pRaft->selfId, pMsg->from);
return 0;
}
......
......@@ -69,9 +69,12 @@ static bool sendSnapshot(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
static bool sendAppendEntries(SSyncRaft* pRaft, SSyncRaftProgress* progress,
SyncIndex prevIndex, SyncTerm prevTerm,
SSyncRaftEntry *entries, int nEntry) {
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, progress->id);
if (pNode == NULL) {
return false;
}
SyncIndex lastIndex;
SyncTerm logTerm = prevTerm;
SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[progress->selfIndex]);
SyncTerm logTerm = prevTerm;
SSyncMessage* msg = syncNewAppendMsg(pRaft->selfGroupId, pRaft->selfId, pRaft->term,
prevIndex, prevTerm, pRaft->log->commitIndex,
......
......@@ -105,6 +105,6 @@ static void campaign(SSyncRaft* pRaft, ESyncRaftElectionType cType) {
pRaft->selfGroupId, pRaft->selfId, lastTerm,
lastIndex, nodeId, pRaft->term);
pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i]));
//pRaft->io.send(pMsg, &(pRaft->cluster.nodeInfo[i]));
}
}
\ No newline at end of file
......@@ -14,7 +14,7 @@
*/
#include "raft.h"
#include "raft_configuration.h"
#include "sync_raft_impl.h"
#include "raft_log.h"
#include "raft_replication.h"
#include "sync_raft_progress_tracker.h"
......@@ -123,15 +123,16 @@ bool syncRaftIsPastElectionTimeout(SSyncRaft* pRaft) {
}
int syncRaftQuorum(SSyncRaft* pRaft) {
return pRaft->cluster.replica / 2 + 1;
return 0;
//return pRaft->cluster.replica / 2 + 1;
}
ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
bool preVote, bool grant,
int* rejected, int *granted) {
int voterIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
if (voterIndex == -1) {
return SYNC_RAFT_VOTE_PENDING;
SNodeInfo* pNode = syncRaftGetNodeById(pRaft, id);
if (pNode == NULL) {
return true;
}
if (grant) {
......@@ -142,7 +143,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
}
syncRaftRecordVote(pRaft->tracker, voterIndex, grant);
syncRaftRecordVote(pRaft->tracker, pNode->nodeId, grant);
return syncRaftTallyVotes(pRaft->tracker, rejected, granted);
}
/*
......@@ -154,7 +155,7 @@ ESyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
pRaft->selfGroupId, pRaft->selfId, id, pRaft->term);
}
int voteIndex = syncRaftConfigurationIndexOfNode(pRaft, id);
int voteIndex = syncRaftGetNodeById(pRaft, id);
assert(voteIndex < pRaft->cluster.replica && voteIndex >= 0);
assert(pRaft->candidateState.votes[voteIndex] == SYNC_RAFT_VOTE_RESP_UNKNOWN);
......@@ -198,6 +199,15 @@ void syncRaftBroadcastAppend(SSyncRaft* pRaft) {
syncRaftProgressVisit(pRaft->tracker, visitProgressSendAppend, pRaft);
}
SNodeInfo* syncRaftGetNodeById(SSyncRaft *pRaft, SyncNodeId id) {
SNodeInfo **ppNode = taosHashGet(pRaft->nodeInfoMap, &id, sizeof(SNodeInfo));
if (ppNode != NULL) {
return *ppNode;
}
return NULL;
}
static int convertClear(SSyncRaft* pRaft) {
}
......@@ -269,7 +279,7 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
syncRaftLogAppend(pRaft->log, entries, n);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->cluster.selfIndex]);
SSyncRaftProgress* progress = &(pRaft->tracker->progressMap.progress[pRaft->selfIndex]);
syncRaftProgressMaybeUpdate(progress, lastIndex);
// Regardless of syncRaftMaybeCommit's return, our caller will call bcastAppend.
syncRaftMaybeCommit(pRaft);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册