Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
24a0966d
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
24a0966d
编写于
11月 02, 2021
作者:
L
lichuang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10645][raft]<feature>add raft progress
上级
fca35ceb
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
727 addition
and
15 deletion
+727
-15
include/libs/sync/sync.h
include/libs/sync/sync.h
+11
-5
source/libs/sync/inc/raft.h
source/libs/sync/inc/raft.h
+33
-2
source/libs/sync/inc/raft_progress.h
source/libs/sync/inc/raft_progress.h
+181
-0
source/libs/sync/inc/raft_unstable_log.h
source/libs/sync/inc/raft_unstable_log.h
+115
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+1
-0
source/libs/sync/inc/sync_type.h
source/libs/sync/inc/sync_type.h
+33
-0
source/libs/sync/src/raft.c
source/libs/sync/src/raft.c
+15
-8
source/libs/sync/src/raft_progress.c
source/libs/sync/src/raft_progress.c
+317
-0
source/libs/sync/src/raft_unstable_log.c
source/libs/sync/src/raft_unstable_log.c
+21
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
24a0966d
...
...
@@ -61,13 +61,13 @@ typedef struct {
typedef
struct
SSyncFSM
{
void
*
pData
;
// apply committed log, bufs will be free by
raft
module
// apply committed log, bufs will be free by
sync
module
int32_t
(
*
applyLog
)(
struct
SSyncFSM
*
fsm
,
SyncIndex
index
,
const
SSyncBuffer
*
buf
,
void
*
pData
);
// cluster commit callback
int32_t
(
*
onClusterChanged
)(
struct
SSyncFSM
*
fsm
,
const
SSyncCluster
*
cluster
,
void
*
pData
);
// fsm return snapshot in ppBuf, bufs will be free by
raft
module
// fsm return snapshot in ppBuf, bufs will be free by
sync
module
// TODO: getSnapshot SHOULD be async?
int32_t
(
*
getSnapshot
)(
struct
SSyncFSM
*
fsm
,
SSyncBuffer
**
ppBuf
,
int32_t
*
objId
,
bool
*
isLast
);
...
...
@@ -89,18 +89,24 @@ typedef struct SSyncLogStore {
// write log with given index
int32_t
(
*
logWrite
)(
struct
SSyncLogStore
*
logStore
,
SyncIndex
index
,
SSyncBuffer
*
pBuf
);
// read log from given index with limit, return the actual num in nBuf
/**
* read log from given index(included) with limit, return the actual num in nBuf,
* pBuf will be free in sync module
**/
int32_t
(
*
logRead
)(
struct
SSyncLogStore
*
logStore
,
SyncIndex
index
,
int
limit
,
SSyncBuffer
*
pBuf
,
int
*
nBuf
);
// mark log with given index has been commtted
int32_t
(
*
logCommit
)(
struct
SSyncLogStore
*
logStore
,
SyncIndex
index
);
// prune log before given index
// prune log before given index
(not included)
int32_t
(
*
logPrune
)(
struct
SSyncLogStore
*
logStore
,
SyncIndex
index
);
// rollback log after given index
// rollback log after given index
(included)
int32_t
(
*
logRollback
)(
struct
SSyncLogStore
*
logStore
,
SyncIndex
index
);
// return last index of log
SyncIndex
(
*
logLastIndex
)(
struct
SSyncLogStore
*
logStore
);
}
SSyncLogStore
;
typedef
struct
SSyncServerState
{
...
...
source/libs/sync/inc/raft.h
浏览文件 @
24a0966d
...
...
@@ -17,15 +17,46 @@
#define _TD_LIBS_SYNC_RAFT_H
#include "sync.h"
#include "sync_type.h"
#include "raft_message.h"
typedef
struct
SSyncRaft
{
typedef
struct
SSyncRaftProgress
SSyncRaftProgress
;
typedef
struct
RaftLeaderState
{
int
nProgress
;
SSyncRaftProgress
*
progress
;
}
RaftLeaderState
;
typedef
struct
SSyncRaftIOMethods
{
SyncTime
(
*
time
)(
SSyncRaft
*
);
}
SSyncRaftIOMethods
;
struct
SSyncRaft
{
// owner sync node
SSyncNode
*
pNode
;
SSyncInfo
info
;
}
SSyncRaft
;
// election timeout tick(random in [3:6] tick)
uint16_t
electionTick
;
// heartbeat timeout tick(default: 1 tick)
uint16_t
heartbeatTick
;
int
installSnapShotTimeoutMS
;
//
int
heartbeatTimeoutMS
;
bool
preVote
;
SSyncRaftIOMethods
io
;
RaftLeaderState
leaderState
;
SSyncRaftUnstableLog
*
log
;
};
int32_t
syncRaftStart
(
SSyncRaft
*
pRaft
,
const
SSyncInfo
*
pInfo
);
int32_t
syncRaftStep
(
SSyncRaft
*
pRaft
,
const
RaftMessage
*
pMsg
);
...
...
source/libs/sync/inc/raft_progress.h
0 → 100644
浏览文件 @
24a0966d
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_PROGRESS_H
#define TD_SYNC_RAFT_PROGRESS_H
#include "sync_type.h"
/**
* SSyncRaftInflights is a sliding window for the inflight messages.
* Thus inflight effectively limits both the number of inflight messages
* and the bandwidth each Progress can use.
* When inflights is full, no more message should be sent.
* When a leader sends out a message, the index of the last
* entry should be added to inflights. The index MUST be added
* into inflights in order.
* When a leader receives a reply, the previous inflights should
* be freed by calling syncRaftInflightFreeTo with the index of the last
* received entry.
**/
typedef
struct
SSyncRaftInflights
{
/* the starting index in the buffer */
int
start
;
/* number of inflights in the buffer */
int
count
;
/* the size of the buffer */
int
size
;
/**
* buffer contains the index of the last entry
* inside one message.
**/
SyncIndex
*
buffer
;
}
SSyncRaftInflights
;
/**
* State defines how the leader should interact with the follower.
*
* When in PROGRESS_PROBE, leader sends at most one replication message
* per heartbeat interval. It also probes actual progress of the follower.
*
* When in PROGRESS_REPLICATE, leader optimistically increases next
* to the latest entry sent after sending replication message. This is
* an optimized state for fast replicating log entries to the follower.
*
* When in PROGRESS_SNAPSHOT, leader should have sent out snapshot
* before and stops sending any replication message.
*
* PROGRESS_PROBE is the initial state.
**/
typedef
enum
RaftProgressState
{
PROGRESS_PROBE
=
0
,
PROGRESS_REPLICATE
,
PROGRESS_SNAPSHOT
,
}
RaftProgressState
;
/**
* Progress represents a follower’s progress in the view of the leader. Leader maintains
* progresses of all followers, and sends entries to the follower based on its progress.
**/
struct
SSyncRaftProgress
{
SyncIndex
nextIndex
;
SyncIndex
matchIndex
;
RaftProgressState
state
;
/**
* paused is used in PROGRESS_PROBE.
* When paused is true, raft should pause sending replication message to this peer.
**/
bool
paused
;
/**
* pendingSnapshotIndex is used in PROGRESS_SNAPSHOT.
* If there is a pending snapshot, the pendingSnapshotIndex will be set to the
* index of the snapshot. If pendingSnapshotIndex is set, the replication process of
* this Progress will be paused. raft will not resend snapshot until the pending one
* is reported to be failed.
**/
SyncIndex
pendingSnapshotIndex
;
/**
* recentActive is true if the progress is recently active. Receiving any messages
* from the corresponding follower indicates the progress is active.
* RecentActive can be reset to false after an election timeout.
**/
bool
recentActive
;
/**
* flow control sliding window
**/
SSyncRaftInflights
inflights
;
};
int
syncRaftProgressCreate
(
SSyncRaft
*
pRaft
);
//int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration);
/**
* syncRaftProgressMaybeUpdate returns false if the given lastIndex index comes from i-th node's log.
* Otherwise it updates the progress and returns true.
**/
bool
syncRaftProgressMaybeUpdate
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
lastIndex
);
void
syncRaftProgressOptimisticNextIndex
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
nextIndex
);
/**
* syncRaftProgressMaybeDecrTo returns false if the given to index comes from an out of order message.
* Otherwise it decreases the progress next index to min(rejected, last) and returns true.
**/
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
rejected
,
SyncIndex
lastIndex
);
/**
* syncRaftProgressIsPaused returns whether sending log entries to this node has been
* paused. A node may be paused because it has rejected recent
* MsgApps, is currently waiting for a snapshot, or has reached the
* MaxInflightMsgs limit.
**/
bool
syncRaftProgressIsPaused
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressFailure
(
SSyncRaft
*
pRaft
,
int
i
);
bool
syncRaftProgressNeedAbortSnapshot
(
SSyncRaft
*
pRaft
,
int
i
);
/**
* return true if i-th node's log is up-todate
**/
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressBecomeProbe
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressBecomeReplicate
(
SSyncRaft
*
pRaft
,
int
i
);
void
syncRaftProgressBecomeSnapshot
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
snapshotIndex
);
int
syncRaftInflightReset
(
SSyncRaftInflights
*
inflights
);
bool
syncRaftInflightFull
(
SSyncRaftInflights
*
inflights
);
void
syncRaftInflightAdd
(
SSyncRaftInflights
*
inflights
,
SyncIndex
inflightIndex
);
void
syncRaftInflightFreeTo
(
SSyncRaftInflights
*
inflights
,
SyncIndex
toIndex
);
void
syncRaftInflightFreeFirstOne
(
SSyncRaftInflights
*
inflights
);
#if 0
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i);
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i);
void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i);
void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i);
bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i);
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i);
bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i);
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i);
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i);
#endif
#endif
/* TD_SYNC_RAFT_PROGRESS_H */
\ No newline at end of file
source/libs/sync/inc/raft_unstable_log.h
0 → 100644
浏览文件 @
24a0966d
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TD_SYNC_RAFT_UNSTABLE_LOG_H
#define TD_SYNC_RAFT_UNSTABLE_LOG_H
#include "sync_type.h"
/* in-memory unstable raft log storage */
struct
SSyncRaftUnstableLog
{
#if 0
/* Circular buffer of log entries */
RaftEntry *entries;
/* size of Circular buffer */
int size;
/* Indexes of used slots [front, back) */
int front, back;
/* Index of first entry is offset + 1 */
SyncIndex offset;
/* meta data of snapshot */
SSyncRaftUnstableLog snapshot;
#endif
};
/**
* return index of last in memory log, return 0 if log is empty
**/
SyncIndex
syncRaftLogLastIndex
(
SSyncRaftUnstableLog
*
pLog
);
#if 0
void raftLogInit(RaftLog* pLog);
void raftLogClose(RaftLog* pLog);
/**
* When startup populating log entrues loaded from disk,
* init raft memory log with snapshot index,term and log start idnex.
**/
/*
void raftLogStart(RaftLog* pLog,
RaftSnapshotMeta snapshot,
SyncIndex startIndex);
*/
/**
* Get the number of entries the log.
**/
int raftLogNumEntries(const RaftLog* pLog);
/**
* return last term of in memory log, return 0 if log is empty
**/
SSyncTerm raftLogLastTerm(RaftLog* pLog);
/**
* return term of log with the given index, return 0 if the term of index cannot be found
* , errCode will save the error code.
**/
SSyncTerm raftLogTermOf(RaftLog* pLog, SyncIndex index, RaftCode* errCode);
/**
* Get the last index of the most recent snapshot. Return 0 if there are no *
* snapshots.
**/
SyncIndex raftLogSnapshotIndex(RaftLog* pLog);
/* Append a new entry to the log. */
int raftLogAppend(RaftLog* pLog,
SSyncTerm term,
const SSyncBuffer *buf);
/**
* acquire log from given index onwards.
**/
/*
int raftLogAcquire(RaftLog* pLog,
SyncIndex index,
RaftEntry **ppEntries,
int *n);
void raftLogRelease(RaftLog* pLog,
SyncIndex index,
RaftEntry *pEntries,
int n);
*/
/* Delete all entries from the given index (included) onwards. */
void raftLogTruncate(RaftLog* pLog, SyncIndex index);
/**
* when taking a new snapshot, the function will update the last snapshot information and delete
* all entries up last_index - trailing (included). If the log contains no entry
* a last_index - trailing, then no entry will be deleted.
**/
void raftLogSnapshot(RaftLog* pLog, SyncIndex index, SyncIndex trailing);
#endif
#endif
/* TD_SYNC_RAFT_UNSTABLE_LOG_H */
\ No newline at end of file
source/libs/sync/inc/syncInt.h
浏览文件 @
24a0966d
...
...
@@ -19,6 +19,7 @@
#include "thash.h"
#include "os.h"
#include "sync.h"
#include "sync_type.h"
#include "raft.h"
#include "tlog.h"
...
...
source/libs/sync/inc/sync_type.h
0 → 100644
浏览文件 @
24a0966d
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIBS_SYNC_TYPE_H
#define _TD_LIBS_SYNC_TYPE_H
typedef
int32_t
SyncTime
;
typedef
struct
SSyncRaftUnstableLog
SSyncRaftUnstableLog
;
typedef
struct
SSyncRaft
SSyncRaft
;
#ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
#endif
#ifndef MAX
#define MAX(x, y) (((x) > (y)) ? (x) : (y))
#endif
#endif
/* _TD_LIBS_SYNC_TYPE_H */
source/libs/sync/src/raft.c
浏览文件 @
24a0966d
...
...
@@ -16,12 +16,10 @@
#include "raft.h"
#include "syncInt.h"
#ifndef MIN
#define MIN(x, y) (((x) < (y)) ? (x) : (y))
#endif
#define RAFT_READ_LOG_MAX_NUM 100
static
void
syncRaftBecomeFollower
(
SSyncRaft
*
pRaft
,
SSyncTerm
term
);
int32_t
syncRaftStart
(
SSyncRaft
*
pRaft
,
const
SSyncInfo
*
pInfo
)
{
SSyncNode
*
pNode
=
pRaft
->
pNode
;
SSyncServerState
serverState
;
...
...
@@ -44,10 +42,10 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
}
assert
(
initIndex
<=
serverState
.
commitIndex
);
// restore fsm state from snapshot index + 1
,
until commitIndex
// restore fsm state from snapshot index + 1 until commitIndex
++
initIndex
;
while
(
initIndex
<
serverState
.
commitIndex
)
{
limit
=
MIN
(
RAFT_READ_LOG_MAX_NUM
,
serverState
.
commitIndex
-
initIndex
);
while
(
initIndex
<
=
serverState
.
commitIndex
)
{
limit
=
MIN
(
RAFT_READ_LOG_MAX_NUM
,
serverState
.
commitIndex
-
initIndex
+
1
);
if
(
logStore
->
logRead
(
logStore
,
initIndex
,
limit
,
buffer
,
&
nBuf
)
!=
0
)
{
return
-
1
;
...
...
@@ -62,7 +60,11 @@ int32_t syncRaftStart(SSyncRaft* pRaft, const SSyncInfo* pInfo) {
}
assert
(
initIndex
==
serverState
.
commitIndex
);
syncInfo
(
"restore vgid %d state: snapshot index:"
,
pInfo
->
vgId
);
pRaft
->
heartbeatTick
=
1
;
syncRaftBecomeFollower
(
pRaft
,
1
);
syncInfo
(
"restore vgid %d state: snapshot index success"
,
pInfo
->
vgId
);
return
0
;
}
...
...
@@ -73,4 +75,9 @@ int32_t syncRaftStep(SSyncRaft* pRaft, const RaftMessage* pMsg) {
int32_t
syncRaftTick
(
SSyncRaft
*
pRaft
)
{
return
0
;
}
static
void
syncRaftBecomeFollower
(
SSyncRaft
*
pRaft
,
SSyncTerm
term
)
{
pRaft
->
electionTick
=
taosRand
()
%
3
+
3
;
return
;
}
\ No newline at end of file
source/libs/sync/src/raft_progress.c
0 → 100644
浏览文件 @
24a0966d
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "raft.h"
#include "raft_unstable_log.h"
#include "raft_progress.h"
#include "sync.h"
#include "syncInt.h"
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
RaftProgressState
state
);
static
void
resumeProgress
(
SSyncRaftProgress
*
progress
);
static
void
pauseProgress
(
SSyncRaftProgress
*
progress
);
int
syncRaftProgressCreate
(
SSyncRaft
*
pRaft
)
{
/*
inflights->buffer = (SyncIndex*)malloc(sizeof(SyncIndex) * pRaft->maxInflightMsgs);
if (inflights->buffer == NULL) {
return RAFT_OOM;
}
inflights->size = pRaft->maxInflightMsgs;
*/
}
/*
int syncRaftProgressRecreate(SSyncRaft* pRaft, const RaftConfiguration* configuration) {
}
*/
bool
syncRaftProgressMaybeUpdate
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
lastIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
bool
updated
=
false
;
if
(
progress
->
matchIndex
<
lastIndex
)
{
progress
->
matchIndex
=
lastIndex
;
updated
=
true
;
resumeProgress
(
progress
);
}
if
(
progress
->
nextIndex
<
lastIndex
+
1
)
{
progress
->
nextIndex
=
lastIndex
+
1
;
}
return
updated
;
}
void
syncRaftProgressOptimisticNextIndex
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
nextIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
pRaft
->
leaderState
.
progress
[
i
].
nextIndex
=
nextIndex
+
1
;
}
bool
syncRaftProgressMaybeDecrTo
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
rejected
,
SyncIndex
lastIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
if
(
progress
->
state
==
PROGRESS_REPLICATE
)
{
/**
* the rejection must be stale if the progress has matched and "rejected"
* is smaller than "match".
**/
if
(
rejected
<=
progress
->
matchIndex
)
{
syncDebug
(
"match index is up to date,ignore"
);
return
false
;
}
/* directly decrease next to match + 1 */
progress
->
nextIndex
=
progress
->
matchIndex
+
1
;
//syncRaftProgressBecomeProbe(raft, i);
return
true
;
}
if
(
rejected
!=
progress
->
nextIndex
-
1
)
{
syncDebug
(
"rejected index %"
PRId64
" different from next index %"
PRId64
" -> ignore"
,
rejected
,
progress
->
nextIndex
);
return
false
;
}
progress
->
nextIndex
=
MIN
(
rejected
,
lastIndex
+
1
);
if
(
progress
->
nextIndex
<
1
)
{
progress
->
nextIndex
=
1
;
}
resumeProgress
(
progress
);
return
true
;
}
static
void
resumeProgress
(
SSyncRaftProgress
*
progress
)
{
progress
->
paused
=
false
;
}
static
void
pauseProgress
(
SSyncRaftProgress
*
progress
)
{
progress
->
paused
=
true
;
}
bool
syncRaftProgressIsPaused
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
switch
(
progress
->
state
)
{
case
PROGRESS_PROBE
:
return
progress
->
paused
;
case
PROGRESS_REPLICATE
:
return
syncRaftInflightFull
(
&
progress
->
inflights
);
case
PROGRESS_SNAPSHOT
:
return
true
;
default:
syncFatal
(
"error sync state:%d"
,
progress
->
state
);
}
}
void
syncRaftProgressFailure
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
progress
->
pendingSnapshotIndex
=
0
;
}
bool
syncRaftProgressNeedAbortSnapshot
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
return
progress
->
state
==
PROGRESS_SNAPSHOT
&&
progress
->
matchIndex
>=
progress
->
pendingSnapshotIndex
;
}
bool
syncRaftProgressIsUptodate
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
return
syncRaftLogLastIndex
(
pRaft
->
log
)
+
1
==
progress
->
nextIndex
;
}
void
syncRaftProgressBecomeProbe
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
/**
* If the original state is ProgressStateSnapshot, progress knows that
* the pending snapshot has been sent to this peer successfully, then
* probes from pendingSnapshot + 1.
**/
if
(
progress
->
state
==
PROGRESS_SNAPSHOT
)
{
SyncIndex
pendingSnapshotIndex
=
progress
->
pendingSnapshotIndex
;
resetProgressState
(
progress
,
PROGRESS_PROBE
);
progress
->
nextIndex
=
MAX
(
progress
->
matchIndex
+
1
,
pendingSnapshotIndex
+
1
);
}
else
{
resetProgressState
(
progress
,
PROGRESS_PROBE
);
progress
->
nextIndex
=
progress
->
matchIndex
+
1
;
}
}
void
syncRaftProgressBecomeReplicate
(
SSyncRaft
*
pRaft
,
int
i
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
resetProgressState
(
progress
,
PROGRESS_REPLICATE
);
progress
->
nextIndex
=
progress
->
matchIndex
+
1
;
}
void
syncRaftProgressBecomeSnapshot
(
SSyncRaft
*
pRaft
,
int
i
,
SyncIndex
snapshotIndex
)
{
assert
(
i
>=
0
&&
i
<
pRaft
->
leaderState
.
nProgress
);
SSyncRaftProgress
*
progress
=
&
(
pRaft
->
leaderState
.
progress
[
i
]);
resetProgressState
(
progress
,
PROGRESS_SNAPSHOT
);
progress
->
pendingSnapshotIndex
=
snapshotIndex
;
}
static
void
resetProgressState
(
SSyncRaftProgress
*
progress
,
RaftProgressState
state
)
{
progress
->
paused
=
false
;
progress
->
pendingSnapshotIndex
=
0
;
progress
->
state
=
state
;
syncRaftInflightReset
(
&
(
progress
->
inflights
));
}
int
syncRaftInflightReset
(
SSyncRaftInflights
*
inflights
)
{
inflights
->
count
=
0
;
inflights
->
start
=
0
;
return
0
;
}
bool
syncRaftInflightFull
(
SSyncRaftInflights
*
inflights
)
{
return
inflights
->
count
==
inflights
->
size
;
}
void
syncRaftInflightAdd
(
SSyncRaftInflights
*
inflights
,
SyncIndex
inflightIndex
)
{
assert
(
!
syncRaftInflightFull
(
inflights
));
int
next
=
inflights
->
start
+
inflights
->
count
;
int
size
=
inflights
->
size
;
/* is next wrapped around buffer? */
if
(
next
>=
size
)
{
next
-=
size
;
}
inflights
->
buffer
[
next
]
=
inflightIndex
;
inflights
->
count
++
;
}
void
syncRaftInflightFreeTo
(
SSyncRaftInflights
*
inflights
,
SyncIndex
toIndex
)
{
if
(
inflights
->
count
==
0
||
toIndex
<
inflights
->
buffer
[
inflights
->
start
])
{
return
;
}
int
i
,
idx
;
for
(
i
=
0
,
idx
=
inflights
->
start
;
i
<
inflights
->
count
;
i
++
)
{
if
(
toIndex
<
inflights
->
buffer
[
idx
])
{
break
;
}
int
size
=
inflights
->
size
;
idx
++
;
if
(
idx
>=
size
)
{
idx
-=
size
;
}
}
inflights
->
count
-=
i
;
inflights
->
start
=
idx
;
assert
(
inflights
->
count
>=
0
);
if
(
inflights
->
count
==
0
)
{
inflights
->
start
=
0
;
}
}
void
syncRaftInflightFreeFirstOne
(
SSyncRaftInflights
*
inflights
)
{
syncRaftInflightFreeTo
(
inflights
,
inflights
->
buffer
[
inflights
->
start
]);
}
#if 0
SyncIndex syncRaftProgressNextIndex(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].nextIndex;
}
SyncIndex syncRaftProgressMatchIndex(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].matchIndex;
}
void syncRaftProgressUpdateLastSend(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].lastSend = pRaft->io.time(pRaft);
}
void syncRaftProgressUpdateSnapshotLastSend(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].lastSendSnapshot = pRaft->io.time(pRaft);
}
bool syncRaftProgressResetRecentRecv(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
bool prev = progress->recentRecv;
progress->recentRecv = false;
return prev;
}
void syncRaftProgressMarkRecentRecv(SSyncRaft* pRaft, int i) {
pRaft->leaderState.progress[i].recentRecv = true;
}
bool syncRaftProgressGetRecentRecv(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].recentRecv;
}
void syncRaftProgressBecomeSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
resetProgressState(progress, PROGRESS_SNAPSHOT);
progress->pendingSnapshotIndex = raftLogSnapshotIndex(pRaft->log);
}
void syncRaftProgressBecomeProbe(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
if (progress->state == PROGRESS_SNAPSHOT) {
assert(progress->pendingSnapshotIndex > 0);
SyncIndex pendingSnapshotIndex = progress->pendingSnapshotIndex;
resetProgressState(progress, PROGRESS_PROBE);
progress->nextIndex = max(progress->matchIndex + 1, pendingSnapshotIndex);
} else {
resetProgressState(progress, PROGRESS_PROBE);
progress->nextIndex = progress->matchIndex + 1;
}
}
void syncRaftProgressBecomeReplicate(SSyncRaft* pRaft, int i) {
resetProgressState(pRaft->leaderState.progress, PROGRESS_REPLICATE);
pRaft->leaderState.progress->nextIndex = pRaft->leaderState.progress->matchIndex + 1;
}
void syncRaftProgressAbortSnapshot(SSyncRaft* pRaft, int i) {
SSyncRaftProgress* progress = &(pRaft->leaderState.progress[i]);
progress->pendingSnapshotIndex = 0;
progress->state = PROGRESS_PROBE;
}
RaftProgressState syncRaftProgressState(SSyncRaft* pRaft, int i) {
return pRaft->leaderState.progress[i].state;
}
#endif
\ No newline at end of file
source/libs/sync/src/raft_unstable_log.c
0 → 100644
浏览文件 @
24a0966d
/*
* Copyright (c) 2019 TAOS Data, Inc. <cli@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sync.h"
#include "raft_unstable_log.h"
SyncIndex
syncRaftLogLastIndex
(
SSyncRaftUnstableLog
*
pLog
)
{
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录