Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c25d174f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c25d174f
编写于
11月 04, 2021
作者:
L
lichuang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10645][raft]<feature>add raft vote resp message handle
上级
e05e6dba
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
203 addition
and
85 deletion
+203
-85
source/libs/sync/inc/raft.h
source/libs/sync/inc/raft.h
+7
-2
source/libs/sync/inc/raft_configuration.h
source/libs/sync/inc/raft_configuration.h
+1
-0
source/libs/sync/inc/raft_message.h
source/libs/sync/inc/raft_message.h
+7
-6
source/libs/sync/inc/sync_type.h
source/libs/sync/inc/sync_type.h
+7
-1
source/libs/sync/src/raft.c
source/libs/sync/src/raft.c
+38
-10
source/libs/sync/src/raft_election.c
source/libs/sync/src/raft_election.c
+75
-0
source/libs/sync/src/raft_handle_election_message.c
source/libs/sync/src/raft_handle_election_message.c
+10
-65
source/libs/sync/src/raft_handle_vote_resp_message.c
source/libs/sync/src/raft_handle_vote_resp_message.c
+57
-0
source/libs/sync/src/raft_progress.c
source/libs/sync/src/raft_progress.c
+1
-1
未找到文件。
source/libs/sync/inc/raft.h
浏览文件 @
c25d174f
...
@@ -31,7 +31,7 @@ typedef struct RaftLeaderState {
...
@@ -31,7 +31,7 @@ typedef struct RaftLeaderState {
typedef
struct
RaftCandidateState
{
typedef
struct
RaftCandidateState
{
/* votes results */
/* votes results */
bool
votes
[
TSDB_MAX_REPLICA
];
SyncRaftVoteRespType
votes
[
TSDB_MAX_REPLICA
];
/* true if in pre-vote phase */
/* true if in pre-vote phase */
bool
inPreVote
;
bool
inPreVote
;
...
@@ -125,10 +125,15 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
...
@@ -125,10 +125,15 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
);
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
);
void
syncRaftBecomeLeader
(
SSyncRaft
*
pRaft
);
void
syncRaftBecomeLeader
(
SSyncRaft
*
pRaft
);
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
SyncRaftElectionType
cType
);
void
syncRaftTriggerReplicate
(
SSyncRaft
*
pRaft
);
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
);
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
);
bool
syncRaftIsPromotable
(
SSyncRaft
*
pRaft
);
bool
syncRaftIsPromotable
(
SSyncRaft
*
pRaft
);
bool
syncRaftIsPastElectionTimeout
(
SSyncRaft
*
pRaft
);
bool
syncRaftIsPastElectionTimeout
(
SSyncRaft
*
pRaft
);
int
syncRaftQuorum
(
SSyncRaft
*
pRaft
);
int
syncRaftQuorum
(
SSyncRaft
*
pRaft
);
int
syncRaftNumOfGranted
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
accept
);
int
syncRaftNumOfGranted
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
accept
,
int
*
rejectNum
);
#endif
/* _TD_LIBS_SYNC_RAFT_H */
#endif
/* _TD_LIBS_SYNC_RAFT_H */
\ No newline at end of file
source/libs/sync/inc/raft_configuration.h
浏览文件 @
c25d174f
...
@@ -19,6 +19,7 @@
...
@@ -19,6 +19,7 @@
#include "sync.h"
#include "sync.h"
#include "sync_type.h"
#include "sync_type.h"
// return -1 if cannot find this id
int
syncRaftConfigurationIndexOfVoter
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
);
int
syncRaftConfigurationIndexOfVoter
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
);
int
syncRaftConfigurationVoterCount
(
SSyncRaft
*
pRaft
);
int
syncRaftConfigurationVoterCount
(
SSyncRaft
*
pRaft
);
...
...
source/libs/sync/inc/raft_message.h
浏览文件 @
c25d174f
...
@@ -35,7 +35,7 @@ typedef enum RaftMessageType {
...
@@ -35,7 +35,7 @@ typedef enum RaftMessageType {
RAFT_MSG_VOTE
=
3
,
RAFT_MSG_VOTE
=
3
,
RAFT_MSG_VOTE_RESP
=
4
,
RAFT_MSG_VOTE_RESP
=
4
,
RAFT_MSG_APPEND
=
5
,
}
RaftMessageType
;
}
RaftMessageType
;
typedef
struct
RaftMsgInternal_Prop
{
typedef
struct
RaftMsgInternal_Prop
{
...
@@ -49,14 +49,14 @@ typedef struct RaftMsgInternal_Election {
...
@@ -49,14 +49,14 @@ typedef struct RaftMsgInternal_Election {
}
RaftMsgInternal_Election
;
}
RaftMsgInternal_Election
;
typedef
struct
RaftMsg_Vote
{
typedef
struct
RaftMsg_Vote
{
SyncRaft
Campaig
nType
cType
;
SyncRaft
Electio
nType
cType
;
SyncIndex
lastIndex
;
SyncIndex
lastIndex
;
SyncTerm
lastTerm
;
SyncTerm
lastTerm
;
}
RaftMsg_Vote
;
}
RaftMsg_Vote
;
typedef
struct
RaftMsg_VoteResp
{
typedef
struct
RaftMsg_VoteResp
{
bool
reject
;
bool
reject
;
SyncRaft
Campaig
nType
cType
;
SyncRaft
Electio
nType
cType
;
}
RaftMsg_VoteResp
;
}
RaftMsg_VoteResp
;
typedef
struct
SSyncMessage
{
typedef
struct
SSyncMessage
{
...
@@ -104,7 +104,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
...
@@ -104,7 +104,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
}
}
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncNodeId
to
,
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncNodeId
to
,
SyncTerm
term
,
SyncRaft
Campaig
nType
cType
,
SyncTerm
term
,
SyncRaft
Electio
nType
cType
,
SyncIndex
lastIndex
,
SyncTerm
lastTerm
)
{
SyncIndex
lastIndex
,
SyncTerm
lastTerm
)
{
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
if
(
pMsg
==
NULL
)
{
if
(
pMsg
==
NULL
)
{
...
@@ -134,13 +134,14 @@ static FORCE_INLINE bool syncIsPreVoteRespMsg(SSyncMessage* pMsg) {
...
@@ -134,13 +134,14 @@ static FORCE_INLINE bool syncIsPreVoteRespMsg(SSyncMessage* pMsg) {
return
pMsg
->
msgType
==
RAFT_MSG_VOTE_RESP
&&
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
;
return
pMsg
->
msgType
==
RAFT_MSG_VOTE_RESP
&&
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
;
}
}
static
FORCE_INLINE
bool
syncIsPreVoteMsg
(
SSyncMessage
*
pMsg
)
{
static
FORCE_INLINE
bool
syncIsPreVoteMsg
(
const
SSyncMessage
*
pMsg
)
{
return
pMsg
->
msgType
==
RAFT_MSG_VOTE
&&
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
;
return
pMsg
->
msgType
==
RAFT_MSG_VOTE
&&
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
;
}
}
void
syncFreeMessage
(
const
SSyncMessage
*
pMsg
);
void
syncFreeMessage
(
const
SSyncMessage
*
pMsg
);
// message handlers
// message handlers
void
syncRaftHandleElectionMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
int
syncRaftHandleElectionMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
int
syncRaftHandleVoteRespMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
#endif
/* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
#endif
/* _TD_LIBS_SYNC_RAFT_MESSAGE_H */
\ No newline at end of file
source/libs/sync/inc/sync_type.h
浏览文件 @
c25d174f
...
@@ -34,6 +34,12 @@ typedef enum {
...
@@ -34,6 +34,12 @@ typedef enum {
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
=
0
,
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
=
0
,
SYNC_RAFT_CAMPAIGN_ELECTION
=
1
,
SYNC_RAFT_CAMPAIGN_ELECTION
=
1
,
SYNC_RAFT_CAMPAIGN_TRANSFER
=
3
,
SYNC_RAFT_CAMPAIGN_TRANSFER
=
3
,
}
SyncRaftCampaignType
;
}
SyncRaftElectionType
;
typedef
enum
{
SYNC_RAFT_VOTE_RESP_UNKNOWN
=
0
,
SYNC_RAFT_VOTE_RESP_GRANT
=
1
,
SYNC_RAFT_VOTE_RESP_REJECT
=
2
,
}
SyncRaftVoteRespType
;
#endif
/* _TD_LIBS_SYNC_TYPE_H */
#endif
/* _TD_LIBS_SYNC_TYPE_H */
source/libs/sync/src/raft.c
浏览文件 @
c25d174f
...
@@ -15,6 +15,7 @@
...
@@ -15,6 +15,7 @@
#include "raft.h"
#include "raft.h"
#include "raft_configuration.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "syncInt.h"
#include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100
#define RAFT_READ_LOG_MAX_NUM 100
...
@@ -120,14 +121,19 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
...
@@ -120,14 +121,19 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
}
}
void
syncRaftBecomeFollower
(
SSyncRaft
*
pRaft
,
SyncTerm
term
,
SyncNodeId
leaderId
)
{
void
syncRaftBecomeFollower
(
SSyncRaft
*
pRaft
,
SyncTerm
term
,
SyncNodeId
leaderId
)
{
convertClear
(
pRaft
);
pRaft
->
stepFp
=
stepFollower
;
pRaft
->
stepFp
=
stepFollower
;
resetRaft
(
pRaft
,
term
);
resetRaft
(
pRaft
,
term
);
pRaft
->
tickFp
=
tickElection
;
pRaft
->
tickFp
=
tickElection
;
pRaft
->
leaderId
=
leaderId
;
pRaft
->
leaderId
=
leaderId
;
pRaft
->
state
=
TAOS_SYNC_ROLE_FOLLOWER
;
pRaft
->
state
=
TAOS_SYNC_ROLE_FOLLOWER
;
syncInfo
(
"[%d:%d] became followe at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
}
}
void
syncRaftBecomePreCandidate
(
SSyncRaft
*
pRaft
)
{
void
syncRaftBecomePreCandidate
(
SSyncRaft
*
pRaft
)
{
convertClear
(
pRaft
);
memset
(
pRaft
->
candidateState
.
votes
,
SYNC_RAFT_VOTE_RESP_UNKNOWN
,
sizeof
(
SyncRaftVoteRespType
)
*
TSDB_MAX_REPLICA
);
/**
/**
* Becoming a pre-candidate changes our step functions and state,
* Becoming a pre-candidate changes our step functions and state,
* but doesn't change anything else. In particular it does not increase
* but doesn't change anything else. In particular it does not increase
...
@@ -137,10 +143,13 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
...
@@ -137,10 +143,13 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
pRaft
->
tickFp
=
tickElection
;
pRaft
->
tickFp
=
tickElection
;
pRaft
->
state
=
TAOS_SYNC_ROLE_CANDIDATE
;
pRaft
->
state
=
TAOS_SYNC_ROLE_CANDIDATE
;
pRaft
->
candidateState
.
inPreVote
=
true
;
pRaft
->
candidateState
.
inPreVote
=
true
;
syncInfo
(
"[%d:%d] became pre-candidate at term %
d
"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
syncInfo
(
"[%d:%d] became pre-candidate at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
}
}
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
)
{
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
)
{
convertClear
(
pRaft
);
memset
(
pRaft
->
candidateState
.
votes
,
SYNC_RAFT_VOTE_RESP_UNKNOWN
,
sizeof
(
SyncRaftVoteRespType
)
*
TSDB_MAX_REPLICA
);
pRaft
->
candidateState
.
inPreVote
=
false
;
pRaft
->
candidateState
.
inPreVote
=
false
;
pRaft
->
stepFp
=
stepCandidate
;
pRaft
->
stepFp
=
stepCandidate
;
// become candidate make term+1
// become candidate make term+1
...
@@ -148,7 +157,7 @@ void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
...
@@ -148,7 +157,7 @@ void syncRaftBecomeCandidate(SSyncRaft* pRaft) {
pRaft
->
tickFp
=
tickElection
;
pRaft
->
tickFp
=
tickElection
;
pRaft
->
voteFor
=
pRaft
->
selfId
;
pRaft
->
voteFor
=
pRaft
->
selfId
;
pRaft
->
state
=
TAOS_SYNC_ROLE_CANDIDATE
;
pRaft
->
state
=
TAOS_SYNC_ROLE_CANDIDATE
;
syncInfo
(
"[%d:%d] became candidate at term %
d
"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
syncInfo
(
"[%d:%d] became candidate at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
}
}
void
syncRaftBecomeLeader
(
SSyncRaft
*
pRaft
)
{
void
syncRaftBecomeLeader
(
SSyncRaft
*
pRaft
)
{
...
@@ -160,7 +169,11 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
...
@@ -160,7 +169,11 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
pRaft
->
state
=
TAOS_SYNC_ROLE_LEADER
;
pRaft
->
state
=
TAOS_SYNC_ROLE_LEADER
;
// TODO: check if there is pending config log
// TODO: check if there is pending config log
syncInfo
(
"[%d:%d] became leader at term %d"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
syncInfo
(
"[%d:%d] became leader at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
}
void
syncRaftTriggerReplicate
(
SSyncRaft
*
pRaft
)
{
}
}
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
)
{
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
)
{
...
@@ -180,7 +193,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) {
...
@@ -180,7 +193,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) {
return
pRaft
->
cluster
.
replica
/
2
+
1
;
return
pRaft
->
cluster
.
replica
/
2
+
1
;
}
}
int
syncRaftNumOfGranted
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
accept
)
{
int
syncRaftNumOfGranted
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
accept
,
int
*
rejectNum
)
{
if
(
accept
)
{
if
(
accept
)
{
syncInfo
(
"[%d:%d] received (pre-vote %d) from %d at term %"
PRId64
""
,
syncInfo
(
"[%d:%d] received (pre-vote %d) from %d at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
preVote
,
id
,
pRaft
->
term
);
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
preVote
,
id
,
pRaft
->
term
);
...
@@ -188,17 +201,20 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc
...
@@ -188,17 +201,20 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc
syncInfo
(
"[%d:%d] received rejection from %d at term %"
PRId64
""
,
syncInfo
(
"[%d:%d] received rejection from %d at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
id
,
pRaft
->
term
);
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
id
,
pRaft
->
term
);
}
}
int
voteIndex
=
syncRaftConfigurationIndexOfVoter
(
pRaft
,
id
);
int
voteIndex
=
syncRaftConfigurationIndexOfVoter
(
pRaft
,
id
);
assert
(
voteIndex
<
pRaft
->
cluster
.
replica
&&
voteIndex
>=
0
);
assert
(
voteIndex
<
pRaft
->
cluster
.
replica
&&
voteIndex
>=
0
);
assert
(
pRaft
->
candidateState
.
votes
[
voteIndex
]
==
SYNC_RAFT_VOTE_RESP_UNKNOWN
);
pRaft
->
candidateState
.
votes
[
voteIndex
]
=
accept
;
pRaft
->
candidateState
.
votes
[
voteIndex
]
=
accept
?
SYNC_RAFT_VOTE_RESP_GRANT
:
SYNC_RAFT_VOTE_RESP_REJECT
;
int
granted
=
0
;
int
granted
=
0
,
rejected
=
0
;
int
i
;
int
i
;
for
(
i
=
0
;
i
<
pRaft
->
cluster
.
replica
;
++
i
)
{
for
(
i
=
0
;
i
<
pRaft
->
cluster
.
replica
;
++
i
)
{
if
(
pRaft
->
candidateState
.
votes
[
i
])
granted
++
;
if
(
pRaft
->
candidateState
.
votes
[
i
]
==
SYNC_RAFT_VOTE_RESP_GRANT
)
granted
++
;
else
if
(
pRaft
->
candidateState
.
votes
[
i
]
==
SYNC_RAFT_VOTE_RESP_REJECT
)
rejected
++
;
}
}
if
(
rejectNum
)
*
rejectNum
=
rejected
;
return
granted
;
return
granted
;
}
}
...
@@ -262,8 +278,20 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
...
@@ -262,8 +278,20 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
}
}
static
int
stepCandidate
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
static
int
stepCandidate
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
convertClear
(
pRaft
);
/**
memset
(
pRaft
->
candidateState
.
votes
,
0
,
sizeof
(
bool
)
*
TSDB_MAX_REPLICA
);
* Only handle vote responses corresponding to our candidacy (while in
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
* our pre-candidate state).
**/
RaftMessageType
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
RAFT_MSG_INTERNAL_PROP
)
{
return
0
;
}
if
(
msgType
==
RAFT_MSG_VOTE_RESP
)
{
return
0
;
}
return
0
;
return
0
;
}
}
...
...
source/libs/sync/src/raft_election.c
0 → 100644
浏览文件 @
c25d174f
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_message.h"
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
SyncRaftElectionType
cType
)
{
SyncTerm
term
;
bool
preVote
;
RaftMessageType
voteMsgType
;
if
(
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
)
{
syncRaftBecomePreCandidate
(
pRaft
);
preVote
=
true
;
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term
=
pRaft
->
term
+
1
;
}
else
{
syncRaftBecomeCandidate
(
pRaft
);
voteMsgType
=
RAFT_MSG_VOTE
;
term
=
pRaft
->
term
;
preVote
=
false
;
}
int
quorum
=
syncRaftQuorum
(
pRaft
);
int
granted
=
syncRaftNumOfGranted
(
pRaft
,
pRaft
->
selfId
,
preVote
,
true
,
NULL
);
if
(
quorum
<=
granted
)
{
/**
* We won the election after voting for ourselves (which must mean that
* this is a single-node cluster). Advance to the next state.
**/
if
(
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
)
{
syncRaftStartElection
(
pRaft
,
SYNC_RAFT_CAMPAIGN_ELECTION
);
}
else
{
syncRaftBecomeLeader
(
pRaft
);
}
return
;
}
// broadcast vote message to other peers
int
i
;
SyncIndex
lastIndex
=
syncRaftLogLastIndex
(
pRaft
->
log
);
SyncTerm
lastTerm
=
syncRaftLogLastTerm
(
pRaft
->
log
);
for
(
i
=
0
;
i
<
pRaft
->
cluster
.
replica
;
++
i
)
{
if
(
i
==
pRaft
->
cluster
.
selfIndex
)
{
continue
;
}
SyncNodeId
nodeId
=
pRaft
->
cluster
.
nodeInfo
[
i
].
nodeId
;
SSyncMessage
*
pMsg
=
syncNewVoteMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
nodeId
,
term
,
cType
,
lastIndex
,
lastTerm
);
if
(
pMsg
==
NULL
)
{
continue
;
}
syncInfo
(
"[%d:%d] [logterm: %"
PRId64
", index: %d] sent %d request to %d at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
lastTerm
,
lastIndex
,
voteMsgType
,
nodeId
,
pRaft
->
term
);
pRaft
->
io
.
send
(
pMsg
,
&
(
pRaft
->
cluster
.
nodeInfo
[
i
]));
}
}
\ No newline at end of file
source/libs/sync/src/raft_handle_election_message.c
浏览文件 @
c25d174f
...
@@ -15,84 +15,29 @@
...
@@ -15,84 +15,29 @@
#include "syncInt.h"
#include "syncInt.h"
#include "raft.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_message.h"
#include "raft_message.h"
static
void
campaign
(
SSyncRaft
*
pRaft
,
SyncRaftCampaignType
cType
);
int
syncRaftHandleElectionMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
void
syncRaftHandleElectionMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
if
(
pRaft
->
state
==
TAOS_SYNC_ROLE_LEADER
)
{
if
(
pRaft
->
state
==
TAOS_SYNC_ROLE_LEADER
)
{
syncDebug
(
"%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader"
,
pRaft
->
selfId
);
syncDebug
(
"%d ignoring RAFT_MSG_INTERNAL_ELECTION because already leader"
,
pRaft
->
selfId
);
return
;
return
0
;
}
}
// if there is pending uncommitted config,cannot
campaig
n
// if there is pending uncommitted config,cannot
start electio
n
if
(
syncRaftLogNumOfPendingConf
(
pRaft
->
log
)
>
0
&&
syncRaftHasUnappliedLog
(
pRaft
->
log
))
{
if
(
syncRaftLogNumOfPendingConf
(
pRaft
->
log
)
>
0
&&
syncRaftHasUnappliedLog
(
pRaft
->
log
))
{
syncWarn
(
"[%d:%d] cannot
campaig
n at term %"
PRId64
" since there are still pending configuration changes to apply"
,
syncWarn
(
"[%d:%d] cannot
syncRaftStartElectio
n at term %"
PRId64
" since there are still pending configuration changes to apply"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
return
;
return
0
;
}
}
syncInfo
(
"[%d:%d] is starting a new election at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
syncInfo
(
"[%d:%d] is starting a new election at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
if
(
pRaft
->
preVote
)
{
if
(
pRaft
->
preVote
)
{
campaig
n
(
pRaft
,
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
);
syncRaftStartElectio
n
(
pRaft
,
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
);
}
else
{
}
else
{
campaign
(
pRaft
,
SYNC_RAFT_CAMPAIGN_ELECTION
);
syncRaftStartElection
(
pRaft
,
SYNC_RAFT_CAMPAIGN_ELECTION
);
}
}
static
void
campaign
(
SSyncRaft
*
pRaft
,
SyncRaftCampaignType
cType
)
{
SyncTerm
term
;
bool
preVote
;
RaftMessageType
voteMsgType
;
if
(
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
)
{
syncRaftBecomePreCandidate
(
pRaft
);
preVote
=
true
;
// PreVote RPCs are sent for the next term before we've incremented r.Term.
term
=
pRaft
->
term
+
1
;
}
else
{
syncRaftBecomeCandidate
(
pRaft
);
voteMsgType
=
RAFT_MSG_VOTE
;
term
=
pRaft
->
term
;
preVote
=
false
;
}
int
quorum
=
syncRaftQuorum
(
pRaft
);
int
granted
=
syncRaftNumOfGranted
(
pRaft
,
pRaft
->
selfId
,
preVote
,
true
);
if
(
quorum
<=
granted
)
{
/**
* We won the election after voting for ourselves (which must mean that
* this is a single-node cluster). Advance to the next state.
**/
if
(
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
)
{
campaign
(
pRaft
,
SYNC_RAFT_CAMPAIGN_ELECTION
);
}
else
{
syncRaftBecomeLeader
(
pRaft
);
}
return
;
}
}
// broadcast vote message to other peers
return
0
;
int
i
;
}
SyncIndex
lastIndex
=
syncRaftLogLastIndex
(
pRaft
->
log
);
SyncTerm
lastTerm
=
syncRaftLogLastTerm
(
pRaft
->
log
);
for
(
i
=
0
;
i
<
pRaft
->
cluster
.
replica
;
++
i
)
{
if
(
i
==
pRaft
->
cluster
.
selfIndex
)
{
continue
;
}
SyncNodeId
nodeId
=
pRaft
->
cluster
.
nodeInfo
[
i
].
nodeId
;
SSyncMessage
*
pMsg
=
syncNewVoteMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
nodeId
,
term
,
cType
,
lastIndex
,
lastTerm
);
if
(
pMsg
==
NULL
)
{
continue
;
}
syncInfo
(
"[%d:%d] [logterm: %"
PRId64
", index: %d] sent %d request to %d at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
lastTerm
,
lastIndex
,
voteMsgType
,
nodeId
,
pRaft
->
term
);
pRaft
->
io
.
send
(
pMsg
,
&
(
pRaft
->
cluster
.
nodeInfo
[
i
]));
}
}
\ No newline at end of file
source/libs/sync/src/raft_handle_vote_resp_message.c
0 → 100644
浏览文件 @
c25d174f
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "raft.h"
#include "raft_message.h"
int
syncRaftHandleVoteRespMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
int
granted
,
rejected
;
int
quorum
;
int
voterIndex
;
voterIndex
=
syncRaftConfigurationIndexOfVoter
(
pRaft
,
pMsg
->
from
);
if
(
voterIndex
==
-
1
)
{
syncError
(
"[%d:%d] recv vote resp from unknown server %d"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pMsg
->
from
);
return
0
;
}
if
(
pRaft
->
state
!=
TAOS_SYNC_ROLE_CANDIDATE
)
{
syncError
(
"[%d:%d] is not candidate, ignore vote resp"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
);
return
0
;
}
granted
=
syncRaftNumOfGranted
(
pRaft
,
pMsg
->
from
,
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
,
!
pMsg
->
voteResp
.
reject
,
&
rejected
);
quorum
=
syncRaftQuorum
(
pRaft
);
syncInfo
(
"[%d:%d] [quorum:%d] has received %d votes and %d vote rejections"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
quorum
,
granted
,
rejected
);
if
(
granted
>=
quorum
)
{
if
(
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
)
{
syncRaftStartElection
(
pRaft
,
SYNC_RAFT_CAMPAIGN_ELECTION
);
}
else
{
syncRaftBecomeLeader
(
pRaft
);
syncRaftTriggerReplicate
(
pRaft
);
}
return
0
;
}
else
if
(
rejected
==
quorum
)
{
syncRaftBecomeFollower
(
pRaft
,
pRaft
->
term
,
SYNC_NON_NODE_ID
);
}
return
0
;
}
\ No newline at end of file
source/libs/sync/src/raft_progress.c
浏览文件 @
c25d174f
...
@@ -14,7 +14,7 @@
...
@@ -14,7 +14,7 @@
*/
*/
#include "raft.h"
#include "raft.h"
#include "raft_
unstable_
log.h"
#include "raft_log.h"
#include "raft_progress.h"
#include "raft_progress.h"
#include "sync.h"
#include "sync.h"
#include "syncInt.h"
#include "syncInt.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录