Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
736a1cc0
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
736a1cc0
编写于
11月 25, 2022
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: adjust some info msgs for raft pipelining
上级
b63afcd5
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
34 addition
and
38 deletion
+34
-38
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+4
-4
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+30
-34
未找到文件。
source/libs/sync/src/syncMain.c
浏览文件 @
736a1cc0
...
...
@@ -89,7 +89,7 @@ int32_t syncStart(int64_t rid) {
}
if
(
syncNodeRestore
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to restore
raft
log buffer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
sError
(
"vgId:%d, failed to restore
sync
log buffer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_err
;
}
...
...
@@ -847,7 +847,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// create raft log ring buffer
pSyncNode
->
pLogBuf
=
syncLogBufferCreate
();
if
(
pSyncNode
->
pLogBuf
==
NULL
)
{
sError
(
"failed to init log buffer since %s. vgId:%d"
,
terrstr
(),
pSyncNode
->
vgId
);
sError
(
"failed to init
sync
log buffer since %s. vgId:%d"
,
terrstr
(),
pSyncNode
->
vgId
);
goto
_error
;
}
...
...
@@ -1060,7 +1060,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init log buffer
if
(
syncLogBufferInit
(
pSyncNode
->
pLogBuf
,
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to init
raft
log buffer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
sError
(
"vgId:%d, failed to init
sync
log buffer since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_error
;
}
...
...
@@ -2239,7 +2239,7 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
int32_t
syncNodeAppend
(
SSyncNode
*
ths
,
SSyncRaftEntry
*
pEntry
)
{
// append to log buffer
if
(
syncLogBufferAppend
(
ths
->
pLogBuf
,
ths
,
pEntry
)
<
0
)
{
sError
(
"vgId:%d, failed to enqueue log buffer. index:%"
PRId64
""
,
ths
->
vgId
,
pEntry
->
index
);
sError
(
"vgId:%d, failed to enqueue
sync
log buffer. index:%"
PRId64
""
,
ths
->
vgId
,
pEntry
->
index
);
return
-
1
;
}
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
736a1cc0
...
...
@@ -38,7 +38,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncIndex
index
=
pEntry
->
index
;
if
(
index
-
pBuf
->
startIndex
>=
pBuf
->
size
)
{
sError
(
"vgId:%d, failed to append due to log buffer full. index:%"
PRId64
""
,
pNode
->
vgId
,
index
);
sError
(
"vgId:%d, failed to append due to
sync
log buffer full. index:%"
PRId64
""
,
pNode
->
vgId
,
index
);
goto
_out
;
}
...
...
@@ -49,7 +49,7 @@ int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
// initial log buffer with at least one item, e.g. commitIndex
SSyncRaftEntry
*
pMatch
=
pBuf
->
entries
[(
index
-
1
+
pBuf
->
size
)
%
pBuf
->
size
].
pItem
;
ASSERT
(
pMatch
!=
NULL
&&
"no matched
raft
log entry"
);
ASSERT
(
pMatch
!=
NULL
&&
"no matched log entry"
);
ASSERT
(
pMatch
->
index
+
1
==
index
);
SSyncLogBufEntry
tmp
=
{.
pItem
=
pEntry
,
.
prevLogIndex
=
pMatch
->
index
,
.
prevLogTerm
=
pMatch
->
term
};
...
...
@@ -273,8 +273,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncTerm
lastMatchTerm
=
syncLogBufferGetLastMatchTerm
(
pBuf
);
if
(
index
<=
pBuf
->
commitIndex
)
{
sTrace
(
"vgId:%d,
raft entry already committed. index: %"
PRId64
", term: %"
PRId64
". log buffer: [
%"
PRId64
" %"
PRId64
"
%"
PRId64
"
, %"
PRId64
")"
,
sTrace
(
"vgId:%d,
already committed. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
"
%"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
SyncTerm
term
=
syncLogReplMgrGetPrevLogTerm
(
NULL
,
pNode
,
index
+
1
);
...
...
@@ -286,15 +286,15 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
}
if
(
index
-
pBuf
->
startIndex
>=
pBuf
->
size
)
{
sInfo
(
"vgId:%d,
raft entry out of buffer capacity. index: %"
PRId64
", term: %"
PRId64
". log buffer: [
%"
PRId64
" %"
PRId64
"
%"
PRId64
"
, %"
PRId64
")"
,
sInfo
(
"vgId:%d,
out of buffer range. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
"
%"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
goto
_out
;
}
if
(
index
>
pBuf
->
matchIndex
&&
lastMatchTerm
!=
prevTerm
)
{
sInfo
(
"vgId:%d, not ready to accept
raft entry
. index: %"
PRId64
", term: %"
PRId64
": prevterm: %"
PRId64
sInfo
(
"vgId:%d, not ready to accept. index: %"
PRId64
", term: %"
PRId64
": prevterm: %"
PRId64
" != lastmatch: %"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
prevTerm
,
lastMatchTerm
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -309,7 +309,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
if
(
pEntry
->
term
!=
pExist
->
term
)
{
(
void
)
syncLogBufferRollback
(
pBuf
,
index
);
}
else
{
sDebug
(
"vgId:%d, duplicate
raft
entry received. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
sDebug
(
"vgId:%d, duplicate
log
entry received. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -349,7 +349,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
ASSERT
(
pEntry
->
index
==
lastVer
+
1
);
if
(
pLogStore
->
syncLogAppendEntry
(
pLogStore
,
pEntry
)
<
0
)
{
sError
(
"failed to append
raft
log entry since %s. index:%"
PRId64
", term:%"
PRId64
""
,
terrstr
(),
pEntry
->
index
,
sError
(
"failed to append
sync
log entry since %s. index:%"
PRId64
", term:%"
PRId64
""
,
terrstr
(),
pEntry
->
index
,
pEntry
->
term
);
return
-
1
;
}
...
...
@@ -392,7 +392,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
if
(
pMatch
->
term
!=
prevLogTerm
)
{
sInfo
(
"vgId:%d, mismatching
raft
log entries encountered. "
"vgId:%d, mismatching
sync
log entries encountered. "
"{ index:%"
PRId64
", term:%"
PRId64
" } "
"{ index:%"
PRId64
", term:%"
PRId64
", prevLogIndex:%"
PRId64
", prevLogTerm:%"
PRId64
" } "
,
...
...
@@ -411,8 +411,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
// persist
if
(
syncLogStorePersist
(
pLogStore
,
pEntry
)
<
0
)
{
sError
(
"vgId:%d, failed to persist
raft log entry from log buffer since %s. index:%"
PRId64
,
pNode
->
vgId
,
terrstr
(),
pEntry
->
index
);
sError
(
"vgId:%d, failed to persist
sync log entry from buffer since %s. index:%"
PRId64
,
pNode
->
vgId
,
terrstr
()
,
pEntry
->
index
);
goto
_out
;
}
ASSERT
(
pEntry
->
index
==
pBuf
->
matchIndex
);
...
...
@@ -482,15 +482,14 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
bool
inBuf
=
false
;
if
(
commitIndex
<=
pBuf
->
commitIndex
)
{
sDebug
(
"vgId:%d, stale commit
update
. current:%"
PRId64
", notified:%"
PRId64
""
,
vgId
,
pBuf
->
commitIndex
,
sDebug
(
"vgId:%d, stale commit
index
. current:%"
PRId64
", notified:%"
PRId64
""
,
vgId
,
pBuf
->
commitIndex
,
commitIndex
);
ret
=
0
;
goto
_out
;
}
sTrace
(
"vgId:%d, log buffer info. role: %d, term: %"
PRId64
". start index:%"
PRId64
", commit index:%"
PRId64
", match index: %"
PRId64
", end index:%"
PRId64
""
,
pNode
->
vgId
,
role
,
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
sTrace
(
"vgId:%d, commit. log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
"), role: %d, term: %"
PRId64
,
pNode
->
vgId
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
,
role
,
term
);
// execute in fsm
for
(
int64_t
index
=
pBuf
->
commitIndex
+
1
;
index
<=
upperIndex
;
index
++
)
{
...
...
@@ -513,7 +512,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
}
if
(
syncLogFsmExecute
(
pNode
,
pFsm
,
role
,
term
,
pEntry
)
!=
0
)
{
sError
(
"vgId:%d, failed to execute
raft
entry in FSM. log index:%"
PRId64
", term:%"
PRId64
""
,
vgId
,
sError
(
"vgId:%d, failed to execute
sync log
entry in FSM. log index:%"
PRId64
", term:%"
PRId64
""
,
vgId
,
pEntry
->
index
,
pEntry
->
term
);
goto
_out
;
}
...
...
@@ -605,7 +604,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
bool
barrier
=
false
;
if
(
syncLogBufferReplicateOneTo
(
pMgr
,
pNode
,
index
,
&
term
,
pDestId
,
&
barrier
)
<
0
)
{
sError
(
"vgId:%d, failed to replicate log entry since %s. index: %"
PRId64
", dest: %"
PRIx64
""
,
pNode
->
vgId
,
sError
(
"vgId:%d, failed to replicate
sync
log entry since %s. index: %"
PRId64
", dest: %"
PRIx64
""
,
pNode
->
vgId
,
terrstr
(),
index
,
pDestId
->
addr
);
goto
_out
;
}
...
...
@@ -622,9 +621,8 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
_out:
if
(
retried
)
{
pMgr
->
retryBackoff
=
syncLogGetNextRetryBackoff
(
pMgr
);
sInfo
(
"vgId:%d, resend %d raft log entries. dest: %"
PRIx64
", for indexes: %"
PRId64
" etc., maybe of term: %"
PRId64
", retryWaitMs: %"
PRId64
", repl mgr: [%"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
sInfo
(
"vgId:%d, resend %d sync log entries. dest: %"
PRIx64
", indexes: %"
PRId64
" ..., likely term: %"
PRId64
", retryWaitMs: %"
PRId64
", repl mgr: [%"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
count
,
pDestId
->
addr
,
firstIndex
,
term
,
retryWaitMs
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
);
}
...
...
@@ -645,8 +643,8 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
ASSERT
(
pMgr
->
matchIndex
==
0
);
if
(
pMsg
->
matchIndex
<
0
)
{
pMgr
->
restored
=
true
;
sInfo
(
"vgId:%d, sync log repl mgr restored. peer: %s:%d (%"
PRIx64
"),
repl mgr(rs:%d):
[%"
PRId64
" %"
PRId64
", %"
PRId64
"),
log
buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
sInfo
(
"vgId:%d, sync log repl mgr restored. peer: %s:%d (%"
PRIx64
"),
mgr: rs(%d)
[%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
host
,
port
,
destId
.
addr
,
pMgr
->
restored
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
return
0
;
...
...
@@ -661,21 +659,21 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod
if
(
pMsg
->
success
&&
pMsg
->
matchIndex
==
pMsg
->
lastSendIndex
)
{
pMgr
->
restored
=
true
;
sInfo
(
"vgId:%d, sync log repl mgr restored. peer: %s:%d (%"
PRIx64
"),
repl mgr(rs:%d):
[%"
PRId64
" %"
PRId64
", %"
PRId64
"),
log
buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
sInfo
(
"vgId:%d, sync log repl mgr restored. peer: %s:%d (%"
PRIx64
"),
mgr: rs(%d)
[%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
host
,
port
,
destId
.
addr
,
pMgr
->
restored
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
return
0
;
}
if
(
pMsg
->
success
==
false
&&
pMsg
->
matchIndex
>=
pMsg
->
lastSendIndex
)
{
sError
(
"vgId:%d, failed to rollback match index. peer: %s:%d, match index: %"
PRId64
", last sent: %"
PRId64
,
pNode
->
vgId
,
host
,
port
,
pMsg
->
matchIndex
,
pMsg
->
lastSendIndex
);
sError
(
"vgId:%d, failed to rollback match index. peer: %s:%d, match index: %"
PRId64
", last sent: %"
PRId64
,
pNode
->
vgId
,
host
,
port
,
pMsg
->
matchIndex
,
pMsg
->
lastSendIndex
);
if
(
syncNodeStartSnapshot
(
pNode
,
&
destId
)
<
0
)
{
sError
(
"vgId:%d, failed to start snapshot for peer %s:%d"
,
pNode
->
vgId
,
host
,
port
);
return
-
1
;
}
sInfo
(
"vgId:%d, snapshot replication to
rollback. peer:
%s:%d"
,
pNode
->
vgId
,
host
,
port
);
sInfo
(
"vgId:%d, snapshot replication to
peer
%s:%d"
,
pNode
->
vgId
,
host
,
port
);
return
0
;
}
...
...
@@ -748,7 +746,8 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
taosThreadMutexLock
(
&
pBuf
->
mutex
);
if
(
pMsg
->
startTime
!=
pMgr
->
peerStartTime
)
{
sInfo
(
"vgId:%d, reset sync log repl mgr in append entries reply. peer: %"
PRIx64
", start time:%"
PRId64
", old:%"
PRId64
,
sInfo
(
"vgId:%d, reset sync log repl mgr in appendlog reply. peer: %"
PRIx64
", start time:%"
PRId64
", old:%"
PRId64
,
pNode
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
startTime
,
pMgr
->
peerStartTime
);
syncLogReplMgrReset
(
pMgr
);
pMgr
->
peerStartTime
=
pMsg
->
startTime
;
...
...
@@ -793,8 +792,6 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
return
0
;
}
_Atomic
int64_t
tsSendCnt
=
0
;
int32_t
syncLogReplMgrReplicateAttemptedOnce
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
)
{
ASSERT
(
pMgr
->
restored
);
SRaftId
*
pDestId
=
&
pNode
->
replicasId
[
pMgr
->
peerId
];
...
...
@@ -825,7 +822,6 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
pMgr
->
states
[
pos
].
acked
=
false
;
pMgr
->
endIndex
=
index
+
1
;
tsSendCnt
++
;
if
(
barrier
)
{
break
;
}
...
...
@@ -957,7 +953,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
(
void
)
syncLogBufferRollback
(
pBuf
,
pBuf
->
matchIndex
+
1
);
sInfo
(
"vgId:%d, reset sync log buffer.
log
buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
sInfo
(
"vgId:%d, reset sync log buffer. buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
pBuf
->
endIndex
=
pBuf
->
matchIndex
+
1
;
...
...
@@ -1003,7 +999,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pNode
,
pDestId
);
if
(
pMgr
)
{
sInfo
(
"vgId:%d, reset sync log repl mgr
for peer: 0x%016
"
PRIx64
" since %s. index: %"
PRId64
,
pNode
->
vgId
,
sInfo
(
"vgId:%d, reset sync log repl mgr
of peer: %
"
PRIx64
" since %s. index: %"
PRId64
,
pNode
->
vgId
,
pDestId
->
addr
,
terrstr
(),
index
);
(
void
)
syncLogReplMgrReset
(
pMgr
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录