Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c1c31645
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c1c31645
编写于
11月 16, 2021
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' of github.com:taosdata/TDengine into 3.0
上级
078e13b3
8440483c
变更
28
隐藏空白更改
内联
并排
Showing
28 changed file
with
1253 addition
and
265 deletion
+1253
-265
include/libs/sync/sync.h
include/libs/sync/sync.h
+9
-22
source/libs/sync/inc/raft.h
source/libs/sync/inc/raft.h
+2
-1
source/libs/sync/inc/raft_log.h
source/libs/sync/inc/raft_log.h
+5
-3
source/libs/sync/inc/raft_message.h
source/libs/sync/inc/raft_message.h
+8
-8
source/libs/sync/inc/raft_replication.h
source/libs/sync/inc/raft_replication.h
+6
-1
source/libs/sync/inc/sync_raft_config_change.h
source/libs/sync/inc/sync_raft_config_change.h
+42
-0
source/libs/sync/inc/sync_raft_impl.h
source/libs/sync/inc/sync_raft_impl.h
+14
-2
source/libs/sync/inc/sync_raft_progress.h
source/libs/sync/inc/sync_raft_progress.h
+32
-6
source/libs/sync/inc/sync_raft_progress_tracker.h
source/libs/sync/inc/sync_raft_progress_tracker.h
+63
-59
source/libs/sync/inc/sync_raft_proto.h
source/libs/sync/inc/sync_raft_proto.h
+61
-0
source/libs/sync/inc/sync_raft_quorum.h
source/libs/sync/inc/sync_raft_quorum.h
+3
-3
source/libs/sync/inc/sync_raft_quorum_joint.h
source/libs/sync/inc/sync_raft_quorum_joint.h
+30
-3
source/libs/sync/inc/sync_raft_quorum_majority.h
source/libs/sync/inc/sync_raft_quorum_majority.h
+2
-1
source/libs/sync/inc/sync_raft_restore.h
source/libs/sync/inc/sync_raft_restore.h
+32
-0
source/libs/sync/inc/sync_type.h
source/libs/sync/inc/sync_type.h
+30
-3
source/libs/sync/src/raft.c
source/libs/sync/src/raft.c
+127
-24
source/libs/sync/src/raft_election.c
source/libs/sync/src/raft_election.c
+3
-3
source/libs/sync/src/raft_handle_append_entries_message.c
source/libs/sync/src/raft_handle_append_entries_message.c
+1
-1
source/libs/sync/src/raft_handle_vote_resp_message.c
source/libs/sync/src/raft_handle_vote_resp_message.c
+1
-1
source/libs/sync/src/raft_log.c
source/libs/sync/src/raft_log.c
+4
-0
source/libs/sync/src/raft_replication.c
source/libs/sync/src/raft_replication.c
+57
-98
source/libs/sync/src/sync_raft_config_change.c
source/libs/sync/src/sync_raft_config_change.c
+388
-0
source/libs/sync/src/sync_raft_impl.c
source/libs/sync/src/sync_raft_impl.c
+41
-13
source/libs/sync/src/sync_raft_progress.c
source/libs/sync/src/sync_raft_progress.c
+46
-4
source/libs/sync/src/sync_raft_progress_tracker.c
source/libs/sync/src/sync_raft_progress_tracker.c
+16
-4
source/libs/sync/src/sync_raft_quorum_joint.c
source/libs/sync/src/sync_raft_quorum_joint.c
+47
-3
source/libs/sync/src/sync_raft_quorum_majority.c
source/libs/sync/src/sync_raft_quorum_majority.c
+2
-2
source/libs/sync/src/sync_raft_restore.c
source/libs/sync/src/sync_raft_restore.c
+181
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
c1c31645
...
@@ -109,38 +109,25 @@ typedef struct SSyncLogStore {
...
@@ -109,38 +109,25 @@ typedef struct SSyncLogStore {
SyncIndex
(
*
logLastIndex
)(
struct
SSyncLogStore
*
logStore
);
SyncIndex
(
*
logLastIndex
)(
struct
SSyncLogStore
*
logStore
);
}
SSyncLogStore
;
}
SSyncLogStore
;
typedef
struct
SSyncServerState
{
SyncNodeId
voteFor
;
SyncTerm
term
;
SyncIndex
commitIndex
;
}
SSyncServerState
;
typedef
struct
SSyncClusterConfig
{
// Log index number of current cluster config.
SyncIndex
index
;
// Log index number of previous cluster config.
SyncIndex
prevIndex
;
// current cluster
const
SSyncCluster
*
cluster
;
}
SSyncClusterConfig
;
typedef
struct
SStateManager
{
typedef
struct
SStateManager
{
void
*
pData
;
void
*
pData
;
int32_t
(
*
saveServerState
)(
struct
SStateManager
*
stateMng
,
SSyncServerState
*
state
);
// save serialized server state data, buffer will be free by Sync
int32_t
(
*
saveServerState
)(
struct
SStateManager
*
stateMng
,
const
char
*
buffer
,
int
n
);
int32_t
(
*
readServerState
)(
struct
SStateManager
*
stateMng
,
SSyncServerState
*
state
);
// read serialized server state data, buffer will be free by Sync
int32_t
(
*
readServerState
)(
struct
SStateManager
*
stateMng
,
char
**
ppBuffer
,
int
*
n
);
void
(
*
saveCluster
)(
struct
SStateManager
*
stateMng
,
const
SSyncClusterConfig
*
cluster
);
// save serialized cluster state data, buffer will be free by Sync
void
(
*
saveClusterState
)(
struct
SStateManager
*
stateMng
,
const
char
*
buffer
,
int
n
);
const
SSyncClusterConfig
*
(
*
readCluster
)(
struct
SStateManager
*
stateMng
);
// read serialized cluster state data, buffer will be free by Sync
int32_t
(
*
readClusterState
)(
struct
SStateManager
*
stateMng
,
char
**
ppBuffer
,
int
*
n
);
}
SStateManager
;
}
SStateManager
;
typedef
struct
{
typedef
struct
{
SyncGroupId
vgId
;
SyncGroupId
vgId
;
SyncIndex
snapshot
Index
;
SyncIndex
applied
Index
;
SSyncCluster
syncCfg
;
SSyncCluster
syncCfg
;
SSyncFSM
fsm
;
SSyncFSM
fsm
;
SSyncLogStore
logStore
;
SSyncLogStore
logStore
;
...
...
source/libs/sync/inc/raft.h
浏览文件 @
c1c31645
...
@@ -65,7 +65,8 @@ struct SSyncRaft {
...
@@ -65,7 +65,8 @@ struct SSyncRaft {
SSyncRaftLog
*
log
;
SSyncRaftLog
*
log
;
int
maxMsgSize
;
uint64_t
maxMsgSize
;
uint64_t
maxUncommittedSize
;
SSyncRaftProgressTracker
*
tracker
;
SSyncRaftProgressTracker
*
tracker
;
ESyncState
state
;
ESyncState
state
;
...
...
source/libs/sync/inc/raft_log.h
浏览文件 @
c1c31645
...
@@ -19,16 +19,16 @@
...
@@ -19,16 +19,16 @@
#include "sync.h"
#include "sync.h"
#include "sync_type.h"
#include "sync_type.h"
typedef
enum
Sync
EntryType
{
typedef
enum
ESyncRaft
EntryType
{
SYNC_ENTRY_TYPE_LOG
=
1
,
SYNC_ENTRY_TYPE_LOG
=
1
,
}
Sync
EntryType
;
}
ESyncRaft
EntryType
;
struct
SSyncRaftEntry
{
struct
SSyncRaftEntry
{
SyncTerm
term
;
SyncTerm
term
;
SyncIndex
index
;
SyncIndex
index
;
Sync
EntryType
type
;
ESyncRaft
EntryType
type
;
SSyncBuffer
buffer
;
SSyncBuffer
buffer
;
};
};
...
@@ -51,6 +51,8 @@ SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog);
...
@@ -51,6 +51,8 @@ SyncIndex syncRaftLogSnapshotIndex(SSyncRaftLog* pLog);
SyncTerm
syncRaftLogLastTerm
(
SSyncRaftLog
*
pLog
);
SyncTerm
syncRaftLogLastTerm
(
SSyncRaftLog
*
pLog
);
void
syncRaftLogAppliedTo
(
SSyncRaftLog
*
pLog
,
SyncIndex
appliedIndex
);
bool
syncRaftLogIsUptodate
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
SyncTerm
term
);
bool
syncRaftLogIsUptodate
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
SyncTerm
term
);
int
syncRaftLogNumOfPendingConf
(
SSyncRaftLog
*
pLog
);
int
syncRaftLogNumOfPendingConf
(
SSyncRaftLog
*
pLog
);
...
...
source/libs/sync/inc/raft_message.h
浏览文件 @
c1c31645
...
@@ -28,7 +28,7 @@
...
@@ -28,7 +28,7 @@
* outter message start with RAFT_MSG_*, which communicate between cluster peers,
* outter message start with RAFT_MSG_*, which communicate between cluster peers,
* need to implement its decode/encode functions.
* need to implement its decode/encode functions.
**/
**/
typedef
enum
RaftMessageType
{
typedef
enum
ESync
RaftMessageType
{
// client propose a cmd
// client propose a cmd
RAFT_MSG_INTERNAL_PROP
=
1
,
RAFT_MSG_INTERNAL_PROP
=
1
,
...
@@ -40,7 +40,7 @@ typedef enum RaftMessageType {
...
@@ -40,7 +40,7 @@ typedef enum RaftMessageType {
RAFT_MSG_APPEND
=
5
,
RAFT_MSG_APPEND
=
5
,
RAFT_MSG_APPEND_RESP
=
6
,
RAFT_MSG_APPEND_RESP
=
6
,
}
RaftMessageType
;
}
ESync
RaftMessageType
;
typedef
struct
RaftMsgInternal_Prop
{
typedef
struct
RaftMsgInternal_Prop
{
const
SSyncBuffer
*
pBuf
;
const
SSyncBuffer
*
pBuf
;
...
@@ -53,14 +53,14 @@ typedef struct RaftMsgInternal_Election {
...
@@ -53,14 +53,14 @@ typedef struct RaftMsgInternal_Election {
}
RaftMsgInternal_Election
;
}
RaftMsgInternal_Election
;
typedef
struct
RaftMsg_Vote
{
typedef
struct
RaftMsg_Vote
{
SyncRaftElectionType
cType
;
E
SyncRaftElectionType
cType
;
SyncIndex
lastIndex
;
SyncIndex
lastIndex
;
SyncTerm
lastTerm
;
SyncTerm
lastTerm
;
}
RaftMsg_Vote
;
}
RaftMsg_Vote
;
typedef
struct
RaftMsg_VoteResp
{
typedef
struct
RaftMsg_VoteResp
{
bool
rejected
;
bool
rejected
;
SyncRaftElectionType
cType
;
E
SyncRaftElectionType
cType
;
}
RaftMsg_VoteResp
;
}
RaftMsg_VoteResp
;
typedef
struct
RaftMsg_Append_Entries
{
typedef
struct
RaftMsg_Append_Entries
{
...
@@ -85,7 +85,7 @@ typedef struct RaftMsg_Append_Resp {
...
@@ -85,7 +85,7 @@ typedef struct RaftMsg_Append_Resp {
}
RaftMsg_Append_Resp
;
}
RaftMsg_Append_Resp
;
typedef
struct
SSyncMessage
{
typedef
struct
SSyncMessage
{
RaftMessageType
msgType
;
ESync
RaftMessageType
msgType
;
SyncTerm
term
;
SyncTerm
term
;
SyncGroupId
groupId
;
SyncGroupId
groupId
;
SyncNodeId
from
;
SyncNodeId
from
;
...
@@ -131,7 +131,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
...
@@ -131,7 +131,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
}
}
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncTerm
term
,
SyncRaftElectionType
cType
,
SyncTerm
term
,
E
SyncRaftElectionType
cType
,
SyncIndex
lastIndex
,
SyncTerm
lastTerm
)
{
SyncIndex
lastIndex
,
SyncTerm
lastTerm
)
{
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
if
(
pMsg
==
NULL
)
{
if
(
pMsg
==
NULL
)
{
...
@@ -153,7 +153,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
...
@@ -153,7 +153,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
}
}
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteRespMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteRespMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncRaftElectionType
cType
,
bool
rejected
)
{
E
SyncRaftElectionType
cType
,
bool
rejected
)
{
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
if
(
pMsg
==
NULL
)
{
if
(
pMsg
==
NULL
)
{
return
NULL
;
return
NULL
;
...
@@ -213,7 +213,7 @@ static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId,
...
@@ -213,7 +213,7 @@ static FORCE_INLINE SSyncMessage* syncNewEmptyAppendRespMsg(SyncGroupId groupId,
return
pMsg
;
return
pMsg
;
}
}
static
FORCE_INLINE
bool
syncIsInternalMsg
(
RaftMessageType
msgType
)
{
static
FORCE_INLINE
bool
syncIsInternalMsg
(
ESync
RaftMessageType
msgType
)
{
return
msgType
==
RAFT_MSG_INTERNAL_PROP
||
return
msgType
==
RAFT_MSG_INTERNAL_PROP
||
msgType
==
RAFT_MSG_INTERNAL_ELECTION
;
msgType
==
RAFT_MSG_INTERNAL_ELECTION
;
}
}
...
...
source/libs/sync/inc/raft_replication.h
浏览文件 @
c1c31645
...
@@ -20,6 +20,11 @@
...
@@ -20,6 +20,11 @@
#include "syncInt.h"
#include "syncInt.h"
#include "sync_type.h"
#include "sync_type.h"
int
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
int
i
);
// syncRaftReplicate sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
bool
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
,
bool
sendIfEmpty
);
#endif
/* TD_SYNC_RAFT_REPLICATION_H */
#endif
/* TD_SYNC_RAFT_REPLICATION_H */
source/libs/sync/inc/sync_raft_config_change.h
0 → 100644
浏览文件 @
c1c31645
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_CONFIG_CHANGE_H
#define TD_SYNC_RAFT_CONFIG_CHANGE_H
#include "sync_type.h"
#include "sync_raft_proto.h"
/**
* Changer facilitates configuration changes. It exposes methods to handle
* simple and joint consensus while performing the proper validation that allows
* refusing invalid configuration changes before they affect the active
* configuration.
**/
struct
SSyncRaftChanger
{
SSyncRaftProgressTracker
*
tracker
;
SyncIndex
lastIndex
;
};
typedef
int
(
*
configChangeFp
)(
SSyncRaftChanger
*
changer
,
const
SSyncConfChangeSingleArray
*
css
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
int
syncRaftChangerSimpleConfig
(
SSyncRaftChanger
*
changer
,
const
SSyncConfChangeSingleArray
*
css
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
int
syncRaftChangerEnterJoint
(
SSyncRaftChanger
*
changer
,
bool
autoLeave
,
const
SSyncConfChangeSingleArray
*
css
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
#endif
/* TD_SYNC_RAFT_CONFIG_CHANGE_H */
source/libs/sync/inc/sync_raft_impl.h
浏览文件 @
c1c31645
...
@@ -26,7 +26,7 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
...
@@ -26,7 +26,7 @@ void syncRaftBecomePreCandidate(SSyncRaft* pRaft);
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
);
void
syncRaftBecomeCandidate
(
SSyncRaft
*
pRaft
);
void
syncRaftBecomeLeader
(
SSyncRaft
*
pRaft
);
void
syncRaftBecomeLeader
(
SSyncRaft
*
pRaft
);
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
SyncRaftElectionType
cType
);
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
E
SyncRaftElectionType
cType
);
void
syncRaftTriggerHeartbeat
(
SSyncRaft
*
pRaft
);
void
syncRaftTriggerHeartbeat
(
SSyncRaft
*
pRaft
);
...
@@ -35,8 +35,20 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft);
...
@@ -35,8 +35,20 @@ bool syncRaftIsPromotable(SSyncRaft* pRaft);
bool
syncRaftIsPastElectionTimeout
(
SSyncRaft
*
pRaft
);
bool
syncRaftIsPastElectionTimeout
(
SSyncRaft
*
pRaft
);
int
syncRaftQuorum
(
SSyncRaft
*
pRaft
);
int
syncRaftQuorum
(
SSyncRaft
*
pRaft
);
SSyncRaftVoteResult
syncRaftPollVote
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
syncRaftMaybeCommit
(
SSyncRaft
*
pRaft
);
ESyncRaftVoteResult
syncRaftPollVote
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
accept
,
bool
preVote
,
bool
accept
,
int
*
rejectNum
,
int
*
granted
);
int
*
rejectNum
,
int
*
granted
);
static
FORCE_INLINE
bool
syncRaftIsEmptyServerState
(
const
SSyncServerState
*
serverState
)
{
return
serverState
->
commitIndex
==
0
&&
serverState
->
term
==
SYNC_NON_TERM
&&
serverState
->
voteFor
==
SYNC_NON_NODE_ID
;
}
void
syncRaftLoadState
(
SSyncRaft
*
pRaft
,
const
SSyncServerState
*
serverState
);
void
syncRaftBroadcastAppend
(
SSyncRaft
*
pRaft
);
#endif
/* _TD_LIBS_SYNC_RAFT_IMPL_H */
#endif
/* _TD_LIBS_SYNC_RAFT_IMPL_H */
source/libs/sync/inc/sync_raft_progress.h
浏览文件 @
c1c31645
...
@@ -34,7 +34,7 @@
...
@@ -34,7 +34,7 @@
*
*
* PROGRESS_STATE_PROBE is the initial state.
* PROGRESS_STATE_PROBE is the initial state.
**/
**/
typedef
enum
RaftProgressState
{
typedef
enum
ESync
RaftProgressState
{
/**
/**
* StateProbe indicates a follower whose last index isn't known. Such a
* StateProbe indicates a follower whose last index isn't known. Such a
* follower is "probed" (i.e. an append sent periodically) to narrow down
* follower is "probed" (i.e. an append sent periodically) to narrow down
...
@@ -56,13 +56,22 @@ typedef enum RaftProgressState {
...
@@ -56,13 +56,22 @@ typedef enum RaftProgressState {
* return to StateReplicate.
* return to StateReplicate.
**/
**/
PROGRESS_STATE_SNAPSHOT
,
PROGRESS_STATE_SNAPSHOT
,
}
RaftProgressState
;
}
ESyncRaftProgressState
;
static
const
char
*
kProgressStateString
[]
=
{
"Probe"
,
"Replicate"
,
"Snapshot"
,
};
/**
/**
* Progress represents a follower’s progress in the view of the leader. Leader maintains
* Progress represents a follower’s progress in the view of the leader. Leader maintains
* progresses of all followers, and sends entries to the follower based on its progress.
* progresses of all followers, and sends entries to the follower based on its progress.
**/
**/
struct
SSyncRaftProgress
{
struct
SSyncRaftProgress
{
// index in raft cluster config
int
selfIndex
;
SyncNodeId
id
;
SyncNodeId
id
;
SyncIndex
nextIndex
;
SyncIndex
nextIndex
;
...
@@ -82,7 +91,7 @@ struct SSyncRaftProgress {
...
@@ -82,7 +91,7 @@ struct SSyncRaftProgress {
* When in StateSnapshot, leader should have sent out snapshot
* When in StateSnapshot, leader should have sent out snapshot
* before and stops sending any replication message.
* before and stops sending any replication message.
**/
**/
RaftProgressState
state
;
ESync
RaftProgressState
state
;
/**
/**
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT.
* pendingSnapshotIndex is used in PROGRESS_STATE_SNAPSHOT.
...
@@ -129,6 +138,15 @@ struct SSyncRaftProgress {
...
@@ -129,6 +138,15 @@ struct SSyncRaftProgress {
bool
isLearner
;
bool
isLearner
;
};
};
struct
SSyncRaftProgressMap
{
SSyncRaftProgress
progress
[
TSDB_MAX_REPLICA
];
};
static
FORCE_INLINE
const
char
*
syncRaftProgressStateString
(
const
SSyncRaftProgress
*
progress
)
{
return
kProgressStateString
[
progress
->
state
];
}
void
syncRaftInitProgress
(
int
i
,
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
);
void
syncRaftInitProgress
(
int
i
,
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
);
/**
/**
...
@@ -187,15 +205,15 @@ static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progr
...
@@ -187,15 +205,15 @@ static FORCE_INLINE SyncIndex syncRaftProgressNextIndex(SSyncRaftProgress* progr
return
progress
->
nextIndex
;
return
progress
->
nextIndex
;
}
}
static
FORCE_INLINE
RaftProgressState
syncRaftProgressInReplicate
(
SSyncRaftProgress
*
progress
)
{
static
FORCE_INLINE
ESync
RaftProgressState
syncRaftProgressInReplicate
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
state
==
PROGRESS_STATE_REPLICATE
;
return
progress
->
state
==
PROGRESS_STATE_REPLICATE
;
}
}
static
FORCE_INLINE
RaftProgressState
syncRaftProgressInSnapshot
(
SSyncRaftProgress
*
progress
)
{
static
FORCE_INLINE
ESync
RaftProgressState
syncRaftProgressInSnapshot
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
state
==
PROGRESS_STATE_SNAPSHOT
;
return
progress
->
state
==
PROGRESS_STATE_SNAPSHOT
;
}
}
static
FORCE_INLINE
RaftProgressState
syncRaftProgressInProbe
(
SSyncRaftProgress
*
progress
)
{
static
FORCE_INLINE
ESync
RaftProgressState
syncRaftProgressInProbe
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
state
==
PROGRESS_STATE_PROBE
;
return
progress
->
state
==
PROGRESS_STATE_PROBE
;
}
}
...
@@ -203,6 +221,12 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres
...
@@ -203,6 +221,12 @@ static FORCE_INLINE bool syncRaftProgressRecentActive(SSyncRaftProgress* progres
return
progress
->
recentActive
;
return
progress
->
recentActive
;
}
}
int
syncRaftFindProgressIndexByNodeId
(
const
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
);
int
syncRaftAddToProgressMap
(
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
);
void
syncRaftRemoveFromProgressMap
(
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
);
/**
/**
* return true if progress's log is up-todate
* return true if progress's log is up-todate
**/
**/
...
@@ -210,7 +234,9 @@ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
...
@@ -210,7 +234,9 @@ bool syncRaftProgressIsUptodate(SSyncRaft* pRaft, SSyncRaftProgress* progress);
void
syncRaftProgressBecomeSnapshot
(
SSyncRaftProgress
*
progress
,
SyncIndex
snapshotIndex
);
void
syncRaftProgressBecomeSnapshot
(
SSyncRaftProgress
*
progress
,
SyncIndex
snapshotIndex
);
void
syncRaftCopyProgress
(
const
SSyncRaftProgress
*
from
,
SSyncRaftProgress
*
to
);
void
syncRaftProgressMapCopy
(
const
SSyncRaftProgressMap
*
from
,
SSyncRaftProgressMap
*
to
);
#if 0
#if 0
...
...
source/libs/sync/inc/sync_raft_progress_tracker.h
浏览文件 @
c1c31645
...
@@ -17,78 +17,74 @@
...
@@ -17,78 +17,74 @@
#define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
#define _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H
#include "sync_type.h"
#include "sync_type.h"
#include "sync_raft_quorum.h"
#include "sync_raft_quorum_joint.h"
#include "sync_raft_quorum_joint.h"
#include "sync_raft_progress.h"
#include "sync_raft_progress.h"
#include "sync_raft_proto.h"
struct
SSyncRaftProgressTrackerConfig
{
struct
SSyncRaftProgressTrackerConfig
{
SSyncRaftQuorumJointConfig
voters
;
SSyncRaftQuorumJointConfig
voters
;
/**
// autoLeave is true if the configuration is joint and a transition to the
* autoLeave is true if the configuration is joint and a transition to the
// incoming configuration should be carried out automatically by Raft when
* incoming configuration should be carried out automatically by Raft when
// this is possible. If false, the configuration will be joint until the
* this is possible. If false, the configuration will be joint until the
// application initiates the transition manually.
* application initiates the transition manually.
**/
bool
autoLeave
;
bool
autoLeave
;
/**
// Learners is a set of IDs corresponding to the learners active in the
* Learners is a set of IDs corresponding to the learners active in the
// current configuration.
* current configuration.
//
*
// Invariant: Learners and Voters does not intersect, i.e. if a peer is in
* Invariant: Learners and Voters does not intersect, i.e. if a peer is in
// either half of the joint config, it can't be a learner; if it is a
* either half of the joint config, it can't be a learner; if it is a
// learner it can't be in either half of the joint config. This invariant
* learner it can't be in either half of the joint config. This invariant
// simplifies the implementation since it allows peers to have clarity about
* simplifies the implementation since it allows peers to have clarity about
// its current role without taking into account joint consensus.
* its current role without taking into account joint consensus.
SSyncRaftNodeMap
learners
;
**/
SyncNodeId
learners
[
TSDB_MAX_REPLICA
];
// When we turn a voter into a learner during a joint consensus transition,
// we cannot add the learner directly when entering the joint state. This is
/**
// because this would violate the invariant that the intersection of
* When we turn a voter into a learner during a joint consensus transition,
// voters and learners is empty. For example, assume a Voter is removed and
* we cannot add the learner directly when entering the joint state. This is
// immediately re-added as a learner (or in other words, it is demoted):
* because this would violate the invariant that the intersection of
//
* voters and learners is empty. For example, assume a Voter is removed and
// Initially, the configuration will be
* immediately re-added as a learner (or in other words, it is demoted):
//
*
// voters: {1 2 3}
* Initially, the configuration will be
// learners: {}
*
//
* voters: {1 2 3}
// and we want to demote 3. Entering the joint configuration, we naively get
* learners: {}
//
*
// voters: {1 2} & {1 2 3}
* and we want to demote 3. Entering the joint configuration, we naively get
// learners: {3}
*
//
* voters: {1 2} & {1 2 3}
// but this violates the invariant (3 is both voter and learner). Instead,
* learners: {3}
// we get
*
//
* but this violates the invariant (3 is both voter and learner). Instead,
// voters: {1 2} & {1 2 3}
* we get
// learners: {}
*
// next_learners: {3}
* voters: {1 2} & {1 2 3}
//
* learners: {}
// Where 3 is now still purely a voter, but we are remembering the intention
* next_learners: {3}
// to make it a learner upon transitioning into the final configuration:
*
//
* Where 3 is now still purely a voter, but we are remembering the intention
// voters: {1 2}
* to make it a learner upon transitioning into the final configuration:
// learners: {3}
*
// next_learners: {}
* voters: {1 2}
//
* learners: {3}
// Note that next_learners is not used while adding a learner that is not
* next_learners: {}
// also a voter in the joint config. In this case, the learner is added
*
// right away when entering the joint configuration, so that it is caught up
* Note that next_learners is not used while adding a learner that is not
// as soon as possible.
* also a voter in the joint config. In this case, the learner is added
SSyncRaftNodeMap
learnersNext
;
* right away when entering the joint configuration, so that it is caught up
* as soon as possible.
**/
SyncNodeId
learnersNext
[
TSDB_MAX_REPLICA
];
};
};
struct
SSyncRaftProgressTracker
{
struct
SSyncRaftProgressTracker
{
SSyncRaftProgressTrackerConfig
config
;
SSyncRaftProgressTrackerConfig
config
;
SSyncRaftProgress
progressMap
[
TSDB_MAX_REPLICA
]
;
SSyncRaftProgress
Map
progressMap
;
SyncRaftVoteResult
votes
[
TSDB_MAX_REPLICA
];
ESyncRaftVoteType
votes
[
TSDB_MAX_REPLICA
];
int
maxInflight
;
int
maxInflight
Msgs
;
};
};
SSyncRaftProgressTracker
*
syncRaftOpenProgressTracker
();
SSyncRaftProgressTracker
*
syncRaftOpenProgressTracker
();
...
@@ -104,10 +100,18 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, voi
...
@@ -104,10 +100,18 @@ void syncRaftProgressVisit(SSyncRaftProgressTracker*, visitProgressFp visit, voi
**/
**/
void
syncRaftRecordVote
(
SSyncRaftProgressTracker
*
tracker
,
int
i
,
bool
grant
);
void
syncRaftRecordVote
(
SSyncRaftProgressTracker
*
tracker
,
int
i
,
bool
grant
);
void
syncRaftCloneTrackerConfig
(
const
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressTrackerConfig
*
result
);
int
syncRaftCheckProgress
(
const
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
/**
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* election outcome is known.
* election outcome is known.
**/
**/
SyncRaftVoteResult
syncRaftTallyVotes
(
SSyncRaftProgressTracker
*
tracker
,
int
*
rejected
,
int
*
granted
);
ESyncRaftVoteResult
syncRaftTallyVotes
(
SSyncRaftProgressTracker
*
tracker
,
int
*
rejected
,
int
*
granted
);
void
syncRaftConfigState
(
const
SSyncRaftProgressTracker
*
tracker
,
SSyncConfigState
*
cs
);
bool
syncRaftIsInNodeMap
(
const
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
nodeId
);
#endif
/* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
#endif
/* _TD_LIBS_SYNC_RAFT_PROGRESS_TRACKER_H */
source/libs/sync/inc/sync_raft_proto.h
0 → 100644
浏览文件 @
c1c31645
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_PROTO_H
#define TD_SYNC_RAFT_PROTO_H
#include "sync_type.h"
typedef
enum
ESyncRaftConfChangeType
{
SYNC_RAFT_Conf_AddNode
=
0
,
SYNC_RAFT_Conf_RemoveNode
=
1
,
SYNC_RAFT_Conf_UpdateNode
=
2
,
SYNC_RAFT_Conf_AddLearnerNode
=
3
,
}
ESyncRaftConfChangeType
;
// ConfChangeSingle is an individual configuration change operation. Multiple
// such operations can be carried out atomically via a ConfChangeV2.
typedef
struct
SSyncConfChangeSingle
{
ESyncRaftConfChangeType
type
;
SyncNodeId
nodeId
;
}
SSyncConfChangeSingle
;
typedef
struct
SSyncConfChangeSingleArray
{
int
n
;
SSyncConfChangeSingle
*
changes
;
}
SSyncConfChangeSingleArray
;
typedef
struct
SSyncConfigState
{
// The voters in the incoming config. (If the configuration is not joint,
// then the outgoing config is empty).
SSyncRaftNodeMap
voters
;
// The learners in the incoming config.
SSyncRaftNodeMap
learners
;
// The voters in the outgoing config.
SSyncRaftNodeMap
votersOutgoing
;
// The nodes that will become learners when the outgoing config is removed.
// These nodes are necessarily currently in nodes_joint (or they would have
// been added to the incoming config right away).
SSyncRaftNodeMap
learnersNext
;
// If set, the config is joint and Raft will automatically transition into
// the final config (i.e. remove the outgoing config) when this is safe.
bool
autoLeave
;
}
SSyncConfigState
;
#endif
/* TD_SYNC_RAFT_PROTO_H */
source/libs/sync/inc/sync_raft_quorum.h
浏览文件 @
c1c31645
...
@@ -17,11 +17,11 @@
...
@@ -17,11 +17,11 @@
#define TD_SYNC_RAFT_QUORUM_H
#define TD_SYNC_RAFT_QUORUM_H
/**
/**
*
S
SyncRaftVoteResult indicates the outcome of a vote.
*
E
SyncRaftVoteResult indicates the outcome of a vote.
**/
**/
typedef
enum
{
typedef
enum
{
/**
/**
*
SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
* SYNC_RAFT_VOTE_PENDING indicates that the decision of the vote depends on future
* votes, i.e. neither "yes" or "no" has reached quorum yet.
* votes, i.e. neither "yes" or "no" has reached quorum yet.
**/
**/
SYNC_RAFT_VOTE_PENDING
=
1
,
SYNC_RAFT_VOTE_PENDING
=
1
,
...
@@ -35,6 +35,6 @@ typedef enum {
...
@@ -35,6 +35,6 @@ typedef enum {
* SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
* SYNC_RAFT_VOTE_WON indicates that the quorum has voted "yes".
**/
**/
SYNC_RAFT_VOTE_WON
=
3
,
SYNC_RAFT_VOTE_WON
=
3
,
}
S
SyncRaftVoteResult
;
}
E
SyncRaftVoteResult
;
#endif
/* TD_SYNC_RAFT_QUORUM_H */
#endif
/* TD_SYNC_RAFT_QUORUM_H */
\ No newline at end of file
source/libs/sync/inc/sync_raft_quorum_joint.h
浏览文件 @
c1c31645
...
@@ -25,14 +25,41 @@
...
@@ -25,14 +25,41 @@
* majority configurations. Decisions require the support of both majorities.
* majority configurations. Decisions require the support of both majorities.
**/
**/
typedef
struct
SSyncRaftQuorumJointConfig
{
typedef
struct
SSyncRaftQuorumJointConfig
{
SSyncCluster
majorityConfig
[
2
];
SSyncRaftNodeMap
outgoing
;
}
SSyncRaftQuorumJointConfig
;
SSyncRaftNodeMap
incoming
;
}
SSyncRaftQuorumJointConfig
;
/**
/**
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* syncRaftVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor.
* requires both majority quorums to vote in favor.
**/
**/
SyncRaftVoteResult
syncRaftVoteResult
(
SSyncRaftQuorumJointConfig
*
config
,
const
SyncRaftVoteResult
*
votes
);
ESyncRaftVoteType
syncRaftVoteResult
(
SSyncRaftQuorumJointConfig
*
config
,
const
ESyncRaftVoteType
*
votes
);
bool
syncRaftIsInNodeMap
(
const
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
nodeId
);
static
FORCE_INLINE
bool
syncRaftJointConfigInOutgoing
(
const
SSyncRaftQuorumJointConfig
*
config
,
SyncNodeId
id
)
{
return
syncRaftIsInNodeMap
(
&
config
->
outgoing
,
id
);
}
static
FORCE_INLINE
bool
syncRaftJointConfigInIncoming
(
const
SSyncRaftQuorumJointConfig
*
config
,
SyncNodeId
id
)
{
return
syncRaftIsInNodeMap
(
&
config
->
incoming
,
id
);
}
void
syncRaftJointConfigAddToIncoming
(
SSyncRaftQuorumJointConfig
*
config
,
SyncNodeId
id
);
void
syncRaftJointConfigRemoveFromIncoming
(
SSyncRaftQuorumJointConfig
*
config
,
SyncNodeId
id
);
static
FORCE_INLINE
const
SSyncRaftNodeMap
*
syncRaftJointConfigIncoming
(
const
SSyncRaftQuorumJointConfig
*
config
)
{
return
&
config
->
incoming
;
}
static
FORCE_INLINE
const
SSyncRaftNodeMap
*
syncRaftJointConfigOutgoing
(
const
SSyncRaftQuorumJointConfig
*
config
)
{
return
&
config
->
outgoing
;
}
static
FORCE_INLINE
void
syncRaftJointConfigClearOutgoing
(
SSyncRaftQuorumJointConfig
*
config
)
{
memset
(
&
config
->
outgoing
,
0
,
sizeof
(
SSyncCluster
));
}
#endif
/* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
#endif
/* _TD_LIBS_SYNC_RAFT_QUORUM_JOINT_H */
source/libs/sync/inc/sync_raft_quorum_majority.h
浏览文件 @
c1c31645
...
@@ -18,6 +18,7 @@
...
@@ -18,6 +18,7 @@
#include "sync.h"
#include "sync.h"
#include "sync_type.h"
#include "sync_type.h"
#include "sync_raft_quorum.h"
/**
/**
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
* syncRaftMajorityVoteResult takes a mapping of voters to yes/no (true/false) votes and returns
...
@@ -25,6 +26,6 @@
...
@@ -25,6 +26,6 @@
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached).
* quorum of no has been reached).
**/
**/
SyncRaftVoteResult
syncRaftMajorityVoteResult
(
SSyncCluster
*
config
,
const
SyncRaftVoteResult
*
votes
);
ESyncRaftVoteResult
syncRaftMajorityVoteResult
(
SSyncRaftNodeMap
*
config
,
const
ESyncRaftVoteType
*
votes
);
#endif
/* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
#endif
/* _TD_LIBS_SYNC_RAFT_QUORUM_MAJORITY_H */
source/libs/sync/inc/sync_raft_restore.h
0 → 100644
浏览文件 @
c1c31645
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_RESTORE_H
#define TD_SYNC_RAFT_RESTORE_H
#include "sync_type.h"
#include "sync_raft_proto.h"
// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and
// runs a sequence of changes enacting the configuration described in the
// ConfState.
//
// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure
// the Changer only needs a ProgressMap (not a whole Tracker) at which point
// this can just take LastIndex and MaxInflight directly instead and cook up
// the results from that alone.
int
syncRaftRestoreConfig
(
SSyncRaftChanger
*
changer
,
const
SSyncConfigState
*
cs
);
#endif
/* TD_SYNC_RAFT_RESTORE_H */
source/libs/sync/inc/sync_type.h
浏览文件 @
c1c31645
...
@@ -17,6 +17,7 @@
...
@@ -17,6 +17,7 @@
#define _TD_LIBS_SYNC_TYPE_H
#define _TD_LIBS_SYNC_TYPE_H
#include <stdint.h>
#include <stdint.h>
#include "sync.h"
#include "osMath.h"
#include "osMath.h"
#define SYNC_NON_NODE_ID -1
#define SYNC_NON_NODE_ID -1
...
@@ -28,10 +29,13 @@ typedef uint32_t SyncTick;
...
@@ -28,10 +29,13 @@ typedef uint32_t SyncTick;
typedef
struct
SSyncRaft
SSyncRaft
;
typedef
struct
SSyncRaft
SSyncRaft
;
typedef
struct
SSyncRaftProgress
SSyncRaftProgress
;
typedef
struct
SSyncRaftProgress
SSyncRaftProgress
;
typedef
struct
SSyncRaftProgressMap
SSyncRaftProgressMap
;
typedef
struct
SSyncRaftProgressTrackerConfig
SSyncRaftProgressTrackerConfig
;
typedef
struct
SSyncRaftProgressTrackerConfig
SSyncRaftProgressTrackerConfig
;
typedef
struct
SSyncRaftProgressTracker
SSyncRaftProgressTracker
;
typedef
struct
SSyncRaftProgressTracker
SSyncRaftProgressTracker
;
typedef
struct
SSyncRaftChanger
SSyncRaftChanger
;
typedef
struct
SSyncRaftLog
SSyncRaftLog
;
typedef
struct
SSyncRaftLog
SSyncRaftLog
;
typedef
struct
SSyncRaftEntry
SSyncRaftEntry
;
typedef
struct
SSyncRaftEntry
SSyncRaftEntry
;
...
@@ -46,11 +50,34 @@ typedef struct SSyncRaftEntry SSyncRaftEntry;
...
@@ -46,11 +50,34 @@ typedef struct SSyncRaftEntry SSyncRaftEntry;
#endif
#endif
#endif
#endif
typedef
struct
SSyncServerState
{
SyncNodeId
voteFor
;
SyncTerm
term
;
SyncIndex
commitIndex
;
}
SSyncServerState
;
typedef
struct
SSyncClusterConfig
{
// Log index number of current cluster config.
SyncIndex
index
;
// Log index number of previous cluster config.
SyncIndex
prevIndex
;
// current cluster
const
SSyncCluster
*
cluster
;
}
SSyncClusterConfig
;
typedef
struct
{
int32_t
replica
;
SyncNodeId
nodeId
[
TSDB_MAX_REPLICA
];
}
SSyncRaftNodeMap
;
typedef
enum
{
typedef
enum
{
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
=
0
,
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
=
0
,
SYNC_RAFT_CAMPAIGN_ELECTION
=
1
,
SYNC_RAFT_CAMPAIGN_ELECTION
=
1
,
SYNC_RAFT_CAMPAIGN_TRANSFER
=
2
,
SYNC_RAFT_CAMPAIGN_TRANSFER
=
2
,
}
SyncRaftElectionType
;
}
E
SyncRaftElectionType
;
typedef
enum
{
typedef
enum
{
// the init vote resp status
// the init vote resp status
...
@@ -59,8 +86,8 @@ typedef enum {
...
@@ -59,8 +86,8 @@ typedef enum {
// grant the vote request
// grant the vote request
SYNC_RAFT_VOTE_RESP_GRANT
=
1
,
SYNC_RAFT_VOTE_RESP_GRANT
=
1
,
//reject the vote request
//
reject the vote request
SYNC_RAFT_VOTE_RESP_REJECT
=
2
,
SYNC_RAFT_VOTE_RESP_REJECT
=
2
,
}
SyncRaftVoteResult
;
}
ESyncRaftVoteType
;
#endif
/* _TD_LIBS_SYNC_TYPE_H */
#endif
/* _TD_LIBS_SYNC_TYPE_H */
source/libs/sync/src/raft.c
浏览文件 @
c1c31645
...
@@ -16,12 +16,22 @@
...
@@ -16,12 +16,22 @@
#include "raft.h"
#include "raft.h"
#include "raft_configuration.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_log.h"
#include "sync_raft_restore.h"
#include "raft_replication.h"
#include "raft_replication.h"
#include "sync_raft_config_change.h"
#include "sync_raft_progress_tracker.h"
#include "sync_raft_progress_tracker.h"
#include "syncInt.h"
#include "syncInt.h"
#define RAFT_READ_LOG_MAX_NUM 100
#define RAFT_READ_LOG_MAX_NUM 100
static
int
deserializeServerStateFromBuffer
(
SSyncServerState
*
server
,
const
char
*
buffer
,
int
n
);
static
int
deserializeClusterStateFromBuffer
(
SSyncConfigState
*
cluster
,
const
char
*
buffer
,
int
n
);
static
void
switchToConfig
(
SSyncRaft
*
pRaft
,
const
SSyncRaftProgressTrackerConfig
*
config
,
const
SSyncRaftProgressMap
*
progressMap
,
SSyncConfigState
*
cs
);
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
);
static
bool
preHandleMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
bool
preHandleMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
bool
preHandleNewTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
bool
preHandleNewTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
bool
preHandleOldTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
bool
preHandleOldTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
...
@@ -29,13 +39,16 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
...
@@ -29,13 +39,16 @@ static bool preHandleOldTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg);
int32_t
syncRaftStart
(
SSyncRaft
*
pRaft
,
const
SSyncInfo
*
pInfo
)
{
int32_t
syncRaftStart
(
SSyncRaft
*
pRaft
,
const
SSyncInfo
*
pInfo
)
{
SSyncNode
*
pNode
=
pRaft
->
pNode
;
SSyncNode
*
pNode
=
pRaft
->
pNode
;
SSyncServerState
serverState
;
SSyncServerState
serverState
;
SSyncConfigState
confState
;
SStateManager
*
stateManager
;
SStateManager
*
stateManager
;
SSyncLogStore
*
logStore
;
SSyncLogStore
*
logStore
;
SSyncFSM
*
fsm
;
SSyncFSM
*
fsm
;
SyncIndex
initIndex
=
pInfo
->
snapshotIndex
;
SSyncBuffer
buffer
[
RAFT_READ_LOG_MAX_NUM
];
SSyncBuffer
buffer
[
RAFT_READ_LOG_MAX_NUM
];
int
nBuf
,
limit
,
i
;
int
nBuf
,
limit
,
i
;
char
*
buf
;
int
n
;
SSyncRaftChanger
changer
;
memset
(
pRaft
,
0
,
sizeof
(
SSyncRaft
));
memset
(
pRaft
,
0
,
sizeof
(
SSyncRaft
));
memcpy
(
&
pRaft
->
fsm
,
&
pInfo
->
fsm
,
sizeof
(
SSyncFSM
));
memcpy
(
&
pRaft
->
fsm
,
&
pInfo
->
fsm
,
sizeof
(
SSyncFSM
));
...
@@ -57,35 +70,46 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
...
@@ -57,35 +70,46 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
return
-
1
;
return
-
1
;
}
}
// read server state
// read server state
if
(
stateManager
->
readServerState
(
stateManager
,
&
serverState
)
!=
0
)
{
if
(
stateManager
->
readServerState
(
stateManager
,
&
buf
,
&
n
)
!=
0
)
{
syncError
(
"readServerState for vgid %d fail"
,
pInfo
->
vgId
);
syncError
(
"readServerState for vgid %d fail"
,
pInfo
->
vgId
);
return
-
1
;
return
-
1
;
}
}
assert
(
initIndex
<=
serverState
.
commitIndex
);
if
(
deserializeServerStateFromBuffer
(
&
serverState
,
buf
,
n
)
!=
0
)
{
syncError
(
"deserializeServerStateFromBuffer for vgid %d fail"
,
pInfo
->
vgId
);
return
-
1
;
}
free
(
buf
);
//assert(initIndex <= serverState.commitIndex);
// restore fsm state from snapshot index + 1 until commitIndex
// read config state
++
initIndex
;
if
(
stateManager
->
readClusterState
(
stateManager
,
&
buf
,
&
n
)
!=
0
)
{
while
(
initIndex
<=
serverState
.
commitIndex
)
{
syncError
(
"readClusterState for vgid %d fail"
,
pInfo
->
vgId
);
limit
=
MIN
(
RAFT_READ_LOG_MAX_NUM
,
serverState
.
commitIndex
-
initIndex
+
1
);
return
-
1
;
}
if
(
deserializeClusterStateFromBuffer
(
&
confState
,
buf
,
n
)
!=
0
)
{
syncError
(
"deserializeClusterStateFromBuffer for vgid %d fail"
,
pInfo
->
vgId
);
return
-
1
;
}
free
(
buf
);
if
(
logStore
->
logRead
(
logStore
,
initIndex
,
limit
,
buffer
,
&
nBuf
)
!=
0
)
{
changer
=
(
SSyncRaftChanger
)
{
return
-
1
;
.
tracker
=
pRaft
->
tracker
,
}
.
lastIndex
=
syncRaftLogLastIndex
(
pRaft
->
log
),
assert
(
limit
==
nBuf
);
};
if
(
syncRaftRestoreConfig
(
&
changer
,
&
confState
)
<
0
)
{
for
(
i
=
0
;
i
<
limit
;
++
i
)
{
syncError
(
"syncRaftRestoreConfig for vgid %d fail"
,
pInfo
->
vgId
);
fsm
->
applyLog
(
fsm
,
initIndex
+
i
,
&
(
buffer
[
i
]),
NULL
);
return
-
1
;
free
(
buffer
[
i
].
data
);
}
initIndex
+=
nBuf
;
}
}
assert
(
initIndex
==
serverState
.
commitIndex
);
//pRaft->heartbeatTimeoutTick = 1;
if
(
!
syncRaftIsEmptyServerState
(
&
serverState
))
{
syncRaftLoadState
(
pRaft
,
&
serverState
);
}
syncRaftBecomeFollower
(
pRaft
,
pRaft
->
term
,
SYNC_NON_NODE_ID
);
if
(
pInfo
->
appliedIndex
>
0
)
{
syncRaftLogAppliedTo
(
pRaft
->
log
,
pInfo
->
appliedIndex
);
}
pRaft
->
selfIndex
=
pRaft
->
cluster
.
selfIndex
;
syncRaftBecomeFollower
(
pRaft
,
pRaft
->
term
,
SYNC_NON_NODE_ID
)
;
syncInfo
(
"[%d:%d] restore vgid %d state: snapshot index success"
,
syncInfo
(
"[%d:%d] restore vgid %d state: snapshot index success"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pInfo
->
vgId
);
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pInfo
->
vgId
);
...
@@ -101,7 +125,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
...
@@ -101,7 +125,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return
0
;
return
0
;
}
}
RaftMessageType
msgType
=
pMsg
->
msgType
;
ESync
RaftMessageType
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
RAFT_MSG_INTERNAL_ELECTION
)
{
if
(
msgType
==
RAFT_MSG_INTERNAL_ELECTION
)
{
syncRaftHandleElectionMessage
(
pRaft
,
pMsg
);
syncRaftHandleElectionMessage
(
pRaft
,
pMsg
);
}
else
if
(
msgType
==
RAFT_MSG_VOTE
)
{
}
else
if
(
msgType
==
RAFT_MSG_VOTE
)
{
...
@@ -119,6 +143,85 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
...
@@ -119,6 +143,85 @@ int32_t syncRaftTick(SSyncRaft* pRaft) {
return
0
;
return
0
;
}
}
static
int
deserializeServerStateFromBuffer
(
SSyncServerState
*
server
,
const
char
*
buffer
,
int
n
)
{
return
0
;
}
static
int
deserializeClusterStateFromBuffer
(
SSyncConfigState
*
cluster
,
const
char
*
buffer
,
int
n
)
{
return
0
;
}
static
void
visitProgressMaybeSendAppend
(
int
i
,
SSyncRaftProgress
*
progress
,
void
*
arg
)
{
syncRaftReplicate
(
arg
,
progress
,
false
);
}
// switchToConfig reconfigures this node to use the provided configuration. It
// updates the in-memory state and, when necessary, carries out additional
// actions such as reacting to the removal of nodes or changed quorum
// requirements.
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
static
void
switchToConfig
(
SSyncRaft
*
pRaft
,
const
SSyncRaftProgressTrackerConfig
*
config
,
const
SSyncRaftProgressMap
*
progressMap
,
SSyncConfigState
*
cs
)
{
SyncNodeId
selfId
=
pRaft
->
selfId
;
int
i
;
bool
exist
;
SSyncRaftProgress
*
progress
=
NULL
;
syncRaftConfigState
(
pRaft
->
tracker
,
cs
);
i
=
syncRaftFindProgressIndexByNodeId
(
&
pRaft
->
tracker
->
progressMap
,
selfId
);
exist
=
(
i
!=
-
1
);
// Update whether the node itself is a learner, resetting to false when the
// node is removed.
if
(
exist
)
{
progress
=
&
pRaft
->
tracker
->
progressMap
.
progress
[
i
];
pRaft
->
isLearner
=
progress
->
isLearner
;
}
else
{
pRaft
->
isLearner
=
false
;
}
if
((
!
exist
||
pRaft
->
isLearner
)
&&
pRaft
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
// This node is leader and was removed or demoted. We prevent demotions
// at the time writing but hypothetically we handle them the same way as
// removing the leader: stepping down into the next Term.
//
// TODO(tbg): step down (for sanity) and ask follower with largest Match
// to TimeoutNow (to avoid interruption). This might still drop some
// proposals but it's better than nothing.
//
// TODO(tbg): test this branch. It is untested at the time of writing.
return
;
}
// The remaining steps only make sense if this node is the leader and there
// are other nodes.
if
(
pRaft
->
state
!=
TAOS_SYNC_STATE_LEADER
||
cs
->
voters
.
replica
==
0
)
{
return
;
}
if
(
syncRaftMaybeCommit
(
pRaft
))
{
// If the configuration change means that more entries are committed now,
// broadcast/append to everyone in the updated config.
syncRaftBroadcastAppend
(
pRaft
);
}
else
{
// Otherwise, still probe the newly added replicas; there's no reason to
// let them wait out a heartbeat interval (or the next incoming
// proposal).
syncRaftProgressVisit
(
pRaft
->
tracker
,
visitProgressMaybeSendAppend
,
pRaft
);
// If the the leadTransferee was removed or demoted, abort the leadership transfer.
SyncNodeId
leadTransferee
=
pRaft
->
leadTransferee
;
if
(
leadTransferee
!=
SYNC_NON_NODE_ID
&&
!
syncRaftIsInNodeMap
(
&
pRaft
->
tracker
->
config
.
voters
,
leadTransferee
))
{
abortLeaderTransfer
(
pRaft
);
}
}
}
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
)
{
pRaft
->
leadTransferee
=
SYNC_NON_NODE_ID
;
}
/**
/**
* pre-handle message, return true means no need to continue
* pre-handle message, return true means no need to continue
* Handle the message term, which may result in our stepping down to a follower.
* Handle the message term, which may result in our stepping down to a follower.
...
@@ -140,7 +243,7 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
...
@@ -140,7 +243,7 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
static
bool
preHandleNewTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
static
bool
preHandleNewTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
SyncNodeId
leaderId
=
pMsg
->
from
;
SyncNodeId
leaderId
=
pMsg
->
from
;
RaftMessageType
msgType
=
pMsg
->
msgType
;
ESync
RaftMessageType
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
RAFT_MSG_VOTE
)
{
if
(
msgType
==
RAFT_MSG_VOTE
)
{
// TODO
// TODO
...
...
source/libs/sync/src/raft_election.c
浏览文件 @
c1c31645
...
@@ -18,10 +18,10 @@
...
@@ -18,10 +18,10 @@
#include "raft_log.h"
#include "raft_log.h"
#include "raft_message.h"
#include "raft_message.h"
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
SyncRaftElectionType
cType
)
{
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
E
SyncRaftElectionType
cType
)
{
SyncTerm
term
;
SyncTerm
term
;
bool
preVote
;
bool
preVote
;
RaftMessageType
voteMsgType
;
ESync
RaftMessageType
voteMsgType
;
if
(
syncRaftIsPromotable
(
pRaft
))
{
if
(
syncRaftIsPromotable
(
pRaft
))
{
syncDebug
(
"[%d:%d] is unpromotable; campaign() should have been called"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
);
syncDebug
(
"[%d:%d] is unpromotable; campaign() should have been called"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
);
...
@@ -41,7 +41,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
...
@@ -41,7 +41,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
}
}
int
quorum
=
syncRaftQuorum
(
pRaft
);
int
quorum
=
syncRaftQuorum
(
pRaft
);
S
SyncRaftVoteResult
result
=
syncRaftPollVote
(
pRaft
,
pRaft
->
selfId
,
preVote
,
true
,
NULL
,
NULL
);
E
SyncRaftVoteResult
result
=
syncRaftPollVote
(
pRaft
,
pRaft
->
selfId
,
preVote
,
true
,
NULL
,
NULL
);
if
(
result
==
SYNC_RAFT_VOTE_WON
)
{
if
(
result
==
SYNC_RAFT_VOTE_WON
)
{
/**
/**
* We won the election after voting for ourselves (which must mean that
* We won the election after voting for ourselves (which must mean that
...
...
source/libs/sync/src/raft_handle_append_entries_message.c
浏览文件 @
c1c31645
...
@@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs
...
@@ -33,7 +33,7 @@ int syncRaftHandleAppendEntriesMessage(SSyncRaft* pRaft, const SSyncMessage* pMs
return
0
;
return
0
;
}
}
RaftMsg_Append_
Entries
*
appendResp
=
&
(
pRespMsg
->
appendResp
);
RaftMsg_Append_
Resp
*
appendResp
=
&
(
pRespMsg
->
appendResp
);
// ignore committed logs
// ignore committed logs
if
(
syncRaftLogIsCommitted
(
pRaft
->
log
,
appendEntries
->
index
))
{
if
(
syncRaftLogIsCommitted
(
pRaft
->
log
,
appendEntries
->
index
))
{
appendResp
->
index
=
pRaft
->
log
->
commitIndex
;
appendResp
->
index
=
pRaft
->
log
->
commitIndex
;
...
...
source/libs/sync/src/raft_handle_vote_resp_message.c
浏览文件 @
c1c31645
...
@@ -36,7 +36,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
...
@@ -36,7 +36,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
return
0
;
return
0
;
}
}
S
SyncRaftVoteResult
result
=
syncRaftPollVote
(
pRaft
,
pMsg
->
from
,
E
SyncRaftVoteResult
result
=
syncRaftPollVote
(
pRaft
,
pMsg
->
from
,
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
,
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
,
!
pMsg
->
voteResp
.
rejected
,
&
rejected
,
&
granted
);
!
pMsg
->
voteResp
.
rejected
,
&
rejected
,
&
granted
);
...
...
source/libs/sync/src/raft_log.c
浏览文件 @
c1c31645
...
@@ -31,6 +31,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
...
@@ -31,6 +31,10 @@ SyncTerm syncRaftLogLastTerm(SSyncRaftLog* pLog) {
return
0
;
return
0
;
}
}
void
syncRaftLogAppliedTo
(
SSyncRaftLog
*
pLog
,
SyncIndex
appliedIndex
)
{
}
bool
syncRaftLogIsUptodate
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
SyncTerm
term
)
{
bool
syncRaftLogIsUptodate
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
SyncTerm
term
)
{
return
true
;
return
true
;
}
}
...
...
source/libs/sync/src/raft_replication.c
浏览文件 @
c1c31645
...
@@ -16,106 +16,62 @@
...
@@ -16,106 +16,62 @@
#include "raft.h"
#include "raft.h"
#include "raft_log.h"
#include "raft_log.h"
#include "sync_raft_progress.h"
#include "sync_raft_progress.h"
#include "syncInt.h"
#include "raft_replication.h"
#include "raft_replication.h"
static
int
sendSnapshot
(
SSyncRaft
*
pRaft
,
int
i
);
static
bool
sendSnapshot
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
);
static
int
sendAppendEntries
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
index
,
SyncTerm
term
);
static
bool
sendAppendEntries
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
,
SyncIndex
prevIndex
,
SyncTerm
prevTerm
,
int
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
int
i
)
{
const
SSyncRaftEntry
*
entries
,
int
nEntry
);
#if 0
// syncRaftReplicate sends an append RPC with new entries to the given peer,
// if necessary. Returns true if a message was sent. The sendIfEmpty
// argument controls whether messages with no entries will be sent
// ("empty" messages are useful to convey updated Commit indexes, but
// are undesirable when we're sending multiple messages in a batch).
bool
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
,
bool
sendIfEmpty
)
{
assert
(
pRaft
->
state
==
TAOS_SYNC_STATE_LEADER
);
assert
(
pRaft
->
state
==
TAOS_SYNC_STATE_LEADER
);
assert(i >= 0 && i < pRaft->leaderState.nProgress)
;
SyncNodeId
nodeId
=
progress
->
id
;
SyncNodeId nodeId = pRaft->cluster.nodeInfo[i].nodeId;
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if
(
syncRaftProgressIsPaused
(
progress
))
{
if
(
syncRaftProgressIsPaused
(
progress
))
{
syncInfo("node
%d paused"
, nodeId);
syncInfo
(
"node
[%d:%d] paused"
,
pRaft
->
selfGroupId
,
nodeId
);
return
0
;
return
false
;
}
}
SyncIndex
nextIndex
=
syncRaftProgressNextIndex
(
progress
);
SyncIndex
nextIndex
=
syncRaftProgressNextIndex
(
progress
);
S
yncIndex snapshotIndex = syncRaftLogSnapshotIndex(pRaft->log)
;
S
SyncRaftEntry
*
entries
;
bool inSnapshot = syncRaftProgressInSnapshot(progress);
int
nEntry
;
SyncIndex
prevIndex
;
SyncIndex
prevIndex
;
SyncTerm
prevTerm
;
SyncTerm
prevTerm
;
/**
prevIndex
=
nextIndex
-
1
;
* From Section 3.5:
prevTerm
=
syncRaftLogTermOf
(
pRaft
->
log
,
prevIndex
);
*
int
ret
=
syncRaftLogAcquire
(
pRaft
->
log
,
nextIndex
,
pRaft
->
maxMsgSize
,
&
entries
,
&
nEntry
);
* When sending an AppendEntries RPC, the leader includes the index and
* term of the entry in its log that immediately precedes the new
* entries. If the follower does not find an entry in its log with the
* same index and term, then it refuses the new entries. The consistency
* check acts as an induction step: the initial empty state of the logs
* satisfies the Log Matching Property, and the consistency check
* preserves the Log Matching Property whenever logs are extended. As a
* result, whenever AppendEntries returns successfully, the leader knows
* that the follower's log is identical to its own log up through the new
* entries (Log Matching Property in Figure 3.2).
**/
if (nextIndex == 1) {
/**
* We're including the very first entry, so prevIndex and prevTerm are
* null. If the first entry is not available anymore, send the last
* snapshot if we're not already sending one.
**/
if (snapshotIndex > 0 && !inSnapshot) {
goto send_snapshot;
}
// otherwise send append entries from start
if
(
nEntry
==
0
&&
!
sendIfEmpty
)
{
prevIndex = 0;
return
false
;
prevTerm = 0;
} else {
/**
* Set prevIndex and prevTerm to the index and term of the entry at
* nextIndex - 1.
**/
prevIndex = nextIndex - 1;
prevTerm = syncRaftLogTermOf(pRaft->log, prevIndex);
/**
* If the entry is not anymore in our log, send the last snapshot if we're
* not doing so already.
**/
if (prevTerm == SYNC_NON_TERM && !inSnapshot) {
goto send_snapshot;
}
}
}
/* Send empty AppendEntries RPC when installing a snaphot */
if
(
ret
!=
0
||
prevTerm
==
SYNC_NON_TERM
)
{
if (inSnapshot) {
return
sendSnapshot
(
pRaft
,
progress
);
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
}
}
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
return
sendAppendEntries
(
pRaft
,
progress
,
prevIndex
,
prevTerm
,
entries
,
nEntry
);
send_snapshot:
if (syncRaftProgressRecentActive(progress)) {
/* Only send a snapshot when we have heard from the server */
return sendSnapshot(pRaft, i);
} else {
/* Send empty AppendEntries RPC when we haven't heard from the server */
prevIndex = syncRaftLogLastIndex(pRaft->log);
prevTerm = syncRaftLogLastTerm(pRaft->log);
return sendAppendEntries(pRaft, i, prevIndex, prevTerm);
}
#endif
return
0
;
}
}
static
int
sendSnapshot
(
SSyncRaft
*
pRaft
,
int
i
)
{
static
bool
sendSnapshot
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
)
{
return
0
;
if
(
!
syncRaftProgressRecentActive
(
progress
))
{
return
false
;
}
return
true
;
}
}
static
int
sendAppendEntries
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
prevIndex
,
SyncTerm
prevTerm
)
{
static
bool
sendAppendEntries
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
,
#if 0
SyncIndex
prevIndex
,
SyncTerm
prevTerm
,
SyncIndex nextIndex = prevIndex + 1;
const
SSyncRaftEntry
*
entries
,
int
nEntry
)
{
SSyncRaftEntry *entries;
SyncIndex
lastIndex
;
int nEntry;
SyncTerm
logTerm
=
prevTerm
;
SNodeInfo* pNode = &(pRaft->cluster.nodeInfo[i]);
SNodeInfo
*
pNode
=
&
(
pRaft
->
cluster
.
nodeInfo
[
progress
->
selfIndex
]);
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
syncRaftLogAcquire(pRaft->log, nextIndex, pRaft->maxMsgSize, &entries, &nEntry);
SSyncMessage
*
msg
=
syncNewAppendMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
,
SSyncMessage
*
msg
=
syncNewAppendMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
,
prevIndex
,
prevTerm
,
pRaft
->
log
->
commitIndex
,
prevIndex
,
prevTerm
,
pRaft
->
log
->
commitIndex
,
...
@@ -125,24 +81,27 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT
...
@@ -125,24 +81,27 @@ static int sendAppendEntries(SSyncRaft* pRaft, int i, SyncIndex prevIndex, SyncT
goto
err_release_log
;
goto
err_release_log
;
}
}
pRaft->io.send(msg, pNode);
if
(
nEntry
!=
0
)
{
switch
(
progress
->
state
)
{
if (syncRaftProgressInReplicate(progress)) {
// optimistically increase the next when in StateReplicate
SyncIndex lastIndex = nextIndex + nEntry;
case
PROGRESS_STATE_REPLICATE
:
syncRaftProgressOptimisticNextIndex(progress, lastIndex);
lastIndex
=
entries
[
nEntry
-
1
].
index
;
syncRaftInflightAdd(&progress->inflights, lastIndex);
syncRaftProgressOptimisticNextIndex
(
progress
,
lastIndex
);
} else if (syncRaftProgressInProbe(progress)) {
syncRaftInflightAdd
(
&
progress
->
inflights
,
lastIndex
);
syncRaftProgressPause(progress);
break
;
} else {
case
PROGRESS_STATE_PROBE
:
progress
->
probeSent
=
true
;
break
;
default:
syncFatal
(
"[%d:%d] is sending append in unhandled state %s"
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
syncRaftProgressStateString
(
progress
));
break
;
}
}
}
pRaft
->
io
.
send
(
msg
,
pNode
);
syncRaftProgressUpdateSendTick(progress, pRaft->currentTick);
return
true
;
return 0;
err_release_log:
err_release_log:
syncRaftLogRelease(pRaft->log, nextIndex, entries, nEntry);
syncRaftLogRelease
(
pRaft
->
log
,
prevIndex
+
1
,
entries
,
nEntry
);
#endif
return
false
;
return
0
;
}
}
\ No newline at end of file
source/libs/sync/src/sync_raft_config_change.c
0 → 100644
浏览文件 @
c1c31645
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "syncInt.h"
#include "sync_raft_config_change.h"
#include "sync_raft_progress.h"
#include "sync_raft_progress_tracker.h"
#include "sync_raft_quorum_joint.h"
static
int
checkAndCopy
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
static
int
checkAndReturn
(
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
static
int
checkInvariants
(
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
static
int
checkInvariants
(
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
);
static
bool
hasJointConfig
(
const
SSyncRaftProgressTrackerConfig
*
config
);
static
int
applyConfig
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
const
SSyncConfChangeSingleArray
*
css
);
static
int
symDiff
(
const
SSyncRaftNodeMap
*
l
,
const
SSyncRaftNodeMap
*
r
);
static
void
initProgress
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
,
bool
isLearner
);
static
void
nilAwareDelete
(
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
id
);
static
void
nilAwareAdd
(
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
id
);
static
void
makeVoter
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
);
static
void
makeLearner
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
);
static
void
removeNodeId
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
);
// syncRaftChangerSimpleConfig carries out a series of configuration changes that (in aggregate)
// mutates the incoming majority config Voters[0] by at most one. This method
// will return an error if that is not the case, if the resulting quorum is
// zero, or if the configuration is in a joint state (i.e. if there is an
// outgoing configuration).
int
syncRaftChangerSimpleConfig
(
SSyncRaftChanger
*
changer
,
const
SSyncConfChangeSingleArray
*
css
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
)
{
int
ret
;
ret
=
checkAndCopy
(
changer
,
config
,
progressMap
);
if
(
ret
!=
0
)
{
return
ret
;
}
if
(
hasJointConfig
(
config
))
{
syncError
(
"can't apply simple config change in joint config"
);
return
-
1
;
}
ret
=
applyConfig
(
changer
,
config
,
progressMap
,
css
);
if
(
ret
!=
0
)
{
return
ret
;
}
int
n
=
symDiff
(
syncRaftJointConfigIncoming
(
&
changer
->
tracker
->
config
.
voters
),
syncRaftJointConfigIncoming
(
&
config
->
voters
));
if
(
n
>
1
)
{
syncError
(
"more than one voter changed without entering joint config"
);
return
-
1
;
}
return
checkAndReturn
(
config
,
progressMap
);
}
// EnterJoint verifies that the outgoing (=right) majority config of the joint
// config is empty and initializes it with a copy of the incoming (=left)
// majority config. That is, it transitions from
//
// (1 2 3)&&()
// to
// (1 2 3)&&(1 2 3).
//
// The supplied changes are then applied to the incoming majority config,
// resulting in a joint configuration that in terms of the Raft thesis[1]
// (Section 4.3) corresponds to `C_{new,old}`.
//
// [1]: https://github.com/ongardie/dissertation/blob/master/online-trim.pdf
int
syncRaftChangerEnterJoint
(
SSyncRaftChanger
*
changer
,
bool
autoLeave
,
const
SSyncConfChangeSingleArray
*
css
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
)
{
int
ret
;
ret
=
checkAndCopy
(
changer
,
config
,
progressMap
);
if
(
ret
!=
0
)
{
return
ret
;
}
if
(
hasJointConfig
(
config
))
{
syncError
(
"config is already joint"
);
return
-
1
;
}
if
(
config
->
voters
.
incoming
.
replica
==
0
)
{
// We allow adding nodes to an empty config for convenience (testing and
// bootstrap), but you can't enter a joint state.
syncError
(
"can't make a zero-voter config joint"
);
return
-
1
;
}
// Clear the outgoing config.
syncRaftJointConfigClearOutgoing
(
&
config
->
voters
);
// Copy incoming to outgoing.
memcpy
(
&
config
->
voters
.
outgoing
,
&
config
->
voters
.
incoming
,
sizeof
(
SSyncCluster
));
ret
=
applyConfig
(
changer
,
config
,
progressMap
,
css
);
if
(
ret
!=
0
)
{
return
ret
;
}
config
->
autoLeave
=
autoLeave
;
return
checkAndReturn
(
config
,
progressMap
);
}
// checkAndCopy copies the tracker's config and progress map (deeply enough for
// the purposes of the Changer) and returns those copies. It returns an error
// if checkInvariants does.
static
int
checkAndCopy
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
)
{
syncRaftCloneTrackerConfig
(
&
changer
->
tracker
->
config
,
config
);
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SSyncRaftProgress
*
progress
=
&
(
changer
->
tracker
->
progressMap
.
progress
[
i
]);
if
(
progress
->
id
==
SYNC_NON_NODE_ID
)
{
continue
;
}
syncRaftCopyProgress
(
progress
,
&
(
progressMap
->
progress
[
i
]));
}
return
checkAndReturn
(
config
,
progressMap
);
}
// checkAndReturn calls checkInvariants on the input and returns either the
// resulting error or the input.
static
int
checkAndReturn
(
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
)
{
if
(
checkInvariants
(
config
,
progressMap
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
// checkInvariants makes sure that the config and progress are compatible with
// each other. This is used to check both what the Changer is initialized with,
// as well as what it returns.
static
int
checkInvariants
(
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
)
{
int
ret
=
syncRaftCheckProgress
(
config
,
progressMap
);
if
(
ret
!=
0
)
{
return
ret
;
}
int
i
;
// Any staged learner was staged because it could not be directly added due
// to a conflicting voter in the outgoing config.
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
!
syncRaftJointConfigInOutgoing
(
&
config
->
voters
,
config
->
learnersNext
.
nodeId
[
i
]))
{
return
-
1
;
}
if
(
progressMap
->
progress
[
i
].
id
!=
SYNC_NON_NODE_ID
&&
progressMap
->
progress
[
i
].
isLearner
)
{
syncError
(
"%d is in LearnersNext, but is already marked as learner"
,
progressMap
->
progress
[
i
].
id
);
return
-
1
;
}
}
// Conversely Learners and Voters doesn't intersect at all.
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
syncRaftJointConfigInIncoming
(
&
config
->
voters
,
config
->
learners
.
nodeId
[
i
]))
{
syncError
(
"%d is in Learners and voter.incoming"
,
progressMap
->
progress
[
i
].
id
);
return
-
1
;
}
if
(
progressMap
->
progress
[
i
].
id
!=
SYNC_NON_NODE_ID
&&
!
progressMap
->
progress
[
i
].
isLearner
)
{
syncError
(
"%d is in Learners, but is not marked as learner"
,
progressMap
->
progress
[
i
].
id
);
return
-
1
;
}
}
if
(
!
hasJointConfig
(
config
))
{
// We enforce that empty maps are nil instead of zero.
if
(
config
->
learnersNext
.
replica
>
0
)
{
syncError
(
"cfg.LearnersNext must be nil when not joint"
);
return
-
1
;
}
if
(
config
->
autoLeave
)
{
syncError
(
"AutoLeave must be false when not joint"
);
return
-
1
;
}
}
return
0
;
}
static
bool
hasJointConfig
(
const
SSyncRaftProgressTrackerConfig
*
config
)
{
return
config
->
voters
.
outgoing
.
replica
>
0
;
}
static
int
applyConfig
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
const
SSyncConfChangeSingleArray
*
css
)
{
int
i
;
for
(
i
=
0
;
i
<
css
->
n
;
++
i
)
{
const
SSyncConfChangeSingle
*
cs
=
&
(
css
->
changes
[
i
]);
if
(
cs
->
nodeId
==
SYNC_NON_NODE_ID
)
{
continue
;
}
ESyncRaftConfChangeType
type
=
cs
->
type
;
switch
(
type
)
{
case
SYNC_RAFT_Conf_AddNode
:
makeVoter
(
changer
,
config
,
progressMap
,
cs
->
nodeId
);
break
;
case
SYNC_RAFT_Conf_AddLearnerNode
:
makeLearner
(
changer
,
config
,
progressMap
,
cs
->
nodeId
);
break
;
case
SYNC_RAFT_Conf_RemoveNode
:
removeNodeId
(
changer
,
config
,
progressMap
,
cs
->
nodeId
);
break
;
case
SYNC_RAFT_Conf_UpdateNode
:
break
;
}
}
if
(
config
->
voters
.
incoming
.
replica
==
0
)
{
syncError
(
"removed all voters"
);
return
-
1
;
}
return
0
;
}
// symdiff returns the count of the symmetric difference between the sets of
// uint64s, i.e. len( (l - r) \union (r - l)).
static
int
symDiff
(
const
SSyncRaftNodeMap
*
l
,
const
SSyncRaftNodeMap
*
r
)
{
int
n
;
int
i
;
int
j0
,
j1
;
const
SSyncRaftNodeMap
*
pairs
[
2
][
2
]
=
{
{
l
,
r
},
// count elems in l but not in r
{
r
,
l
},
// count elems in r but not in l
};
for
(
n
=
0
,
i
=
0
;
i
<
2
;
++
i
)
{
const
SSyncRaftNodeMap
**
pp
=
pairs
[
i
];
const
SSyncRaftNodeMap
*
p0
=
pp
[
0
];
const
SSyncRaftNodeMap
*
p1
=
pp
[
1
];
for
(
j0
=
0
;
j0
<
TSDB_MAX_REPLICA
;
++
j0
)
{
SyncNodeId
id
=
p0
->
nodeId
[
j0
];
if
(
id
==
SYNC_NON_NODE_ID
)
{
continue
;
}
for
(
j1
=
0
;
j1
<
p1
->
replica
;
++
j1
)
{
if
(
p1
->
nodeId
[
j1
]
!=
SYNC_NON_NODE_ID
&&
p1
->
nodeId
[
j1
]
!=
id
)
{
n
+=
1
;
}
}
}
}
return
n
;
}
static
void
initProgress
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
,
bool
isLearner
)
{
}
// nilAwareDelete deletes from a map, nil'ing the map itself if it is empty after.
static
void
nilAwareDelete
(
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
id
)
{
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
nodeMap
->
nodeId
[
i
]
==
id
)
{
nodeMap
->
replica
-=
1
;
nodeMap
->
nodeId
[
i
]
=
SYNC_NON_NODE_ID
;
break
;
}
}
assert
(
nodeMap
->
replica
>=
0
);
}
// nilAwareAdd populates a map entry, creating the map if necessary.
static
void
nilAwareAdd
(
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
id
)
{
int
i
,
j
;
for
(
i
=
0
,
j
=
-
1
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
nodeMap
->
nodeId
[
i
]
==
id
)
{
return
;
}
if
(
j
==
-
1
&&
nodeMap
->
nodeId
[
i
]
==
SYNC_NON_NODE_ID
)
{
j
=
i
;
}
}
assert
(
j
!=
-
1
);
nodeMap
->
nodeId
[
j
]
=
id
;
nodeMap
->
replica
+=
1
;
}
// makeVoter adds or promotes the given ID to be a voter in the incoming
// majority config.
static
void
makeVoter
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
)
{
int
i
=
syncRaftFindProgressIndexByNodeId
(
progressMap
,
id
);
if
(
i
==
-
1
)
{
initProgress
(
changer
,
config
,
progressMap
,
id
,
false
);
i
=
syncRaftFindProgressIndexByNodeId
(
progressMap
,
id
);
}
assert
(
i
!=
-
1
);
SSyncRaftProgress
*
progress
=
&
(
progressMap
->
progress
[
i
]);
progress
->
isLearner
=
false
;
nilAwareDelete
(
&
config
->
learners
,
id
);
nilAwareDelete
(
&
config
->
learnersNext
,
id
);
syncRaftJointConfigAddToIncoming
(
&
config
->
voters
,
id
);
}
// makeLearner makes the given ID a learner or stages it to be a learner once
// an active joint configuration is exited.
//
// The former happens when the peer is not a part of the outgoing config, in
// which case we either add a new learner or demote a voter in the incoming
// config.
//
// The latter case occurs when the configuration is joint and the peer is a
// voter in the outgoing config. In that case, we do not want to add the peer
// as a learner because then we'd have to track a peer as a voter and learner
// simultaneously. Instead, we add the learner to LearnersNext, so that it will
// be added to Learners the moment the outgoing config is removed by
// LeaveJoint().
static
void
makeLearner
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
)
{
int
i
=
syncRaftFindProgressIndexByNodeId
(
progressMap
,
id
);
if
(
i
==
-
1
)
{
initProgress
(
changer
,
config
,
progressMap
,
id
,
false
);
i
=
syncRaftFindProgressIndexByNodeId
(
progressMap
,
id
);
}
assert
(
i
!=
-
1
);
SSyncRaftProgress
*
progress
=
&
(
progressMap
->
progress
[
i
]);
if
(
progress
->
isLearner
)
{
return
;
}
// Remove any existing voter in the incoming config...
removeNodeId
(
changer
,
config
,
progressMap
,
id
);
// ... but save the Progress.
syncRaftAddToProgressMap
(
progressMap
,
id
);
// Use LearnersNext if we can't add the learner to Learners directly, i.e.
// if the peer is still tracked as a voter in the outgoing config. It will
// be turned into a learner in LeaveJoint().
//
// Otherwise, add a regular learner right away.
bool
inOutgoing
=
syncRaftJointConfigInCluster
(
&
config
->
voters
.
outgoing
,
id
);
if
(
inOutgoing
)
{
nilAwareAdd
(
&
config
->
learnersNext
,
id
);
}
else
{
nilAwareAdd
(
&
config
->
learners
,
id
);
progress
->
isLearner
=
true
;
}
}
// removeNodeId this peer as a voter or learner from the incoming config.
static
void
removeNodeId
(
SSyncRaftChanger
*
changer
,
SSyncRaftProgressTrackerConfig
*
config
,
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
)
{
int
i
=
syncRaftFindProgressIndexByNodeId
(
progressMap
,
id
);
if
(
i
==
-
1
)
{
return
;
}
syncRaftJointConfigRemoveFromIncoming
(
&
config
->
voters
,
id
);
nilAwareDelete
(
&
config
->
learners
,
id
);
nilAwareDelete
(
&
config
->
learnersNext
,
id
);
// If the peer is still a voter in the outgoing config, keep the Progress.
bool
inOutgoing
=
syncRaftJointConfigInCluster
(
&
config
->
voters
.
outgoing
,
id
);
if
(
!
inOutgoing
)
{
syncRaftRemoveFromProgressMap
(
progressMap
,
id
);
}
}
\ No newline at end of file
source/libs/sync/src/sync_raft_impl.c
浏览文件 @
c1c31645
...
@@ -31,7 +31,6 @@ static void tickElection(SSyncRaft* pRaft);
...
@@ -31,7 +31,6 @@ static void tickElection(SSyncRaft* pRaft);
static
void
tickHeartbeat
(
SSyncRaft
*
pRaft
);
static
void
tickHeartbeat
(
SSyncRaft
*
pRaft
);
static
void
appendEntries
(
SSyncRaft
*
pRaft
,
SSyncRaftEntry
*
entries
,
int
n
);
static
void
appendEntries
(
SSyncRaft
*
pRaft
,
SSyncRaftEntry
*
entries
,
int
n
);
static
bool
maybeCommit
(
SSyncRaft
*
pRaft
);
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
);
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
);
...
@@ -127,7 +126,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) {
...
@@ -127,7 +126,7 @@ int syncRaftQuorum(SSyncRaft* pRaft) {
return
pRaft
->
cluster
.
replica
/
2
+
1
;
return
pRaft
->
cluster
.
replica
/
2
+
1
;
}
}
S
SyncRaftVoteResult
syncRaftPollVote
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
E
SyncRaftVoteResult
syncRaftPollVote
(
SSyncRaft
*
pRaft
,
SyncNodeId
id
,
bool
preVote
,
bool
grant
,
bool
preVote
,
bool
grant
,
int
*
rejected
,
int
*
granted
)
{
int
*
rejected
,
int
*
granted
)
{
int
voterIndex
=
syncRaftConfigurationIndexOfNode
(
pRaft
,
id
);
int
voterIndex
=
syncRaftConfigurationIndexOfNode
(
pRaft
,
id
);
...
@@ -171,6 +170,34 @@ SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
...
@@ -171,6 +170,34 @@ SSyncRaftVoteResult syncRaftPollVote(SSyncRaft* pRaft, SyncNodeId id,
return granted;
return granted;
*/
*/
void
syncRaftLoadState
(
SSyncRaft
*
pRaft
,
const
SSyncServerState
*
serverState
)
{
SyncIndex
commitIndex
=
serverState
->
commitIndex
;
SyncIndex
lastIndex
=
syncRaftLogLastIndex
(
pRaft
->
log
);
if
(
commitIndex
<
pRaft
->
log
->
commitIndex
||
commitIndex
>
lastIndex
)
{
syncFatal
(
"[%d:%d] state.commit %"
PRId64
" is out of range [%"
PRId64
",%"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
commitIndex
,
pRaft
->
log
->
commitIndex
,
lastIndex
);
return
;
}
pRaft
->
log
->
commitIndex
=
commitIndex
;
pRaft
->
term
=
serverState
->
term
;
pRaft
->
voteFor
=
serverState
->
voteFor
;
}
static
void
visitProgressSendAppend
(
int
i
,
SSyncRaftProgress
*
progress
,
void
*
arg
)
{
SSyncRaft
*
pRaft
=
(
SSyncRaft
*
)
arg
;
if
(
pRaft
->
selfId
==
progress
->
id
)
{
return
;
}
syncRaftReplicate
(
arg
,
progress
,
true
);
}
void
syncRaftBroadcastAppend
(
SSyncRaft
*
pRaft
)
{
syncRaftProgressVisit
(
pRaft
->
tracker
,
visitProgressSendAppend
,
pRaft
);
}
static
int
convertClear
(
SSyncRaft
*
pRaft
)
{
static
int
convertClear
(
SSyncRaft
*
pRaft
)
{
}
}
...
@@ -186,7 +213,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
...
@@ -186,7 +213,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
* StateCandidate, we may get stale MsgPreVoteResp messages in this term from
* our pre-candidate state).
* our pre-candidate state).
**/
**/
RaftMessageType
msgType
=
pMsg
->
msgType
;
ESync
RaftMessageType
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
RAFT_MSG_INTERNAL_PROP
)
{
if
(
msgType
==
RAFT_MSG_INTERNAL_PROP
)
{
return
0
;
return
0
;
...
@@ -243,18 +270,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
...
@@ -243,18 +270,16 @@ static void appendEntries(SSyncRaft* pRaft, SSyncRaftEntry* entries, int n) {
syncRaftLogAppend
(
pRaft
->
log
,
entries
,
n
);
syncRaftLogAppend
(
pRaft
->
log
,
entries
,
n
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
tracker
->
progressMap
[
pRaft
->
cluster
.
selfIndex
]);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
tracker
->
progressMap
.
progress
[
pRaft
->
cluster
.
selfIndex
]);
syncRaftProgressMaybeUpdate
(
progress
,
lastIndex
);
syncRaftProgressMaybeUpdate
(
progress
,
lastIndex
);
// Regardless of
m
aybeCommit's return, our caller will call bcastAppend.
// Regardless of
syncRaftM
aybeCommit's return, our caller will call bcastAppend.
m
aybeCommit
(
pRaft
);
syncRaftM
aybeCommit
(
pRaft
);
}
}
/**
// syncRaftMaybeCommit attempts to advance the commit index. Returns true if
* maybeCommit attempts to advance the commit index. Returns true if
// the commit index changed (in which case the caller should call
* the commit index changed (in which case the caller should call
// r.bcastAppend).
* r.bcastAppend).
bool
syncRaftMaybeCommit
(
SSyncRaft
*
pRaft
)
{
**/
static
bool
maybeCommit
(
SSyncRaft
*
pRaft
)
{
return
true
;
return
true
;
}
}
...
@@ -263,6 +288,7 @@ static bool maybeCommit(SSyncRaft* pRaft) {
...
@@ -263,6 +288,7 @@ static bool maybeCommit(SSyncRaft* pRaft) {
* trigger I/O requests for newly appended log entries or heartbeats.
* trigger I/O requests for newly appended log entries or heartbeats.
**/
**/
static
int
triggerAll
(
SSyncRaft
*
pRaft
)
{
static
int
triggerAll
(
SSyncRaft
*
pRaft
)
{
#if 0
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
assert(pRaft->state == TAOS_SYNC_STATE_LEADER);
int i;
int i;
...
@@ -271,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) {
...
@@ -271,8 +297,10 @@ static int triggerAll(SSyncRaft* pRaft) {
continue;
continue;
}
}
syncRaftReplicate
(
pRaft
,
i
);
syncRaftReplicate(pRaft,
pRaft->tracker->progressMap.progress[i], true
);
}
}
#endif
return
0
;
}
}
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
)
{
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
)
{
...
...
source/libs/sync/src/sync_raft_progress.c
浏览文件 @
c1c31645
...
@@ -20,13 +20,13 @@
...
@@ -20,13 +20,13 @@
#include "sync.h"
#include "sync.h"
#include "syncInt.h"
#include "syncInt.h"
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
RaftProgressState
state
);
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
ESync
RaftProgressState
state
);
static
void
probeAcked
(
SSyncRaftProgress
*
progress
);
static
void
probeAcked
(
SSyncRaftProgress
*
progress
);
static
void
resumeProgress
(
SSyncRaftProgress
*
progress
);
static
void
resumeProgress
(
SSyncRaftProgress
*
progress
);
void
syncRaftInitProgress
(
int
i
,
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
)
{
void
syncRaftInitProgress
(
int
i
,
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
)
{
SSyncRaftInflights
*
inflights
=
syncRaftOpenInflights
(
pRaft
->
tracker
->
maxInflight
);
SSyncRaftInflights
*
inflights
=
syncRaftOpenInflights
(
pRaft
->
tracker
->
maxInflight
Msgs
);
if
(
inflights
==
NULL
)
{
if
(
inflights
==
NULL
)
{
return
;
return
;
}
}
...
@@ -112,6 +112,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
...
@@ -112,6 +112,44 @@ bool syncRaftProgressIsPaused(SSyncRaftProgress* progress) {
}
}
}
}
int
syncRaftFindProgressIndexByNodeId
(
const
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
)
{
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
progressMap
->
progress
[
i
].
id
==
id
)
{
return
i
;
}
}
return
-
1
;
}
int
syncRaftAddToProgressMap
(
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
)
{
int
i
,
j
;
for
(
i
=
0
,
j
=
-
1
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
progressMap
->
progress
[
i
].
id
==
id
)
{
return
i
;
}
if
(
j
==
-
1
&&
progressMap
->
progress
[
i
].
id
==
SYNC_NON_NODE_ID
)
{
j
=
i
;
}
}
assert
(
j
!=
-
1
);
progressMap
->
progress
[
i
].
id
=
id
;
}
void
syncRaftRemoveFromProgressMap
(
SSyncRaftProgressMap
*
progressMap
,
SyncNodeId
id
)
{
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
progressMap
->
progress
[
i
].
id
==
id
)
{
progressMap
->
progress
[
i
].
id
=
SYNC_NON_NODE_ID
;
break
;
}
}
}
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
)
{
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
SSyncRaftProgress
*
progress
)
{
return
syncRaftLogLastIndex
(
pRaft
->
log
)
+
1
==
progress
->
nextIndex
;
return
syncRaftLogLastIndex
(
pRaft
->
log
)
+
1
==
progress
->
nextIndex
;
}
}
...
@@ -149,11 +187,15 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps
...
@@ -149,11 +187,15 @@ void syncRaftProgressBecomeSnapshot(SSyncRaftProgress* progress, SyncIndex snaps
progress
->
pendingSnapshotIndex
=
snapshotIndex
;
progress
->
pendingSnapshotIndex
=
snapshotIndex
;
}
}
void
syncRaftCopyProgress
(
const
SSyncRaftProgress
*
progress
,
SSyncRaftProgress
*
out
)
{
}
/**
/**
* ResetState moves the Progress into the specified State, resetting ProbeSent,
* ResetState moves the Progress into the specified State, resetting ProbeSent,
* PendingSnapshot, and Inflights.
* PendingSnapshot, and Inflights.
**/
**/
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
RaftProgressState
state
)
{
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
ESync
RaftProgressState
state
)
{
progress
->
probeSent
=
false
;
progress
->
probeSent
=
false
;
progress
->
pendingSnapshotIndex
=
0
;
progress
->
pendingSnapshotIndex
=
0
;
progress
->
state
=
state
;
progress
->
state
=
state
;
...
@@ -233,7 +275,7 @@ void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
...
@@ -233,7 +275,7 @@ void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
progress->state = PROGRESS_STATE_PROBE;
progress->state = PROGRESS_STATE_PROBE;
}
}
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
ESync
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].state;
return pRaft->leaderState.progress[i].state;
}
}
...
...
source/libs/sync/src/sync_raft_progress_tracker.c
浏览文件 @
c1c31645
...
@@ -14,6 +14,7 @@
...
@@ -14,6 +14,7 @@
*/
*/
#include "sync_raft_progress_tracker.h"
#include "sync_raft_progress_tracker.h"
#include "sync_raft_proto.h"
SSyncRaftProgressTracker
*
syncRaftOpenProgressTracker
()
{
SSyncRaftProgressTracker
*
syncRaftOpenProgressTracker
()
{
SSyncRaftProgressTracker
*
tracker
=
(
SSyncRaftProgressTracker
*
)
malloc
(
sizeof
(
SSyncRaftProgressTracker
));
SSyncRaftProgressTracker
*
tracker
=
(
SSyncRaftProgressTracker
*
)
malloc
(
sizeof
(
SSyncRaftProgressTracker
));
...
@@ -25,13 +26,13 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
...
@@ -25,13 +26,13 @@ SSyncRaftProgressTracker* syncRaftOpenProgressTracker() {
}
}
void
syncRaftResetVotes
(
SSyncRaftProgressTracker
*
tracker
)
{
void
syncRaftResetVotes
(
SSyncRaftProgressTracker
*
tracker
)
{
memset
(
tracker
->
votes
,
SYNC_RAFT_VOTE_RESP_UNKNOWN
,
sizeof
(
SyncRaftVoteResult
)
*
TSDB_MAX_REPLICA
);
memset
(
tracker
->
votes
,
SYNC_RAFT_VOTE_RESP_UNKNOWN
,
sizeof
(
ESyncRaftVoteType
)
*
TSDB_MAX_REPLICA
);
}
}
void
syncRaftProgressVisit
(
SSyncRaftProgressTracker
*
tracker
,
visitProgressFp
visit
,
void
*
arg
)
{
void
syncRaftProgressVisit
(
SSyncRaftProgressTracker
*
tracker
,
visitProgressFp
visit
,
void
*
arg
)
{
int
i
;
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SSyncRaftProgress
*
progress
=
&
(
tracker
->
progressMap
[
i
]);
SSyncRaftProgress
*
progress
=
&
(
tracker
->
progressMap
.
progress
[
i
]);
visit
(
i
,
progress
,
arg
);
visit
(
i
,
progress
,
arg
);
}
}
}
}
...
@@ -44,17 +45,21 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
...
@@ -44,17 +45,21 @@ void syncRaftRecordVote(SSyncRaftProgressTracker* tracker, int i, bool grant) {
tracker
->
votes
[
i
]
=
grant
?
SYNC_RAFT_VOTE_RESP_GRANT
:
SYNC_RAFT_VOTE_RESP_REJECT
;
tracker
->
votes
[
i
]
=
grant
?
SYNC_RAFT_VOTE_RESP_GRANT
:
SYNC_RAFT_VOTE_RESP_REJECT
;
}
}
void
syncRaftCloneTrackerConfig
(
const
SSyncRaftProgressTrackerConfig
*
from
,
SSyncRaftProgressTrackerConfig
*
to
)
{
}
/**
/**
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* syncRaftTallyVotes returns the number of granted and rejected Votes, and whether the
* election outcome is known.
* election outcome is known.
**/
**/
SyncRaftVoteResult
syncRaftTallyVotes
(
SSyncRaftProgressTracker
*
tracker
,
int
*
rejected
,
int
*
granted
)
{
E
SyncRaftVoteResult
syncRaftTallyVotes
(
SSyncRaftProgressTracker
*
tracker
,
int
*
rejected
,
int
*
granted
)
{
int
i
;
int
i
;
SSyncRaftProgress
*
progress
;
SSyncRaftProgress
*
progress
;
int
r
,
g
;
int
r
,
g
;
for
(
i
=
0
,
r
=
0
,
g
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
i
=
0
,
r
=
0
,
g
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
progress
=
&
(
tracker
->
progressMap
[
i
]);
progress
=
&
(
tracker
->
progressMap
.
progress
[
i
]);
if
(
progress
->
id
==
SYNC_NON_NODE_ID
)
{
if
(
progress
->
id
==
SYNC_NON_NODE_ID
)
{
continue
;
continue
;
}
}
...
@@ -73,4 +78,11 @@ SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* re
...
@@ -73,4 +78,11 @@ SyncRaftVoteResult syncRaftTallyVotes(SSyncRaftProgressTracker* tracker, int* re
if
(
rejected
)
*
rejected
=
r
;
if
(
rejected
)
*
rejected
=
r
;
if
(
granted
)
*
granted
=
g
;
if
(
granted
)
*
granted
=
g
;
return
syncRaftVoteResult
(
&
(
tracker
->
config
.
voters
),
tracker
->
votes
);
return
syncRaftVoteResult
(
&
(
tracker
->
config
.
voters
),
tracker
->
votes
);
}
void
syncRaftConfigState
(
const
SSyncRaftProgressTracker
*
tracker
,
SSyncConfigState
*
cs
)
{
memcpy
(
&
cs
->
voters
,
&
tracker
->
config
.
voters
.
incoming
,
sizeof
(
SSyncRaftNodeMap
));
memcpy
(
&
cs
->
votersOutgoing
,
&
tracker
->
config
.
voters
.
outgoing
,
sizeof
(
SSyncRaftNodeMap
));
memcpy
(
&
cs
->
learners
,
&
tracker
->
config
.
learners
,
sizeof
(
SSyncRaftNodeMap
));
memcpy
(
&
cs
->
learnersNext
,
&
tracker
->
config
.
learnersNext
,
sizeof
(
SSyncRaftNodeMap
));
}
}
\ No newline at end of file
source/libs/sync/src/sync_raft_quorum_joint.c
浏览文件 @
c1c31645
...
@@ -22,9 +22,9 @@
...
@@ -22,9 +22,9 @@
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* a result indicating whether the vote is pending, lost, or won. A joint quorum
* requires both majority quorums to vote in favor.
* requires both majority quorums to vote in favor.
**/
**/
SyncRaftVoteResult
syncRaftVoteResult
(
SSyncRaftQuorumJointConfig
*
config
,
const
SyncRaftVoteResult
*
votes
)
{
ESyncRaftVoteType
syncRaftVoteResult
(
SSyncRaftQuorumJointConfig
*
config
,
const
ESyncRaftVoteType
*
votes
)
{
SyncRaftVoteResult
r1
=
syncRaftMajorityVoteResult
(
&
(
config
->
majorityConfig
[
0
]
),
votes
);
ESyncRaftVoteResult
r1
=
syncRaftMajorityVoteResult
(
&
(
config
->
incoming
),
votes
);
SyncRaftVoteResult
r2
=
syncRaftMajorityVoteResult
(
&
(
config
->
majorityConfig
[
1
]
),
votes
);
ESyncRaftVoteResult
r2
=
syncRaftMajorityVoteResult
(
&
(
config
->
outgoing
),
votes
);
if
(
r1
==
r2
)
{
if
(
r1
==
r2
)
{
// If they agree, return the agreed state.
// If they agree, return the agreed state.
...
@@ -39,3 +39,47 @@ SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const
...
@@ -39,3 +39,47 @@ SyncRaftVoteResult syncRaftVoteResult(SSyncRaftQuorumJointConfig* config, const
// One side won, the other one is pending, so the whole outcome is.
// One side won, the other one is pending, so the whole outcome is.
return
SYNC_RAFT_VOTE_PENDING
;
return
SYNC_RAFT_VOTE_PENDING
;
}
}
void
syncRaftJointConfigAddToIncoming
(
SSyncRaftQuorumJointConfig
*
config
,
SyncNodeId
id
)
{
int
i
,
min
;
for
(
i
=
0
,
min
=
-
1
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
config
->
incoming
.
nodeId
[
i
]
==
id
)
{
return
;
}
if
(
min
==
-
1
&&
config
->
incoming
.
nodeId
[
i
]
==
SYNC_NON_NODE_ID
)
{
min
=
i
;
}
}
assert
(
min
!=
-
1
);
config
->
incoming
.
nodeId
[
min
]
=
id
;
config
->
incoming
.
replica
+=
1
;
}
void
syncRaftJointConfigRemoveFromIncoming
(
SSyncRaftQuorumJointConfig
*
config
,
SyncNodeId
id
)
{
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
config
->
incoming
.
nodeId
[
i
]
==
id
)
{
config
->
incoming
.
replica
-=
1
;
config
->
incoming
.
nodeId
[
i
]
=
SYNC_NON_NODE_ID
;
break
;
}
}
assert
(
config
->
incoming
.
replica
>=
0
);
}
bool
syncRaftIsInNodeMap
(
const
SSyncRaftNodeMap
*
nodeMap
,
SyncNodeId
nodeId
)
{
int
i
;
for
(
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
nodeId
==
nodeMap
->
nodeId
[
i
])
{
return
true
;
}
}
return
false
;
}
\ No newline at end of file
source/libs/sync/src/sync_raft_quorum_majority.c
浏览文件 @
c1c31645
...
@@ -22,14 +22,14 @@
...
@@ -22,14 +22,14 @@
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* yes/no has been reached), won (a quorum of yes has been reached), or lost (a
* quorum of no has been reached).
* quorum of no has been reached).
**/
**/
SyncRaftVoteResult
syncRaftMajorityVoteResult
(
SSyncCluster
*
config
,
const
SyncRaftVoteResult
*
votes
)
{
ESyncRaftVoteResult
syncRaftMajorityVoteResult
(
SSyncRaftNodeMap
*
config
,
const
ESyncRaftVoteType
*
votes
)
{
if
(
config
->
replica
==
0
)
{
if
(
config
->
replica
==
0
)
{
return
SYNC_RAFT_VOTE_WON
;
return
SYNC_RAFT_VOTE_WON
;
}
}
int
i
,
g
,
r
,
missing
;
int
i
,
g
,
r
,
missing
;
for
(
i
=
g
=
r
=
missing
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
for
(
i
=
g
=
r
=
missing
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
config
->
nodeI
nfo
[
i
].
nodeId
==
SYNC_NON_NODE_ID
)
{
if
(
config
->
nodeI
d
[
i
]
==
SYNC_NON_NODE_ID
)
{
continue
;
continue
;
}
}
...
...
source/libs/sync/src/sync_raft_restore.c
0 → 100644
浏览文件 @
c1c31645
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync_raft_config_change.h"
#include "sync_raft_restore.h"
#include "sync_raft_progress_tracker.h"
static
int
toConfChangeSingle
(
const
SSyncConfigState
*
cs
,
SSyncConfChangeSingleArray
*
out
,
SSyncConfChangeSingleArray
*
in
);
// syncRaftRestoreConfig takes a Changer (which must represent an empty configuration), and
// runs a sequence of changes enacting the configuration described in the
// ConfState.
//
// TODO(tbg) it's silly that this takes a Changer. Unravel this by making sure
// the Changer only needs a ProgressMap (not a whole Tracker) at which point
// this can just take LastIndex and MaxInflight directly instead and cook up
// the results from that alone.
int
syncRaftRestoreConfig
(
SSyncRaftChanger
*
changer
,
const
SSyncConfigState
*
cs
)
{
SSyncConfChangeSingleArray
outgoing
;
SSyncConfChangeSingleArray
incoming
;
SSyncConfChangeSingleArray
css
;
SSyncRaftProgressTracker
*
tracker
=
changer
->
tracker
;
SSyncRaftProgressTrackerConfig
*
config
=
&
tracker
->
config
;
SSyncRaftProgressMap
*
progressMap
=
&
tracker
->
progressMap
;
int
i
,
ret
;
ret
=
toConfChangeSingle
(
cs
,
&
outgoing
,
&
incoming
);
if
(
ret
!=
0
)
{
goto
out
;
}
if
(
outgoing
.
n
==
0
)
{
// No outgoing config, so just apply the incoming changes one by one.
for
(
i
=
0
;
i
<
incoming
.
n
;
++
i
)
{
css
=
(
SSyncConfChangeSingleArray
)
{
.
n
=
1
,
.
changes
=
&
incoming
.
changes
[
i
],
};
ret
=
syncRaftChangerSimpleConfig
(
changer
,
&
css
,
config
,
progressMap
);
if
(
ret
!=
0
)
{
goto
out
;
}
}
}
else
{
// The ConfState describes a joint configuration.
//
// First, apply all of the changes of the outgoing config one by one, so
// that it temporarily becomes the incoming active config. For example,
// if the config is (1 2 3)&(2 3 4), this will establish (2 3 4)&().
for
(
i
=
0
;
i
<
outgoing
.
n
;
++
i
)
{
css
=
(
SSyncConfChangeSingleArray
)
{
.
n
=
1
,
.
changes
=
&
outgoing
.
changes
[
i
],
};
ret
=
syncRaftChangerSimpleConfig
(
changer
,
&
css
,
config
,
progressMap
);
if
(
ret
!=
0
)
{
goto
out
;
}
}
ret
=
syncRaftChangerEnterJoint
(
changer
,
cs
->
autoLeave
,
&
incoming
,
config
,
progressMap
);
if
(
ret
!=
0
)
{
goto
out
;
}
}
out:
if
(
incoming
.
n
!=
0
)
free
(
incoming
.
changes
);
if
(
outgoing
.
n
!=
0
)
free
(
outgoing
.
changes
);
return
ret
;
}
// toConfChangeSingle translates a conf state into 1) a slice of operations creating
// first the config that will become the outgoing one, and then the incoming one, and
// b) another slice that, when applied to the config resulted from 1), represents the
// ConfState.
static
int
toConfChangeSingle
(
const
SSyncConfigState
*
cs
,
SSyncConfChangeSingleArray
*
out
,
SSyncConfChangeSingleArray
*
in
)
{
int
i
;
out
->
n
=
in
->
n
=
0
;
out
->
n
=
cs
->
votersOutgoing
.
replica
;
out
->
changes
=
(
SSyncConfChangeSingle
*
)
malloc
(
sizeof
(
SSyncConfChangeSingle
)
*
out
->
n
);
if
(
out
->
changes
==
NULL
)
{
out
->
n
=
0
;
return
-
1
;
}
in
->
n
=
cs
->
votersOutgoing
.
replica
+
cs
->
voters
.
replica
+
cs
->
learners
.
replica
+
cs
->
learnersNext
.
replica
;
out
->
changes
=
(
SSyncConfChangeSingle
*
)
malloc
(
sizeof
(
SSyncConfChangeSingle
)
*
in
->
n
);
if
(
in
->
changes
==
NULL
)
{
in
->
n
=
0
;
return
-
1
;
}
// Example to follow along this code:
// voters=(1 2 3) learners=(5) outgoing=(1 2 4 6) learners_next=(4)
//
// This means that before entering the joint config, the configuration
// had voters (1 2 4 6) and perhaps some learners that are already gone.
// The new set of voters is (1 2 3), i.e. (1 2) were kept around, and (4 6)
// are no longer voters; however 4 is poised to become a learner upon leaving
// the joint state.
// We can't tell whether 5 was a learner before entering the joint config,
// but it doesn't matter (we'll pretend that it wasn't).
//
// The code below will construct
// outgoing = add 1; add 2; add 4; add 6
// incoming = remove 1; remove 2; remove 4; remove 6
// add 1; add 2; add 3;
// add-learner 5;
// add-learner 4;
//
// So, when starting with an empty config, after applying 'outgoing' we have
//
// quorum=(1 2 4 6)
//
// From which we enter a joint state via 'incoming'
//
// quorum=(1 2 3)&&(1 2 4 6) learners=(5) learners_next=(4)
//
// as desired.
for
(
i
=
0
;
i
<
cs
->
votersOutgoing
.
replica
;
++
i
)
{
// If there are outgoing voters, first add them one by one so that the
// (non-joint) config has them all.
out
->
changes
[
i
]
=
(
SSyncConfChangeSingle
)
{
.
type
=
SYNC_RAFT_Conf_AddNode
,
.
nodeId
=
cs
->
votersOutgoing
.
nodeId
[
i
],
};
}
// We're done constructing the outgoing slice, now on to the incoming one
// (which will apply on top of the config created by the outgoing slice).
// First, we'll remove all of the outgoing voters.
int
j
=
0
;
for
(
i
=
0
;
i
<
cs
->
votersOutgoing
.
replica
;
++
i
)
{
in
->
changes
[
j
]
=
(
SSyncConfChangeSingle
)
{
.
type
=
SYNC_RAFT_Conf_RemoveNode
,
.
nodeId
=
cs
->
votersOutgoing
.
nodeId
[
i
],
};
j
+=
1
;
}
// Then we'll add the incoming voters and learners.
for
(
i
=
0
;
i
<
cs
->
voters
.
replica
;
++
i
)
{
in
->
changes
[
j
]
=
(
SSyncConfChangeSingle
)
{
.
type
=
SYNC_RAFT_Conf_AddNode
,
.
nodeId
=
cs
->
voters
.
nodeId
[
i
],
};
j
+=
1
;
}
for
(
i
=
0
;
i
<
cs
->
learners
.
replica
;
++
i
)
{
in
->
changes
[
j
]
=
(
SSyncConfChangeSingle
)
{
.
type
=
SYNC_RAFT_Conf_AddLearnerNode
,
.
nodeId
=
cs
->
learners
.
nodeId
[
i
],
};
j
+=
1
;
}
// Same for LearnersNext; these are nodes we want to be learners but which
// are currently voters in the outgoing config.
for
(
i
=
0
;
i
<
cs
->
learnersNext
.
replica
;
++
i
)
{
in
->
changes
[
j
]
=
(
SSyncConfChangeSingle
)
{
.
type
=
SYNC_RAFT_Conf_AddLearnerNode
,
.
nodeId
=
cs
->
learnersNext
.
nodeId
[
i
],
};
j
+=
1
;
}
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录