Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ccf8f14f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ccf8f14f
编写于
11月 08, 2021
作者:
L
lichuang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10645][raft]<feature>add raft progress tracker
上级
85ae64a1
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
348 addition
and
83 deletion
+348
-83
source/libs/sync/inc/raft.h
source/libs/sync/inc/raft.h
+45
-30
source/libs/sync/inc/raft_log.h
source/libs/sync/inc/raft_log.h
+12
-0
source/libs/sync/inc/raft_progress.h
source/libs/sync/inc/raft_progress.h
+15
-11
source/libs/sync/inc/sync_raft_progress_tracker.h
source/libs/sync/inc/sync_raft_progress_tracker.h
+100
-0
source/libs/sync/inc/sync_raft_quorum_joint.h
source/libs/sync/inc/sync_raft_quorum_joint.h
+30
-0
source/libs/sync/inc/sync_type.h
source/libs/sync/inc/sync_type.h
+10
-0
source/libs/sync/src/raft.c
source/libs/sync/src/raft.c
+67
-11
source/libs/sync/src/raft_handle_append_entries_message.c
source/libs/sync/src/raft_handle_append_entries_message.c
+2
-2
source/libs/sync/src/raft_handle_vote_message.c
source/libs/sync/src/raft_handle_vote_message.c
+1
-1
source/libs/sync/src/raft_log.c
source/libs/sync/src/raft_log.c
+4
-0
source/libs/sync/src/raft_progress.c
source/libs/sync/src/raft_progress.c
+16
-28
source/libs/sync/src/raft_replication.c
source/libs/sync/src/raft_replication.c
+5
-0
source/libs/sync/src/sync_raft_progress_tracker.c
source/libs/sync/src/sync_raft_progress_tracker.c
+41
-0
未找到文件。
source/libs/sync/inc/raft.h
浏览文件 @
ccf8f14f
...
...
@@ -20,17 +20,12 @@
#include "sync_type.h"
#include "raft_message.h"
typedef
struct
SSyncRaftProgress
SSyncRaftProgress
;
typedef
struct
RaftLeaderState
{
int
nProgress
;
SSyncRaftProgress
*
progress
;
}
RaftLeaderState
;
typedef
struct
RaftCandidateState
{
/* votes results */
SyncRaftVoteRespType
votes
[
TSDB_MAX_REPLICA
];
/* true if in pre-vote phase */
bool
inPreVote
;
}
RaftCandidateState
;
...
...
@@ -47,17 +42,34 @@ struct SSyncRaft {
// owner sync node
SSyncNode
*
pNode
;
int
maxMsgSize
;
SSyncCluster
cluster
;
SyncNodeId
selfId
;
SyncGroupId
selfGroupId
;
SSyncRaftIOMethods
io
;
SSyncFSM
fsm
;
SSyncLogStore
logStore
;
SStateManager
stateManager
;
union
{
RaftLeaderState
leaderState
;
RaftCandidateState
candidateState
;
};
SyncTerm
term
;
SyncNodeId
voteFor
;
SyncNodeId
selfId
;
SyncGroupId
selfGroupId
;
SSyncRaftLog
*
log
;
int
maxMsgSize
;
SSyncRaftProgressTracker
*
tracker
;
ESyncRole
state
;
// isLearner is true if the local raft node is a learner.
bool
isLearner
;
/**
* the leader id
...
...
@@ -70,15 +82,23 @@ struct SSyncRaft {
**/
SyncNodeId
leadTransferee
;
/**
* New configuration is ignored if there exists unapplied configuration.
/**
* Only one conf change may be pending (in the log, but not yet
* applied) at a time. This is enforced via pendingConfIndex, which
* is set to a value >= the log index of the latest pending
* configuration change (if any). Config changes are only allowed to
* be proposed if the leader's applied index is greater than this
* value.
**/
bool
hasPendingConf
;
SSyncCluster
cluster
;
ESyncRole
state
;
SyncIndex
pendingConfigIndex
;
/**
* an estimate of the size of the uncommitted tail of the Raft log. Used to
* prevent unbounded log growth. Only maintained by the leader. Reset on
* term changes.
**/
uint32_t
uncommittedSize
;
/**
* number of ticks since it reached last electionTimeout when it is leader
* or candidate.
...
...
@@ -96,24 +116,19 @@ struct SSyncRaft {
// current tick count since start up
uint32_t
currentTick
;
// election timeout tick(random in [3:6] tick)
uint16_t
electionTimeoutTick
;
// heartbeat timeout tick(default: 1 tick)
uint16_t
heartbeatTimeoutTick
;
bool
preVote
;
bool
checkQuorum
;
SSyncRaftIOMethods
io
;
int
heartbeatTimeout
;
int
electionTimeout
;
// union different state data
union
{
RaftLeaderState
leaderState
;
RaftCandidateState
candidateState
;
};
SSyncRaftLog
*
lo
g
;
/**
* randomizedElectionTimeout is a random number between
* [electiontimeout, 2 * electiontimeout - 1]. It gets reset
* when raft changes its state to follower or candidate.
**/
int
randomizedElectionTimeout
;
bool
disableProposalForwardin
g
;
SyncRaftStepFp
stepFp
;
...
...
source/libs/sync/inc/raft_log.h
浏览文件 @
ccf8f14f
...
...
@@ -19,8 +19,18 @@
#include "sync.h"
#include "sync_type.h"
/**
* SyncEntryType tags the kind of a raft log entry (see the `type` field of
* SSyncRaftEntry). SYNC_ENTRY_TYPE_LOG is the only kind defined here; it
* starts at 1, so a zero-initialized entry carries none of the named kinds.
**/
typedef
enum
SyncEntryType
{
SYNC_ENTRY_TYPE_LOG
=
1
,
}
SyncEntryType
;
/**
* SSyncRaftEntry is a single raft log entry.
**/
struct
SSyncRaftEntry
{
/* term under which the entry was appended */
SyncTerm
term
;
/* position of the entry in the raft log */
SyncIndex
index
;
/* kind of entry, one of SyncEntryType */
SyncEntryType
type
;
/* entry payload; NOTE(review): ownership of buffer.data is not visible here */
SSyncBuffer
buffer
;
};
struct
SSyncRaftLog
{
...
...
@@ -49,6 +59,8 @@ bool syncRaftHasUnappliedLog(SSyncRaftLog* pLog);
SyncTerm
syncRaftLogTermOf
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
);
int
syncRaftLogAppend
(
SSyncRaftLog
*
pLog
,
SSyncRaftEntry
*
pEntries
,
int
n
);
int
syncRaftLogAcquire
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
int
maxMsgSize
,
SSyncRaftEntry
**
ppEntries
,
int
*
n
);
...
...
source/libs/sync/inc/raft_progress.h
浏览文件 @
ccf8f14f
...
...
@@ -73,6 +73,8 @@ typedef enum RaftProgressState {
* progresses of all followers, and sends entries to the follower based on its progress.
**/
struct
SSyncRaftProgress
{
SyncNodeId
id
;
SyncIndex
nextIndex
;
SyncIndex
matchIndex
;
...
...
@@ -108,16 +110,18 @@ struct SSyncRaftProgress {
* flow control sliding window
**/
SSyncRaftInflights
inflights
;
// IsLearner is true if this progress is tracked for a learner.
bool
isLearner
;
};
int
syncRaftProgressCreate
(
SSyncRaft
*
pRaft
);
//int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration);
void
syncRaftInitProgress
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
);
/**
* syncRaftProgressMaybeUpdate returns false if the given lastIndex index comes from i-th node's log.
* Otherwise it updates the progress and returns true.
**/
bool
syncRaftProgressMaybeUpdate
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
lastIndex
);
bool
syncRaftProgressMaybeUpdate
(
SSyncRaft
Progress
*
progress
,
SyncIndex
lastIndex
);
static
FORCE_INLINE
void
syncRaftProgressOptimisticNextIndex
(
SSyncRaftProgress
*
progress
,
SyncIndex
nextIndex
)
{
progress
->
nextIndex
=
nextIndex
+
1
;
...
...
@@ -127,7 +131,7 @@ static FORCE_INLINE void syncRaftProgressOptimisticNextIndex(SSyncRaftProgress*
* syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message.
* Otherwise it decreases the progress next index to min(rejected, last) and returns true.
**/
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
*
pRaft
,
int
i
,
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
Progress
*
progress
,
SyncIndex
rejected
,
SyncIndex
lastIndex
);
/**
...
...
@@ -166,20 +170,20 @@ static FORCE_INLINE bool syncRaftProgressUpdateSendTick(SSyncRaftProgress* progr
return
progress
->
lastSendTick
=
current
;
}
void
syncRaftProgressFailure
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressFailure
(
SSyncRaft
Progress
*
progress
);
bool
syncRaftProgressNeedAbortSnapshot
(
SSyncRaft
*
pRaft
,
int
i
);
bool
syncRaftProgressNeedAbortSnapshot
(
SSyncRaft
Progress
*
progress
);
/**
* return true if
i-th node
's log is up-todate
* return true if
progress
's log is up-todate
**/
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
int
i
);
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
);
void
syncRaftProgressBecomeProbe
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressBecomeProbe
(
SSyncRaft
Progress
*
progress
);
void
syncRaftProgressBecomeReplicate
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressBecomeReplicate
(
SSyncRaft
Progress
*
progress
);
void
syncRaftProgressBecomeSnapshot
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
snapshotIndex
);
void
syncRaftProgressBecomeSnapshot
(
SSyncRaft
Progress
*
progress
,
SyncIndex
snapshotIndex
);
/* inflights APIs */
int
syncRaftInflightReset
(
SSyncRaftInflights
*
inflights
);
...
...
source/libs/sync/inc/sync_raft_progress_tracker.h
0 → 100644
浏览文件 @
ccf8f14f
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
#define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
#include "sync_type.h"
#include "sync_raft_quorum_joint.h"
#include "raft_progress.h"
/**
* SSyncRaftProgressTrackerConfig describes the membership tracked by a
* SSyncRaftProgressTracker: the (possibly joint) voter configuration plus
* the current and upcoming learner sets.
**/
struct
SSyncRaftProgressTrackerConfig
{
/* voter sets; a joint configuration holds two majority configs */
SSyncRaftQuorumJointConfig
voters
;
/**
* AutoLeave is true if the configuration is joint and a transition to the
* incoming configuration should be carried out automatically by Raft when
* this is possible. If false, the configuration will be joint until the
* application initiates the transition manually.
**/
bool
autoLeave
;
/**
* Learners is a set of IDs corresponding to the learners active in the
* current configuration.
*
* Invariant: Learners and Voters do not intersect, i.e. if a peer is in
* either half of the joint config, it can't be a learner; if it is a
* learner it can't be in either half of the joint config. This invariant
* simplifies the implementation since it allows a peer to have clarity
* about its current role without taking into account joint consensus.
**/
SyncNodeId
learners
[
TSDB_MAX_REPLICA
];
/**
* When we turn a voter into a learner during a joint consensus transition,
* we cannot add the learner directly when entering the joint state. This is
* because this would violate the invariant that the intersection of
* voters and learners is empty. For example, assume a Voter is removed and
* immediately re-added as a learner (or in other words, it is demoted):
*
* Initially, the configuration will be
*
* voters: {1 2 3}
* learners: {}
*
* and we want to demote 3. Entering the joint configuration, we naively get
*
* voters: {1 2} & {1 2 3}
* learners: {3}
*
* but this violates the invariant (3 is both voter and learner). Instead,
* we get
*
* voters: {1 2} & {1 2 3}
* learners: {}
* next_learners: {3}
*
* Where 3 is now still purely a voter, but we are remembering the intention
* to make it a learner upon transitioning into the final configuration:
*
* voters: {1 2}
* learners: {3}
* next_learners: {}
*
* Note that next_learners is not used while adding a learner that is not
* also a voter in the joint config. In this case, the learner is added
* right away when entering the joint configuration, so that it is caught up
* as soon as possible.
*
* (next_learners in the examples above is the learnersNext field.)
**/
SyncNodeId
learnersNext
[
TSDB_MAX_REPLICA
];
};
/**
* SSyncRaftProgressTracker bundles the membership configuration with the
* per-node replication progress and the votes collected in an election.
**/
struct
SSyncRaftProgressTracker
{
/* voter/learner configuration */
SSyncRaftProgressTrackerConfig
config
;
/**
* fixed table of progress slots; syncRaftProgressVisit skips every slot
* whose id is SYNC_NON_NODE_ID
**/
SSyncRaftProgress
progressMap
[
TSDB_MAX_REPLICA
];
/* vote responses; syncRaftResetVotes resets them to SYNC_RAFT_VOTE_RESP_UNKNOWN */
SyncRaftVoteRespType
votes
[
TSDB_MAX_REPLICA
];
/* NOTE(review): presumably the cap on in-flight append messages per peer - confirm */
int
maxInflight
;
};
/**
* syncRaftOpenProgressTracker allocates a progress tracker on the heap.
* Returns NULL if the allocation fails.
* NOTE(review): no matching close/free function is declared in this header.
**/
SSyncRaftProgressTracker
*
syncRaftOpenProgressTracker
();
/**
* syncRaftResetVotes resets every slot of the tracker's votes array to
* SYNC_RAFT_VOTE_RESP_UNKNOWN.
**/
void
syncRaftResetVotes
(
SSyncRaftProgressTracker
*
);
/**
* visitProgressFp is the callback invoked by syncRaftProgressVisit for each
* visited progress; arg is the opaque pointer handed to syncRaftProgressVisit.
**/
typedef
void
(
*
visitProgressFp
)(
SSyncRaftProgress
*
progress
,
void
*
arg
);
/**
* syncRaftProgressVisit calls visit(progress, arg) for every slot of the
* tracker's progressMap whose id is not SYNC_NON_NODE_ID, in slot order.
**/
void
syncRaftProgressVisit
(
SSyncRaftProgressTracker
*
,
visitProgressFp
visit
,
void
*
arg
);
#endif
/* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
source/libs/sync/inc/sync_raft_quorum_joint.h
0 → 100644
浏览文件 @
ccf8f14f
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H
#define _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H
#include "taosdef.h"
#include "sync.h"
/**
* SSyncRaftQuorumJointConfig is a configuration of two groups of (possibly
* overlapping) majority configurations. Decisions require the support of
* both majorities.
*
* majorityConfig holds the two groups; each group has room for
* TSDB_MAX_REPLICA node ids.
* NOTE(review): how an unused slot is marked is not visible in this header.
**/
typedef
struct
SSyncRaftQuorumJointConfig
{
SyncNodeId
majorityConfig
[
2
][
TSDB_MAX_REPLICA
];
}
SSyncRaftQuorumJointConfig
;
#endif
/* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
source/libs/sync/inc/sync_type.h
浏览文件 @
ccf8f14f
...
...
@@ -16,6 +16,9 @@
#ifndef _TD_LIBS_SYNC_TYPE_H
#define _TD_LIBS_SYNC_TYPE_H
#include <stdint.h>
#include "osMath.h"
#define SYNC_NON_NODE_ID -1
#define SYNC_NON_TERM 0
...
...
@@ -24,10 +27,16 @@ typedef uint32_t SyncTick;
typedef
struct
SSyncRaft
SSyncRaft
;
typedef
struct
SSyncRaftProgress
SSyncRaftProgress
;
typedef
struct
SSyncRaftProgressTrackerConfig
SSyncRaftProgressTrackerConfig
;
typedef
struct
SSyncRaftProgressTracker
SSyncRaftProgressTracker
;
typedef
struct
SSyncRaftLog
SSyncRaftLog
;
typedef
struct
SSyncRaftEntry
SSyncRaftEntry
;
#if 0
#ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
#endif
...
...
@@ -35,6 +44,7 @@ typedef struct SSyncRaftEntry SSyncRaftEntry;
#ifndef MAX
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
#endif
#endif
typedef
enum
{
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
=
0
,
...
...
source/libs/sync/src/raft.c
浏览文件 @
ccf8f14f
...
...
@@ -17,6 +17,7 @@
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_replication.h"
#include "sync_raft_progress_tracker.h"
#include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100
...
...
@@ -35,6 +36,9 @@ static int triggerAll(SSyncRaft* pRaft);
static
void
tickElection
(
SSyncRaft
*
pRaft
);
static
void
tickHeartbeat
(
SSyncRaft
*
pRaft
);
static
void
appendEntries
(
SSyncRaft
*
pRaft
,
SSyncRaftEntry
*
entries
,
int
n
);
static
bool
maybeCommit
(
SSyncRaft
*
pRaft
);
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
);
static
void
resetRaft
(
SSyncRaft
*
pRaft
,
SyncTerm
term
);
...
...
@@ -59,6 +63,12 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
logStore
=
&
(
pRaft
->
logStore
);
fsm
=
&
(
pRaft
->
fsm
);
// init progress tracker
pRaft
->
tracker
=
syncRaftOpenProgressTracker
();
if
(
pRaft
->
tracker
==
NULL
)
{
return
-
1
;
}
// open raft log
if
((
pRaft
->
log
=
syncRaftLogOpen
())
==
NULL
)
{
return
-
1
;
...
...
@@ -88,7 +98,7 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
}
assert
(
initIndex
==
serverState
.
commitIndex
);
pRaft
->
heartbeatTimeoutTick
=
1
;
//
pRaft->heartbeatTimeoutTick = 1;
syncRaftBecomeFollower
(
pRaft
,
pRaft
->
term
,
SYNC_NON_NODE_ID
);
...
...
@@ -137,7 +147,7 @@ void syncRaftBecomeFollower(SSyncRaft* pRaft, SyncTerm term, SyncNodeId leaderId
void
syncRaftBecomePreCandidate
(
SSyncRaft
*
pRaft
)
{
convertClear
(
pRaft
);
memset
(
pRaft
->
candidateState
.
votes
,
SYNC_RAFT_VOTE_RESP_UNKNOWN
,
sizeof
(
SyncRaftVoteRespType
)
*
TSDB_MAX_REPLICA
);
/**
* Becoming a pre-candidate changes our step functions and state,
* but doesn't change anything else. In particular it does not increase
...
...
@@ -152,7 +162,6 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft) {
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
)
{
convertClear
(
pRaft
);
memset
(
pRaft
->
candidateState
.
votes
,
SYNC_RAFT_VOTE_RESP_UNKNOWN
,
sizeof
(
SyncRaftVoteRespType
)
*
TSDB_MAX_REPLICA
);
pRaft
->
candidateState
.
inPreVote
=
false
;
pRaft
->
stepFp
=
stepCandidate
;
...
...
@@ -176,14 +185,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
if
(
nPendingConf
>
1
)
{
syncFatal
(
"unexpected multiple uncommitted config entry"
);
}
if
(
nPendingConf
==
1
)
{
pRaft
->
hasPendingConf
=
true
;
}
syncInfo
(
"[%d:%d] became leader at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
// after become leader, send initial heartbeat
syncRaftTriggerHeartbeat
(
pRaft
);
// after become leader, send a no-op log
SSyncRaftEntry
*
entry
=
(
SSyncRaftEntry
*
)
malloc
(
sizeof
(
SSyncRaftEntry
));
if
(
entry
==
NULL
)
{
return
;
}
*
entry
=
(
SSyncRaftEntry
)
{
.
buffer
=
(
SSyncBuffer
)
{
.
data
=
NULL
,
.
len
=
0
,
}
};
appendEntries
(
pRaft
,
entry
,
1
);
//syncRaftTriggerHeartbeat(pRaft);
}
void
syncRaftTriggerHeartbeat
(
SSyncRaft
*
pRaft
)
{
...
...
@@ -192,7 +209,7 @@ void syncRaftTriggerHeartbeat(SSyncRaft* pRaft) {
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
)
{
// electionTimeoutTick in [3,6] tick
pRaft
->
electionTimeoutTick
=
taosRand
()
%
4
+
3
;
pRaft
->
randomizedElectionTimeout
=
taosRand
()
%
4
+
3
;
}
bool
syncRaftIsPromotable
(
SSyncRaft
*
pRaft
)
{
...
...
@@ -200,7 +217,7 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft) {
}
bool
syncRaftIsPastElectionTimeout
(
SSyncRaft
*
pRaft
)
{
return
pRaft
->
electionElapsed
>=
pRaft
->
electionTimeoutTick
;
return
pRaft
->
electionElapsed
>=
pRaft
->
randomizedElectionTimeout
;
}
int
syncRaftQuorum
(
SSyncRaft
*
pRaft
)
{
...
...
@@ -208,6 +225,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) {
}
int
syncRaftNumOfGranted
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
accept
,
int
*
rejectNum
)
{
/*
if (accept) {
syncInfo("[%d:%d] received (pre-vote %d) from %d at term %" PRId64 "",
pRaft->selfGroupId, pRaft->selfId, preVote, id, pRaft->term);
...
...
@@ -230,6 +248,8 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc
if (rejectNum) *rejectNum = rejected;
return granted;
*/
return
0
;
}
/**
...
...
@@ -375,6 +395,34 @@ static void tickHeartbeat(SSyncRaft* pRaft) {
}
/**
 * appendEntries stamps the given entries with the current term and with
 * consecutive indexes following the log's last index, appends them to the
 * raft log, and then advances the local node's own progress.
 *
 * @param pRaft   raft instance owning the log and the progress tracker
 * @param entries array of n entries; term and index are overwritten here
 * @param n       number of entries in the array
 **/
