Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4f39cd31
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4f39cd31
编写于
7月 01, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): add snapshot2 interface
上级
24a3c817
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
181 addition
and
1 deletion
+181
-1
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-1
source/libs/sync/inc/syncAppendEntries.h
source/libs/sync/inc/syncAppendEntries.h
+1
-0
source/libs/sync/inc/syncAppendEntriesReply.h
source/libs/sync/inc/syncAppendEntriesReply.h
+1
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+1
-0
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+7
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+2
-0
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+57
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+111
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
4f39cd31
...
...
@@ -204,7 +204,7 @@ SyncGroupId syncGetVgId(int64_t rid);
void
syncGetEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
void
syncGetRetryEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncProposeBatch
(
int64_t
rid
,
SRpcMsg
*
pMsgArr
,
bool
*
pIsWeakArr
,
int32_t
arrSize
);
//
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
bool
syncEnvIsStart
();
const
char
*
syncStr
(
ESyncState
state
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
...
...
source/libs/sync/inc/syncAppendEntries.h
浏览文件 @
4f39cd31
...
...
@@ -94,6 +94,7 @@ extern "C" {
//
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeOnAppendEntriesSnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeOnAppendEntriesSnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesBatch
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncAppendEntriesReply.h
浏览文件 @
4f39cd31
...
...
@@ -42,6 +42,7 @@ extern "C" {
//
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncNodeOnAppendEntriesReplySnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncNodeOnAppendEntriesReplySnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
4f39cd31
...
...
@@ -67,6 +67,7 @@ typedef struct SSyncNode {
char
path
[
TSDB_FILENAME_LEN
];
char
raftStorePath
[
TSDB_FILENAME_LEN
*
2
];
char
configPath
[
TSDB_FILENAME_LEN
*
2
];
int32_t
batchSize
;
// sync io
SWal
*
pWal
;
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
4f39cd31
...
...
@@ -53,8 +53,15 @@ extern "C" {
//
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
);
typedef
struct
SReaderParam
{
SyncIndex
start
;
SyncIndex
end
;
}
SReaderParam
;
#ifdef __cplusplus
}
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
4f39cd31
...
...
@@ -719,6 +719,8 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
return
false
;
}
int32_t
syncNodeOnAppendEntriesSnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesBatch
*
pMsg
)
{
return
0
;
}
int32_t
syncNodeOnAppendEntriesSnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
0
;
int32_t
code
=
0
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
4f39cd31
...
...
@@ -170,6 +170,63 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
}
#endif
int32_t
syncNodeOnAppendEntriesReplySnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
))
&&
!
ths
->
pRaftCfg
->
isStandBy
)
{
syncNodeEventLog
(
ths
,
"recv sync-append-entries-reply, maybe replica already dropped"
);
return
-
1
;
}
// drop stale response
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries-reply, recv-term:%lu, drop stale response"
,
pMsg
->
term
);
syncNodeEventLog
(
ths
,
logBuf
);
return
-
1
;
}
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries-reply, error term, recv-term:%lu"
,
pMsg
->
term
);
syncNodeErrorLog
(
ths
,
logBuf
);
return
-
1
;
}
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
if
(
pMsg
->
success
)
{
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
+
1
);
if
(
gRaftDetailLog
)
{
sTrace
(
"update next match, index:%ld, success:%d"
,
pMsg
->
matchIndex
+
1
,
pMsg
->
success
);
}
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
);
// maybe commit
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncMaybeAdvanceCommitIndex
(
ths
);
}
}
else
{
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
// notice! int64, uint64
if
(
nextIndex
>
SYNC_INDEX_BEGIN
)
{
--
nextIndex
;
}
else
{
nextIndex
=
SYNC_INDEX_BEGIN
;
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
}
return
0
;
}
int32_t
syncNodeOnAppendEntriesReplySnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
4f39cd31
...
...
@@ -116,6 +116,97 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
return
ret
;
}
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
-
1
;
}
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
// next index
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
// if pre-entry not exist, create snapshot
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SSyncRaftEntry
*
pPreEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
preLogIndex
,
&
pPreEntry
);
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
,
.
lastConfigIndex
=
-
1
};
if
(
pSyncNode
->
pFsm
->
FpGetSnapshot
!=
NULL
)
{
void
*
pReader
=
NULL
;
SReaderParam
readerParam
=
{.
start
=
0
,
.
end
=
nextIndex
};
pSyncNode
->
pFsm
->
FpGetSnapshot
(
pSyncNode
->
pFsm
,
&
snapshot
,
&
readerParam
,
&
pReader
);
// get sender
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
pDestId
);
ASSERT
(
pSender
!=
NULL
);
if
(
snapshot
.
lastApplyIndex
>=
SYNC_INDEX_BEGIN
&&
nextIndex
<=
snapshot
.
lastApplyIndex
+
1
&&
!
snapshotSenderIsStart
(
pSender
))
{
// start snapshot
snapshotSenderStart
(
pSender
,
snapshot
,
pReader
);
}
else
{
// no snapshot
if
(
pReader
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSyncNode
->
pFsm
,
pReader
);
}
}
}
}
// pre index, pre term
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
if
(
preLogTerm
==
SYNC_TERM_INVALID
)
{
SyncIndex
newNextIndex
=
syncNodeGetLastIndex
(
pSyncNode
)
+
1
;
syncIndexMgrSetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
,
newNextIndex
);
syncIndexMgrSetIndex
(
pSyncNode
->
pMatchIndex
,
pDestId
,
SYNC_INDEX_INVALID
);
sError
(
"vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld"
,
pSyncNode
->
vgId
,
nextIndex
,
newNextIndex
,
SYNC_INDEX_INVALID
,
pDestId
->
addr
);
return
-
1
;
}
SRpcMsg
rpcMsgArr
[
SYNC_MAX_BATCH_SIZE
];
memset
(
rpcMsgArr
,
0
,
sizeof
(
rpcMsgArr
));
int32_t
getCount
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
nextIndex
,
&
pEntry
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
// get rpc msg [i] from entry
syncEntryDestory
(
pEntry
);
getCount
++
;
}
else
{
break
;
}
}
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchBuild
(
rpcMsgArr
,
getCount
,
pSyncNode
->
vgId
);
ASSERT
(
pMsg
!=
NULL
);
// prepare msg
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
prevLogIndex
=
preLogIndex
;
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pMsg
->
privateTerm
=
0
;
pMsg
->
dataCount
=
getCount
;
// send msg
syncNodeAppendEntriesBatch
(
pSyncNode
,
pDestId
,
pMsg
);
syncAppendEntriesBatchDestroy
(
pMsg
);
}
return
0
;
}
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
...
...
@@ -234,4 +325,24 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
syncAppendEntries2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
&
rpcMsg
);
return
ret
;
}
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
)
{
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
destRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-batch to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, "
"commit:%ld, "
"datalen:%d, dataCount:%d}"
,
pSyncNode
->
vgId
,
host
,
port
,
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
privateTerm
,
pMsg
->
commitIndex
,
pMsg
->
dataLen
,
pMsg
->
dataCount
);
}
while
(
0
);
SRpcMsg
rpcMsg
;
syncAppendEntriesBatch2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
&
rpcMsg
);
return
0
;
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录