Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
aee5ebd1
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
aee5ebd1
编写于
11月 05, 2021
作者:
L
lichuang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10645][raft]<feature>add raft append message handle
上级
d39af04c
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
287 addition
and
50 deletion
+287
-50
source/libs/sync/inc/raft.h
source/libs/sync/inc/raft.h
+7
-5
source/libs/sync/inc/raft_log.h
source/libs/sync/inc/raft_log.h
+9
-0
source/libs/sync/inc/raft_message.h
source/libs/sync/inc/raft_message.h
+46
-6
source/libs/sync/inc/raft_progress.h
source/libs/sync/inc/raft_progress.h
+37
-6
source/libs/sync/inc/raft_replication.h
source/libs/sync/inc/raft_replication.h
+25
-0
source/libs/sync/inc/sync_type.h
source/libs/sync/inc/sync_type.h
+13
-2
source/libs/sync/src/raft.c
source/libs/sync/src/raft.c
+43
-10
source/libs/sync/src/raft_election.c
source/libs/sync/src/raft_election.c
+1
-1
source/libs/sync/src/raft_handle_vote_message.c
source/libs/sync/src/raft_handle_vote_message.c
+4
-3
source/libs/sync/src/raft_handle_vote_resp_message.c
source/libs/sync/src/raft_handle_vote_resp_message.c
+2
-2
source/libs/sync/src/raft_log.c
source/libs/sync/src/raft_log.c
+9
-0
source/libs/sync/src/raft_progress.c
source/libs/sync/src/raft_progress.c
+1
-15
source/libs/sync/src/raft_replication.c
source/libs/sync/src/raft_replication.c
+90
-0
未找到文件。
source/libs/sync/inc/raft.h
浏览文件 @
aee5ebd1
...
...
@@ -20,8 +20,6 @@
#include "sync_type.h"
#include "raft_message.h"
#define SYNC_NON_NODE_ID -1
typedef
struct
SSyncRaftProgress
SSyncRaftProgress
;
typedef
struct
RaftLeaderState
{
...
...
@@ -49,7 +47,8 @@ struct SSyncRaft {
// owner sync node
SSyncNode
*
pNode
;
//SSyncInfo info;
int
maxMsgSize
;
SSyncFSM
fsm
;
SSyncLogStore
logStore
;
SStateManager
stateManager
;
...
...
@@ -74,7 +73,7 @@ struct SSyncRaft {
/**
* New configuration is ignored if there exists unapplied configuration.
**/
bool
p
endingConf
;
bool
hasP
endingConf
;
SSyncCluster
cluster
;
...
...
@@ -94,6 +93,9 @@ struct SSyncRaft {
**/
uint16_t
heartbeatElapsed
;
// current tick count since start up
uint32_t
currentTick
;
// election timeout tick(random in [3:6] tick)
uint16_t
electionTimeoutTick
;
...
...
@@ -129,7 +131,7 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft);
void
syncRaftStartElection
(
SSyncRaft
*
pRaft
,
SyncRaftElectionType
cType
);
void
syncRaftTrigger
Replicate
(
SSyncRaft
*
pRaft
);
void
syncRaftTrigger
Heartbeat
(
SSyncRaft
*
pRaft
);
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
);
bool
syncRaftIsPromotable
(
SSyncRaft
*
pRaft
);
...
...
source/libs/sync/inc/raft_log.h
浏览文件 @
aee5ebd1
...
...
@@ -19,6 +19,10 @@
#include "sync.h"
#include "sync_type.h"
struct
SSyncRaftEntry
{
};
struct
SSyncRaftLog
{
SyncIndex
uncommittedConfigIndex
;
...
...
@@ -41,4 +45,9 @@ int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog);
bool
syncRaftHasUnappliedLog
(
SSyncRaftLog
*
pLog
);
SyncTerm
syncRaftLogTermOf
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
);
int
syncRaftLogAcquire
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
int
maxMsgSize
,
SSyncRaftEntry
**
ppEntries
,
int
*
n
);
#endif
/* _TD_LIBS_SYNC_RAFT_LOG_H */
source/libs/sync/inc/raft_message.h
浏览文件 @
aee5ebd1
...
...
@@ -63,12 +63,28 @@ typedef struct RaftMsg_VoteResp {
SyncRaftElectionType
cType
;
}
RaftMsg_VoteResp
;
typedef
struct
RaftMsg_Append_Entries
{
// index of log entry preceeding new ones
SyncIndex
prevIndex
;
// term of entry at prevIndex
SyncTerm
prevTerm
;
// leader's commit index.
SyncIndex
commitIndex
;
// size of the log entries array
int
nEntries
;
// log entries array
SSyncRaftEntry
*
entries
;
}
RaftMsg_Append_Entries
;
typedef
struct
SSyncMessage
{
RaftMessageType
msgType
;
SyncTerm
term
;
SyncGroupId
groupId
;
SyncNodeId
from
;
SyncNodeId
to
;
union
{
RaftMsgInternal_Prop
propose
;
...
...
@@ -77,6 +93,8 @@ typedef struct SSyncMessage {
RaftMsg_Vote
vote
;
RaftMsg_VoteResp
voteResp
;
RaftMsg_Append_Entries
appendEntries
;
};
}
SSyncMessage
;
...
...
@@ -107,7 +125,7 @@ static FORCE_INLINE SSyncMessage* syncInitElectionMsg(SSyncMessage* pMsg, SyncNo
return
pMsg
;
}
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncNodeId
to
,
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncTerm
term
,
SyncRaftElectionType
cType
,
SyncIndex
lastIndex
,
SyncTerm
lastTerm
)
{
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
...
...
@@ -117,7 +135,6 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
*
pMsg
=
(
SSyncMessage
)
{
.
groupId
=
groupId
,
.
from
=
from
,
.
to
=
to
,
.
term
=
term
,
.
msgType
=
RAFT_MSG_VOTE
,
.
vote
=
(
RaftMsg_Vote
)
{
...
...
@@ -130,7 +147,7 @@ static FORCE_INLINE SSyncMessage* syncNewVoteMsg(SyncGroupId groupId, SyncNodeId
return
pMsg
;
}
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteRespMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncNodeId
to
,
static
FORCE_INLINE
SSyncMessage
*
syncNewVoteRespMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncRaftElectionType
cType
,
bool
rejected
)
{
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
if
(
pMsg
==
NULL
)
{
...
...
@@ -139,7 +156,6 @@ static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNo
*
pMsg
=
(
SSyncMessage
)
{
.
groupId
=
groupId
,
.
from
=
from
,
.
to
=
to
,
.
msgType
=
RAFT_MSG_VOTE_RESP
,
.
voteResp
=
(
RaftMsg_VoteResp
)
{
.
cType
=
cType
,
...
...
@@ -150,12 +166,36 @@ static FORCE_INLINE SSyncMessage* syncNewVoteRespMsg(SyncGroupId groupId, SyncNo
return
pMsg
;
}
static
FORCE_INLINE
SSyncMessage
*
syncNewAppendMsg
(
SyncGroupId
groupId
,
SyncNodeId
from
,
SyncTerm
term
,
SyncIndex
prevIndex
,
SyncTerm
prevTerm
,
SyncIndex
commitIndex
,
int
nEntries
,
SSyncRaftEntry
*
entries
)
{
SSyncMessage
*
pMsg
=
(
SSyncMessage
*
)
malloc
(
sizeof
(
SSyncMessage
));
if
(
pMsg
==
NULL
)
{
return
NULL
;
}
*
pMsg
=
(
SSyncMessage
)
{
.
groupId
=
groupId
,
.
from
=
from
,
.
term
=
term
,
.
msgType
=
RAFT_MSG_APPEND
,
.
appendEntries
=
(
RaftMsg_Append_Entries
)
{
.
prevIndex
=
prevIndex
,
.
prevTerm
=
prevTerm
,
.
commitIndex
=
commitIndex
,
.
nEntries
=
nEntries
,
.
entries
=
entries
,
},
};
return
pMsg
;
}
static
FORCE_INLINE
bool
syncIsInternalMsg
(
RaftMessageType
msgType
)
{
return
msgType
==
RAFT_MSG_INTERNAL_PROP
||
msgType
==
RAFT_MSG_INTERNAL_ELECTION
;
}
static
FORCE_INLINE
bool
syncIsPreVoteRespMsg
(
SSyncMessage
*
pMsg
)
{
static
FORCE_INLINE
bool
syncIsPreVoteRespMsg
(
const
SSyncMessage
*
pMsg
)
{
return
pMsg
->
msgType
==
RAFT_MSG_VOTE_RESP
&&
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
;
}
...
...
source/libs/sync/inc/raft_progress.h
浏览文件 @
aee5ebd1
...
...
@@ -85,6 +85,9 @@ struct SSyncRaftProgress {
**/
bool
paused
;
// last send append message tick
uint32_t
lastSendTick
;
/**
* pendingSnapshotIndex is used in PROGRESS_SNAPSHOT.
* If there is a pending snapshot, the pendingSnapshotIndex will be set to the
...
...
@@ -116,7 +119,9 @@ int syncRaftProgressCreate(SSyncRaft* pRaft);
**/
bool
syncRaftProgressMaybeUpdate
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
lastIndex
);
void
syncRaftProgressOptimisticNextIndex
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
nextIndex
);
static
FORCE_INLINE
void
syncRaftProgressOptimisticNextIndex
(
SSyncRaftProgress
*
progress
,
SyncIndex
nextIndex
)
{
progress
->
nextIndex
=
nextIndex
+
1
;
}
/**
* syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message.
...
...
@@ -131,7 +136,35 @@ bool syncRaftProgressMaybeDecrTo(SSyncRaft* pRaft, int i,
* MsgApps, is currently waiting for a snapshot, or has reached the
* MaxInflightMsgs limit.
**/
bool
syncRaftProgressIsPaused
(
SSyncRaft
*
pRaft
,
int
i
);
bool
syncRaftProgressIsPaused
(
SSyncRaftProgress
*
progress
);
static
FORCE_INLINE
void
syncRaftProgressPause
(
SSyncRaftProgress
*
progress
)
{
progress
->
paused
=
true
;
}
static
FORCE_INLINE
SyncIndex
syncRaftProgressNextIndex
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
nextIndex
;
}
static
FORCE_INLINE
RaftProgressState
syncRaftProgressInReplicate
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
state
==
PROGRESS_REPLICATE
;
}
static
FORCE_INLINE
RaftProgressState
syncRaftProgressInSnapshot
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
state
==
PROGRESS_SNAPSHOT
;
}
static
FORCE_INLINE
RaftProgressState
syncRaftProgressInProbe
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
state
==
PROGRESS_PROBE
;
}
static
FORCE_INLINE
bool
syncRaftProgressRecentActive
(
SSyncRaftProgress
*
progress
)
{
return
progress
->
recentActive
;
}
static
FORCE_INLINE
bool
syncRaftProgressUpdateSendTick
(
SSyncRaftProgress
*
progress
,
SyncTick
current
)
{
return
progress
->
lastSendTick
=
current
;
}
void
syncRaftProgressFailure
(
SSyncRaft
*
pRaft
,
int
i
);
...
...
@@ -159,7 +192,7 @@ void syncRaftInflightFreeFirstOne(SSyncRaftInflights* inflights);
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i);
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i);
...
...
@@ -171,11 +204,9 @@ bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i);
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i);
bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i);
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
RaftProgressState syncRaftProgressState
(SSyncRaft* pRaft, int i);
void syncRaftProgressAbortSnapshot
(SSyncRaft* pRaft, int i);
#endif
...
...
source/libs/sync/inc/raft_replication.h
0 → 100644
浏览文件 @
aee5ebd1
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_REPLICATION_H
#define TD_SYNC_RAFT_REPLICATION_H
#include "sync.h"
#include "syncInt.h"
#include "sync_type.h"
int
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
int
i
);
#endif
/* TD_SYNC_RAFT_REPLICATION_H */
source/libs/sync/inc/sync_type.h
浏览文件 @
aee5ebd1
...
...
@@ -16,12 +16,18 @@
#ifndef _TD_LIBS_SYNC_TYPE_H
#define _TD_LIBS_SYNC_TYPE_H
#define SYNC_NON_NODE_ID -1
#define SYNC_NON_TERM 0
typedef
int32_t
SyncTime
;
typedef
uint32_t
SyncTick
;
typedef
struct
SSyncRaft
SSyncRaft
;
typedef
struct
SSyncRaftLog
SSyncRaftLog
;
typedef
struct
SSyncRaftEntry
SSyncRaftEntry
;
#ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
#endif
...
...
@@ -32,13 +38,18 @@ typedef struct SSyncRaftLog SSyncRaftLog;
typedef
enum
{
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
=
0
,
SYNC_RAFT_CAMPAIGN_ELECTION
=
1
,
SYNC_RAFT_CAMPAIGN_TRANSFER
=
3
,
SYNC_RAFT_CAMPAIGN_ELECTION
=
1
,
SYNC_RAFT_CAMPAIGN_TRANSFER
=
2
,
}
SyncRaftElectionType
;
typedef
enum
{
// the init vote resp status
SYNC_RAFT_VOTE_RESP_UNKNOWN
=
0
,
// grant the vote request
SYNC_RAFT_VOTE_RESP_GRANT
=
1
,
//reject the vote request
SYNC_RAFT_VOTE_RESP_REJECT
=
2
,
}
SyncRaftVoteRespType
;
...
...
source/libs/sync/src/raft.c
浏览文件 @
aee5ebd1
...
...
@@ -29,6 +29,8 @@ static int stepFollower(SSyncRaft* pRaft, const SSyncMessage* pMsg);
static
int
stepCandidate
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
int
stepLeader
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
);
static
int
triggerAll
(
SSyncRaft
*
pRaft
);
static
void
tickElection
(
SSyncRaft
*
pRaft
);
static
void
tickHeartbeat
(
SSyncRaft
*
pRaft
);
...
...
@@ -95,8 +97,8 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
}
int32_t
syncRaftStep
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
syncDebug
(
"from %d, t
o %d, t
ype:%d, term:%"
PRId64
", state:%d"
,
pMsg
->
from
,
pMsg
->
to
,
pMsg
->
msgType
,
pMsg
->
term
,
pRaft
->
state
);
syncDebug
(
"from %d, type:%d, term:%"
PRId64
", state:%d"
,
pMsg
->
from
,
pMsg
->
msgType
,
pMsg
->
term
,
pRaft
->
state
);
if
(
preHandleMessage
(
pRaft
,
pMsg
))
{
syncFreeMessage
(
pMsg
);
...
...
@@ -117,6 +119,7 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
}
int32_t
syncRaftTick
(
SSyncRaft
*
pRaft
)
{
pRaft
->
currentTick
+=
1
;
return
0
;
}
...
...
@@ -168,12 +171,22 @@ void syncRaftBecomeLeader(SSyncRaft* pRaft) {
pRaft
->
leaderId
=
pRaft
->
leaderId
;
pRaft
->
state
=
TAOS_SYNC_ROLE_LEADER
;
// TODO: check if there is pending config log
int
nPendingConf
=
syncRaftLogNumOfPendingConf
(
pRaft
->
log
);
if
(
nPendingConf
>
1
)
{
syncFatal
(
"unexpected multiple uncommitted config entry"
);
}
if
(
nPendingConf
==
1
)
{
pRaft
->
hasPendingConf
=
true
;
}
syncInfo
(
"[%d:%d] became leader at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
);
}
void
syncRaftTriggerReplicate
(
SSyncRaft
*
pRaft
)
{
// after become leader, send initial heartbeat
syncRaftTriggerHeartbeat
(
pRaft
);
}
void
syncRaftTriggerHeartbeat
(
SSyncRaft
*
pRaft
)
{
triggerAll
(
pRaft
);
}
void
syncRaftRandomizedElectionTimeout
(
SSyncRaft
*
pRaft
)
{
...
...
@@ -219,7 +232,7 @@ int syncRaftNumOfGranted(SSyncRaft* pRaft, SyncNodeId id, bool preVote, bool acc
}
/**
* pre-handle message, return true
i
s no need to continue
* pre-handle message, return true
mean
s no need to continue
* Handle the message term, which may result in our stepping down to a follower.
**/
static
bool
preHandleMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
...
...
@@ -230,9 +243,11 @@ static bool preHandleMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if
(
pMsg
->
term
>
pRaft
->
term
)
{
return
preHandleNewTermMessage
(
pRaft
,
pMsg
);
}
else
if
(
pMsg
->
term
<
pRaft
->
term
)
{
return
preHandleOldTermMessage
(
pRaft
,
pMsg
);
}
return
preHandleOldTermMessage
(
pRaft
,
pMsg
);
;
return
false
;
}
static
bool
preHandleNewTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
...
...
@@ -240,6 +255,7 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
RaftMessageType
msgType
=
pMsg
->
msgType
;
if
(
msgType
==
RAFT_MSG_VOTE
)
{
// TODO
leaderId
=
SYNC_NON_NODE_ID
;
}
...
...
@@ -263,7 +279,7 @@ static bool preHandleNewTermMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg)
}
static
bool
preHandleOldTermMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
// TODO
// if receive old term message, no need to continue
return
true
;
}
...
...
@@ -273,7 +289,7 @@ static int convertClear(SSyncRaft* pRaft) {
}
static
int
stepFollower
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
convertClear
(
pRaft
);
return
0
;
}
...
...
@@ -290,6 +306,7 @@ static int stepCandidate(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
}
if
(
msgType
==
RAFT_MSG_VOTE_RESP
)
{
syncRaftHandleVoteRespMessage
(
pRaft
,
pMsg
);
return
0
;
}
return
0
;
...
...
@@ -324,6 +341,22 @@ static void tickHeartbeat(SSyncRaft* pRaft) {
}
/**
* trigger I/O requests for newly appended log entries or heartbeats.
**/
static
int
triggerAll
(
SSyncRaft
*
pRaft
)
{
assert
(
pRaft
->
state
==
TAOS_SYNC_ROLE_LEADER
);
int
i
;
for
(
i
=
0
;
i
<
pRaft
->
cluster
.
replica
;
++
i
)
{
if
(
i
==
pRaft
->
cluster
.
selfIndex
)
{
continue
;
}
}
}
static
void
abortLeaderTransfer
(
SSyncRaft
*
pRaft
)
{
pRaft
->
leadTransferee
=
SYNC_NON_NODE_ID
;
}
...
...
@@ -343,5 +376,5 @@ static void resetRaft(SSyncRaft* pRaft, SyncTerm term) {
abortLeaderTransfer
(
pRaft
);
pRaft
->
pendingConf
=
false
;
}
\ No newline at end of file
pRaft
->
hasPendingConf
=
false
;
}
source/libs/sync/src/raft_election.c
浏览文件 @
aee5ebd1
...
...
@@ -62,7 +62,7 @@ void syncRaftStartElection(SSyncRaft* pRaft, SyncRaftElectionType cType) {
SyncNodeId
nodeId
=
pRaft
->
cluster
.
nodeInfo
[
i
].
nodeId
;
SSyncMessage
*
pMsg
=
syncNewVoteMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
nodeId
,
term
,
cType
,
lastIndex
,
lastTerm
);
term
,
cType
,
lastIndex
,
lastTerm
);
if
(
pMsg
==
NULL
)
{
continue
;
}
...
...
source/libs/sync/src/raft_handle_vote_message.c
浏览文件 @
aee5ebd1
...
...
@@ -15,6 +15,7 @@
#include "syncInt.h"
#include "raft.h"
#include "raft_configuration.h"
#include "raft_log.h"
#include "raft_message.h"
...
...
@@ -31,12 +32,12 @@ int syncRaftHandleVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
SyncTerm
lastTerm
=
syncRaftLogLastTerm
(
pRaft
->
log
);
grant
=
canGrantVoteMessage
(
pRaft
,
pMsg
);
pRespMsg
=
syncNewVoteRespMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pMsg
->
to
,
pMsg
->
vote
.
cType
,
!
grant
);
pRespMsg
=
syncNewVoteRespMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pMsg
->
vote
.
cType
,
!
grant
);
if
(
pRespMsg
==
NULL
)
{
return
0
;
}
syncInfo
(
"[%d:%d] [logterm: %"
PRId64
", index: %"
PRId64
", vote: %d] %s for %d"
\
"[logterm: %"
PRId64
", index: %"
PRId64
"
, vote: %d
] at term %"
PRId64
""
,
"[logterm: %"
PRId64
", index: %"
PRId64
"] at term %"
PRId64
""
,
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
lastTerm
,
lastIndex
,
pRaft
->
voteFor
,
grant
?
"grant"
:
"reject"
,
pMsg
->
from
,
pMsg
->
vote
.
lastTerm
,
pMsg
->
vote
.
lastIndex
,
pRaft
->
term
);
...
...
@@ -49,7 +50,7 @@ static bool canGrantVoteMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if
(
!
(
pRaft
->
voteFor
==
SYNC_NON_NODE_ID
||
pMsg
->
term
>
pRaft
->
term
||
pRaft
->
voteFor
==
pMsg
->
from
))
{
return
false
;
}
if
(
!
syncRaftLogIsUptodate
(
pRaft
,
pMsg
->
vote
.
lastIndex
,
pMsg
->
vote
.
lastTerm
))
{
if
(
!
syncRaftLogIsUptodate
(
pRaft
->
log
,
pMsg
->
vote
.
lastIndex
,
pMsg
->
vote
.
lastTerm
))
{
return
false
;
}
...
...
source/libs/sync/src/raft_handle_vote_resp_message.c
浏览文件 @
aee5ebd1
...
...
@@ -15,6 +15,7 @@
#include "syncInt.h"
#include "raft.h"
#include "raft_configuration.h"
#include "raft_message.h"
int
syncRaftHandleVoteRespMessage
(
SSyncRaft
*
pRaft
,
const
SSyncMessage
*
pMsg
)
{
...
...
@@ -45,8 +46,7 @@ int syncRaftHandleVoteRespMessage(SSyncRaft* pRaft, const SSyncMessage* pMsg) {
if
(
pMsg
->
voteResp
.
cType
==
SYNC_RAFT_CAMPAIGN_PRE_ELECTION
)
{
syncRaftStartElection
(
pRaft
,
SYNC_RAFT_CAMPAIGN_ELECTION
);
}
else
{
syncRaftBecomeLeader
(
pRaft
);
syncRaftTriggerReplicate
(
pRaft
);
syncRaftBecomeLeader
(
pRaft
);
}
return
0
;
...
...
source/libs/sync/src/raft_log.c
浏览文件 @
aee5ebd1
...
...
@@ -37,4 +37,13 @@ int syncRaftLogNumOfPendingConf(SSyncRaftLog* pLog) {
bool
syncRaftHasUnappliedLog
(
SSyncRaftLog
*
pLog
)
{
return
pLog
->
commitIndex
>
pLog
->
appliedIndex
;
}
SyncTerm
syncRaftLogTermOf
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
)
{
return
SYNC_NON_TERM
;
}
int
syncRaftLogAcquire
(
SSyncRaftLog
*
pLog
,
SyncIndex
index
,
int
maxMsgSize
,
SSyncRaftEntry
**
ppEntries
,
int
*
n
)
{
return
0
;
}
\ No newline at end of file
source/libs/sync/src/raft_progress.c
浏览文件 @
aee5ebd1
...
...
@@ -22,7 +22,6 @@
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
RaftProgressState
state
);
static
void
resumeProgress
(
SSyncRaftProgress
*
progress
);
static
void
pauseProgress
(
SSyncRaftProgress
*
progress
);
int
syncRaftProgressCreate
(
SSyncRaft
*
pRaft
)
{
...
...
@@ -58,11 +57,6 @@ bool syncRaftProgressMaybeUpdate(SSyncRaft* pRaft, int i, SyncIndex lastIndex) {
return
updated
;
}
void
syncRaftProgressOptimisticNextIndex
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
nextIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
pRaft
->
leaderState
.
progress
[
i
].
nextIndex
=
nextIndex
+
1
;
}
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
rejected
,
SyncIndex
lastIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
...
...
@@ -103,15 +97,7 @@ static void resumeProgress(SSyncRaftProgress* progress) {
progress
->
paused
=
false
;
}
static
void
pauseProgress
(
SSyncRaftProgress
*
progress
)
{
progress
->
paused
=
true
;
}
bool
syncRaftProgressIsPaused
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
bool
syncRaftProgressIsPaused
(
SSyncRaftProgress
*
progress
)
{
switch
(
progress
->
state
)
{
case
PROGRESS_PROBE
:
return
progress
->
paused
;
...
...
source/libs/sync/src/raft_replication.c
0 → 100644
浏览文件 @
aee5ebd1
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_log.h"
#include "raft_progress.h"
#include "raft_replication.h"
static
int
sendSnapshot
(
SSyncRaft
*
pRaft
,
int
i
);
static
int
sendAppendEntries
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
index
,
SyncTerm
term
);
int
syncRaftReplicate
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
pRaft
->
state
==
TAOS_SYNC_ROLE_LEADER
);
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SyncNodeId
nodeId
=
pRaft
->
cluster
.
nodeInfo
[
i
].
nodeId
;
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
if
(
syncRaftProgressIsPaused
(
progress
))
{
syncInfo
(
"node %d paused"
,
nodeId
);
return
0
;
}
SyncIndex
nextIndex
=
syncRaftProgressNextIndex
(
progress
);
SyncIndex
prevIndex
=
nextIndex
-
1
;
SyncTerm
prevTerm
=
syncRaftLogTermOf
(
pRaft
->
log
,
prevIndex
);
if
(
prevTerm
==
SYNC_NON_TERM
&&
!
syncRaftProgressInSnapshot
(
progress
))
{
goto
send_snapshot
;
}
send_snapshot:
if
(
syncRaftProgressRecentActive
(
progress
))
{
/* Only send a snapshot when we have heard from the server */
return
sendSnapshot
(
pRaft
,
i
);
}
else
{
/* Send empty AppendEntries RPC when we haven't heard from the server */
prevIndex
=
syncRaftLogLastIndex
(
pRaft
->
log
);
prevTerm
=
syncRaftLogLastTerm
(
pRaft
->
log
);
return
sendAppendEntries
(
pRaft
,
i
,
prevIndex
,
prevTerm
);
}
}
static
int
sendSnapshot
(
SSyncRaft
*
pRaft
,
int
i
)
{
return
0
;
}
static
int
sendAppendEntries
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
prevIndex
,
SyncTerm
prevTerm
)
{
SyncIndex
nextIndex
=
prevIndex
+
1
;
SSyncRaftEntry
*
entries
;
int
nEntry
;
SNodeInfo
*
pNode
=
&
(
pRaft
->
cluster
.
nodeInfo
[
i
]);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
syncRaftLogAcquire
(
pRaft
->
log
,
nextIndex
,
pRaft
->
maxMsgSize
,
&
entries
,
&
nEntry
);
SSyncMessage
*
msg
=
syncNewAppendMsg
(
pRaft
->
selfGroupId
,
pRaft
->
selfId
,
pRaft
->
term
,
prevIndex
,
prevTerm
,
pRaft
->
log
->
commitIndex
,
nEntry
,
entries
);
if
(
msg
==
NULL
)
{
return
0
;
}
pRaft
->
io
.
send
(
msg
,
pNode
);
if
(
syncRaftProgressInReplicate
(
progress
))
{
SyncIndex
lastIndex
=
nextIndex
+
nEntry
;
syncRaftProgressOptimisticNextIndex
(
progress
,
lastIndex
);
syncRaftInflightAdd
(
&
progress
->
inflights
,
lastIndex
);
}
else
if
(
syncRaftProgressInProbe
(
progress
))
{
syncRaftProgressPause
(
progress
);
}
else
{
}
syncRaftProgressUpdateSendTick
(
progress
,
pRaft
->
currentTick
);
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录