static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
  SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);
  SyncTerm  term = pRaft->term;
  int       i;

  for (i = 0; i < n; ++i) {
    entries[i].term = term;
    entries[i].index = lastIndex + 1 + i;
  }

  syncRaftLogAppend(pRaft->log, entries, n);

  /*
   * Advance our own progress to the log's last index *after* the append.
   * Using the index captured before the append would leave the local
   * match index pointing at the entry preceding the ones just written.
   */
  SSyncRaftProgress* progress = &(pRaft->tracker->progressMap[pRaft->cluster.selfIndex]);
  syncRaftProgressMaybeUpdate(progress, syncRaftLogLastIndex(pRaft->log));

  // Regardless of maybeCommit's return, our caller will broadcast the append.
  maybeCommit(pRaft);
}
/**
* maybeCommit attempts to advance the commit index. Returns true if
* the commit index changed (in which case the caller should broadcast
* append messages to its peers).
*
* NOTE(review): the body is currently a placeholder. It never reads pRaft
* and unconditionally returns true; no commit index is computed here yet.
**/
static
bool
maybeCommit
(
SSyncRaft
*
pRaft
)
{
/* placeholder result: always reports "changed" */
return
true
;
}
/**
* trigger I/O requests for newly appended log entries or heartbeats.
**/
...
...
@@ -395,6 +443,10 @@ static void abortLeaderTransfer(SSyncRaft* pRaft) {
pRaft
->
leadTransferee
=
SYNC_NON_NODE_ID
;
}
/**
 * initProgress is the visitProgressFp adapter handed to
 * syncRaftProgressVisit: arg carries the SSyncRaft instance, which is
 * forwarded together with the visited progress to syncRaftInitProgress.
 **/
static void initProgress(SSyncRaftProgress* progress, void* arg) {
  SSyncRaft* pRaft = (SSyncRaft*)arg;

  syncRaftInitProgress(pRaft, progress);
}
static
void
resetRaft
(
SSyncRaft
*
pRaft
,
SyncTerm
term
)
{
if
(
pRaft
->
term
!=
term
)
{
pRaft
->
term
=
term
;
...
...
@@ -410,5 +462,9 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
abortLeaderTransfer
(
pRaft
);
pRaft
->
hasPendingConf
=
false
;
syncRaftResetVotes
(
pRaft
->
tracker
);
syncRaftProgressVisit
(
pRaft
->
tracker
,
initProgress
,
pRaft
);
pRaft
->
pendingConfigIndex
=
0
;
pRaft
->
uncommittedSize
=
0
;
}
source/libs/sync/src/raft_handle_append_entries_message.c
浏览文件 @
ccf8f14f
...
...
@@ -20,7 +20,7 @@
#include "raft_message.h"
int
syncRaftHandleAppendEntriesMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
RaftMsg_Append_Entries
*
appendEntries
=
&
(
pMsg
->
appendEntries
);
const
RaftMsg_Append_Entries
*
appendEntries
=
&
(
pMsg
->
appendEntries
);
int
peerIndex
=
syncRaftConfigurationIndexOfNode
(
pRaft
,
pMsg
->
from
);
...
...
@@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs
return
0
;
}
RaftMsg_Append_Entries
*
appendResp
=
&
(
pMsg
->
appendResp
);
RaftMsg_Append_Entries
*
appendResp
=
&
(
p
Resp
Msg
->
appendResp
);
// ignore committed logs
if
(
syncRaftLogIsCommitted
(
pRaft
->
log
,
appendEntries
->
index
))
{
appendResp
->
index
=
pRaft
->
log
->
commitIndex
;
...
...
source/libs/sync/src/raft_handle_vote_message.c
浏览文件 @
ccf8f14f
...
...
@@ -36,7 +36,7 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if
(
pRespMsg
==
NULL
)
{
return
0
;
}
syncInfo
(
"[%d:%d] [logterm: %"
PRId64
", index: %"
PRId64
", vote: %d] %s for %d"
\
syncInfo
(
"[%d:%d] [logterm: %"
PRId64
", index: %"
PRId64
", vote: %d] %s for %d"
\
"[logterm: %"
PRId64
", index: %"
PRId64
"] at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
lastTerm
,
lastIndex
,
pRaft
->
voteFor
,
grant
?
"grant"
:
"reject"
,
...
...
source/libs/sync/src/raft_log.c
浏览文件 @
ccf8f14f
...
...
@@ -47,6 +47,10 @@ SyncTerm syncRaftLogTermOf(SSyncRaftLog* pLog, SyncIndex index) {
return
SYNC_NON_TERM
;
}
/**
 * syncRaftLogAppend appends n entries to the raft log.
 *
 * Not implemented yet: nothing is stored. The function returns 0 so that
 * callers reading the result get a defined value instead of the undefined
 * behavior of falling off the end of a non-void function.
 *
 * @return 0
 **/
int syncRaftLogAppend(SSyncRaftLog* pLog, SSyncRaftEntry* pEntries, int n) {
  (void)pLog;
  (void)pEntries;
  (void)n;

  return 0;
}
int
syncRaftLogAcquire
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
int
maxMsgSize
,
SSyncRaftEntry
**
ppEntries
,
int
*
n
)
{
return
0
;
...
...
source/libs/sync/src/raft_progress.c
浏览文件 @
ccf8f14f
...
...
@@ -40,9 +40,15 @@ int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configur
}
*/
bool
syncRaftProgressMaybeUpdate
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
lastIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
 * syncRaftInitProgress resets a progress to its initial replication state:
 * nextIndex is one past the log's last index, and matchIndex is the log's
 * last index for the local node and 0 for every other node.
 *
 * The node id and the learner flag identify the slot rather than describe
 * replication state, so they are carried over explicitly. Assigning a
 * compound literal that omits them would reset both to zero. All remaining
 * fields are zero-initialized.
 *
 * @param pRaft    raft instance providing selfId and the log
 * @param progress progress slot to reset in place
 **/
void syncRaftInitProgress(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
  SyncIndex lastIndex = syncRaftLogLastIndex(pRaft->log);

  *progress = (SSyncRaftProgress) {
    .id         = progress->id,
    .matchIndex = progress->id == pRaft->selfId ? lastIndex : 0,
    .nextIndex  = lastIndex + 1,
    .isLearner  = progress->isLearner,
    //.inflights =
  };
}
bool
syncRaftProgressMaybeUpdate
(
SSyncRaftProgress
*
progress
,
SyncIndex
lastIndex
)
{
bool
updated
=
false
;
if
(
progress
->
matchIndex
<
lastIndex
)
{
...
...
@@ -57,11 +63,8 @@ bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex) {
return
updated
;
}
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
*
pRaft
,
int
i
,
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
Progress
*
progress
,
SyncIndex
rejected
,
SyncIndex
lastIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
if
(
progress
->
state
==
PROGRESS_REPLICATE
)
{
/**
* the rejection must be stale if the progress has matched and "rejected"
...
...
@@ -110,30 +113,19 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
}
}
void
syncRaftProgressFailure
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
 * syncRaftProgressFailure clears the pending snapshot index of the given
 * progress by setting it to 0.
 **/
void syncRaftProgressFailure(SSyncRaftProgress* progress) {
  progress->pendingSnapshotIndex = 0;
}
bool
syncRaftProgressNeedAbortSnapshot
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
 * syncRaftProgressNeedAbortSnapshot returns true when the progress is in
 * PROGRESS_SNAPSHOT state and its matchIndex has already reached (or
 * passed) the pending snapshot index.
 **/
bool syncRaftProgressNeedAbortSnapshot(SSyncRaftProgress* progress) {
  if (progress->state != PROGRESS_SNAPSHOT) {
    return false;
  }

  return progress->matchIndex >= progress->pendingSnapshotIndex;
}
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
 * syncRaftProgressIsUptodate returns true if the progress's nextIndex is
 * exactly one past the last index of the local raft log.
 **/
bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress) {
  return progress->nextIndex == syncRaftLogLastIndex(pRaft->log) + 1;
}
void
syncRaftProgressBecomeProbe
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
void
syncRaftProgressBecomeProbe
(
SSyncRaftProgress
*
progress
)
{
/**
* If the original state is ProgressStateSnapshot, progress knows that
* the pending snapshot has been sent to this peer successfully, then
...
...
@@ -149,16 +141,12 @@ void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
}
}
void
syncRaftProgressBecomeReplicate
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
 * syncRaftProgressBecomeReplicate switches the progress to
 * PROGRESS_REPLICATE via resetProgressState and then sets nextIndex to
 * the entry right after matchIndex.
 **/
void syncRaftProgressBecomeReplicate(SSyncRaftProgress* progress) {
  resetProgressState(progress, PROGRESS_REPLICATE);

  /* computed after the state reset, as in the original statement order */
  progress->nextIndex = 1 + progress->matchIndex;
}
void
syncRaftProgressBecomeSnapshot
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
snapshotIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
 * syncRaftProgressBecomeSnapshot switches the progress to
 * PROGRESS_SNAPSHOT via resetProgressState and then records snapshotIndex
 * as the pending snapshot index.
 **/
void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snapshotIndex) {
  resetProgressState(progress, PROGRESS_SNAPSHOT);

  /* stored after the state reset, as in the original statement order */
  progress->pendingSnapshotIndex = snapshotIndex;
}
...
...
source/libs/sync/src/raft_replication.c
浏览文件 @
ccf8f14f
...
...
@@ -22,6 +22,7 @@ static int sendSnapshot(SSyncRaft* pRaft, int i);
static
int
sendAppendEntries
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
index
,
SyncTerm
term
);
int
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
int
i
)
{
#if 0
assert(pRaft->state == TAOS_SYNC_ROLE_LEADER);
assert(i >= 0 && i < pRaft->leaderState.nProgress);
...
...
@@ -99,6 +100,8 @@ send_snapshot:
prevTerm = syncRaftLogLastTerm(pRaft->log);
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
}
#endif
return
0
;
}
static
int
sendSnapshot
(
SSyncRaft
*
pRaft
,
int
i
)
{
...
...
@@ -106,6 +109,7 @@ static int sendSnapshot(SSyncRaft* pRaft, int i) {
}
static
int
sendAppendEntries
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
prevIndex
,
SyncTerm
prevTerm
)
{
#if 0
SyncIndex nextIndex = prevIndex + 1;
SSyncRaftEntry *entries;
int nEntry;
...
...
@@ -139,5 +143,6 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT
err_release_log:
syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry);
#endif
return
0
;
}
\ No newline at end of file
source/libs/sync/src/sync_raft_progress_tracker.c
0 → 100644
浏览文件 @
ccf8f14f
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_progress_tracker.h"
SSyncRaftProgressTracker
*
syncRaftOpenProgressTracker
()
{
SSyncRaftProgressTracker
*
tracker
=
(
SSyncRaftProgressTracker
*
)
malloc
(
sizeof
(
SSyncRaftProgressTracker
));
if
(
tracker
==
NULL
)
{
return
NULL
;
}
return
tracker
;
}
/**
 * syncRaftResetVotes sets every recorded vote back to
 * SYNC_RAFT_VOTE_RESP_UNKNOWN.
 *
 * Each element is assigned individually: memset fills single bytes, so it
 * would only produce the intended enum value if that value were 0.
 *
 * @param tracker tracker whose votes array is reset
 **/
void syncRaftResetVotes(SSyncRaftProgressTracker* tracker) {
  int i;

  for (i = 0; i < TSDB_MAX_REPLICA; ++i) {
    tracker->votes[i] = SYNC_RAFT_VOTE_RESP_UNKNOWN;
  }
}
/**
 * syncRaftProgressVisit walks the tracker's progress slots in order and
 * invokes visit(progress, arg) on each slot whose id is not
 * SYNC_NON_NODE_ID.
 *
 * @param tracker tracker whose progressMap is traversed
 * @param visit   callback invoked for every occupied slot
 * @param arg     opaque pointer passed through to the callback
 **/
void syncRaftProgressVisit(SSyncRaftProgressTracker* tracker, visitProgressFp visit, void* arg) {
  SSyncRaftProgress* slot = tracker->progressMap;
  SSyncRaftProgress* end = slot + TSDB_MAX_REPLICA;

  for (; slot != end; ++slot) {
    if (slot->id != SYNC_NON_NODE_ID) {
      visit(slot, arg);
    }
  }
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录