Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4d20469e
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4d20469e
编写于
2月 07, 2023
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: adjust log formats
上级
151dfea9
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
46 addition
and
46 deletion
+46
-46
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+10
-10
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+33
-33
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+1
-1
未找到文件。
source/libs/sync/src/syncCommit.c
浏览文件 @
4d20469e
...
...
@@ -110,7 +110,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
if
(
indexLikely
>
ths
->
commitIndex
&&
syncNodeAgreedUpon
(
ths
,
indexLikely
))
{
SyncIndex
commitIndex
=
indexLikely
;
syncNodeUpdateCommitIndex
(
ths
,
commitIndex
);
sTrace
(
"vgId:%d, agreed upon. role:%d, term:%"
PRId64
", index:
%"
PRId64
""
,
ths
->
vgId
,
ths
->
state
,
sTrace
(
"vgId:%d, agreed upon. role:%d, term:%"
PRId64
", index:%"
PRId64
""
,
ths
->
vgId
,
ths
->
state
,
ths
->
raftStore
.
currentTerm
,
commitIndex
);
}
return
ths
->
commitIndex
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
4d20469e
...
...
@@ -85,7 +85,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) {
int32_t
syncStart
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
sError
(
"failed to acquire rid:
%"
PRId64
" of tsNodeReftId for pSyncNode"
,
rid
);
sError
(
"failed to acquire rid:%"
PRId64
" of tsNodeReftId for pSyncNode"
,
rid
);
return
-
1
;
}
...
...
@@ -756,7 +756,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) {
SyncIndex
lastVer
=
pNode
->
pLogStore
->
syncLogLastIndex
(
pNode
->
pLogStore
);
if
(
lastVer
<
commitIndex
||
firstVer
>
commitIndex
+
1
)
{
if
(
pNode
->
pLogStore
->
syncLogRestoreFromSnapshot
(
pNode
->
pLogStore
,
commitIndex
))
{
sError
(
"vgId:%d, failed to restore log store from snapshot since %s. lastVer:
%"
PRId64
", snapshotVer:
%"
PRId64
,
sError
(
"vgId:%d, failed to restore log store from snapshot since %s. lastVer:
%"
PRId64
", snapshotVer:
%"
PRId64
,
pNode
->
vgId
,
terrstr
(),
lastVer
,
commitIndex
);
return
-
1
;
}
...
...
@@ -1101,7 +1101,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
SyncIndex
endIndex
=
pSyncNode
->
pLogBuf
->
endIndex
;
if
(
lastVer
!=
-
1
&&
endIndex
!=
lastVer
+
1
)
{
terrno
=
TSDB_CODE_WAL_LOG_INCOMPLETE
;
sError
(
"vgId:%d, failed to restore sync node since %s. expected lastLogIndex:
%"
PRId64
", lastVer:
%"
PRId64
""
,
sError
(
"vgId:%d, failed to restore sync node since %s. expected lastLogIndex:
%"
PRId64
", lastVer:
%"
PRId64
""
,
pSyncNode
->
vgId
,
terrstr
(),
endIndex
-
1
,
lastVer
);
return
-
1
;
}
...
...
@@ -1820,7 +1820,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
ASSERT
(
lastIndex
>=
0
);
sInfo
(
"vgId:%d, become leader. term:
%"
PRId64
", commit index: %"
PRId64
", last index:
%"
PRId64
""
,
sInfo
(
"vgId:%d, become leader. term:
%"
PRId64
", commit index:%"
PRId64
", last index:
%"
PRId64
""
,
pSyncNode
->
vgId
,
pSyncNode
->
raftStore
.
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
}
...
...
@@ -1839,7 +1839,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
);
pSyncNode
->
state
=
TAOS_SYNC_STATE_CANDIDATE
;
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
sInfo
(
"vgId:%d, become candidate from follower. term:
%"
PRId64
", commit index: %"
PRId64
", last index:
%"
PRId64
,
sInfo
(
"vgId:%d, become candidate from follower. term:
%"
PRId64
", commit index:%"
PRId64
", last index:
%"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
raftStore
.
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
sNTrace
(
pSyncNode
,
"follower to candidate"
);
...
...
@@ -1849,7 +1849,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
syncNodeBecomeFollower
(
pSyncNode
,
"leader to follower"
);
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
sInfo
(
"vgId:%d, become follower from leader. term:
%"
PRId64
", commit index: %"
PRId64
", last index:
%"
PRId64
,
sInfo
(
"vgId:%d, become follower from leader. term:
%"
PRId64
", commit index:%"
PRId64
", last index:
%"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
raftStore
.
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
sNTrace
(
pSyncNode
,
"leader to follower"
);
...
...
@@ -1859,7 +1859,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
syncNodeBecomeFollower
(
pSyncNode
,
"candidate to follower"
);
SyncIndex
lastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
sInfo
(
"vgId:%d, become follower from candidate. term:
%"
PRId64
", commit index: %"
PRId64
", last index:
%"
PRId64
,
sInfo
(
"vgId:%d, become follower from candidate. term:
%"
PRId64
", commit index:%"
PRId64
", last index:
%"
PRId64
,
pSyncNode
->
vgId
,
pSyncNode
->
raftStore
.
currentTerm
,
pSyncNode
->
commitIndex
,
lastIndex
);
sNTrace
(
pSyncNode
,
"candidate to follower"
);
...
...
@@ -2299,7 +2299,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// proceed match index, with replicating on needed
SyncIndex
matchIndex
=
syncLogBufferProceed
(
ths
->
pLogBuf
,
ths
,
NULL
);
sTrace
(
"vgId:%d, append raft entry. index:
%"
PRId64
", term:
%"
PRId64
" pBuf: [%"
PRId64
" %"
PRId64
" %"
PRId64
sTrace
(
"vgId:%d, append raft entry. index:
%"
PRId64
", term:
%"
PRId64
" pBuf: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
ths
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
ths
->
pLogBuf
->
startIndex
,
ths
->
pLogBuf
->
commitIndex
,
ths
->
pLogBuf
->
matchIndex
,
ths
->
pLogBuf
->
endIndex
);
...
...
@@ -2472,7 +2472,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError
(
"vgId:%d, sync enqueue step-down msg error, code:%d"
,
ths
->
vgId
,
code
);
rpcFreeCont
(
rpcMsgLocalCmd
.
pCont
);
}
else
{
sTrace
(
"vgId:%d, sync enqueue step-down msg, new-term:
%"
PRId64
,
ths
->
vgId
,
pSyncMsg
->
currentTerm
);
sTrace
(
"vgId:%d, sync enqueue step-down msg, new-term:%"
PRId64
,
ths
->
vgId
,
pSyncMsg
->
currentTerm
);
}
}
}
...
...
@@ -2538,7 +2538,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
(
void
)
syncNodeUpdateCommitIndex
(
ths
,
pMsg
->
commitIndex
);
}
if
(
syncLogBufferCommit
(
ths
->
pLogBuf
,
ths
,
ths
->
commitIndex
)
<
0
)
{
sError
(
"vgId:%d, failed to commit raft log since %s. commit index:
%"
PRId64
""
,
ths
->
vgId
,
terrstr
(),
sError
(
"vgId:%d, failed to commit raft log since %s. commit index:%"
PRId64
""
,
ths
->
vgId
,
terrstr
(),
ths
->
commitIndex
);
}
}
else
{
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
4d20469e
...
...
@@ -132,16 +132,16 @@ SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId
int32_t
syncLogValidateAlignmentOfCommit
(
SSyncNode
*
pNode
,
SyncIndex
commitIndex
)
{
SyncIndex
firstVer
=
pNode
->
pLogStore
->
syncLogBeginIndex
(
pNode
->
pLogStore
);
if
(
firstVer
>
commitIndex
+
1
)
{
sError
(
"vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer:
%"
PRId64
", tsdb commit version:
%"
PRId64
""
,
sError
(
"vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer:%"
PRId64
", tsdb commit version:%"
PRId64
""
,
pNode
->
vgId
,
firstVer
,
commitIndex
);
return
-
1
;
}
SyncIndex
lastVer
=
pNode
->
pLogStore
->
syncLogLastIndex
(
pNode
->
pLogStore
);
if
(
lastVer
<
commitIndex
)
{
sError
(
"vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:
%"
PRId64
", tsdb commit version:
%"
PRId64
""
,
sError
(
"vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:%"
PRId64
", tsdb commit version:%"
PRId64
""
,
pNode
->
vgId
,
lastVer
,
commitIndex
);
return
-
1
;
}
...
...
@@ -293,7 +293,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
bool
inBuf
=
true
;
if
(
index
<=
pBuf
->
commitIndex
)
{
sTrace
(
"vgId:%d, already committed. index:
%"
PRId64
", term:
%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
sTrace
(
"vgId:%d, already committed. index:
%"
PRId64
", term:
%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -306,7 +306,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
}
if
(
index
-
pBuf
->
startIndex
>=
pBuf
->
size
)
{
sWarn
(
"vgId:%d, out of buffer range. index:
%"
PRId64
", term:
%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
sWarn
(
"vgId:%d, out of buffer range. index:
%"
PRId64
", term:
%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -314,8 +314,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
}
if
(
index
>
pBuf
->
matchIndex
&&
lastMatchTerm
!=
prevTerm
)
{
sWarn
(
"vgId:%d, not ready to accept. index:
%"
PRId64
", term: %"
PRId64
": prevterm:
%"
PRId64
" != lastmatch:
%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
sWarn
(
"vgId:%d, not ready to accept. index:
%"
PRId64
", term:%"
PRId64
": prevterm:
%"
PRId64
" != lastmatch:%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
prevTerm
,
lastMatchTerm
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
goto
_out
;
...
...
@@ -328,7 +328,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
if
(
pEntry
->
term
!=
pExist
->
term
)
{
(
void
)
syncLogBufferRollback
(
pBuf
,
pNode
,
index
);
}
else
{
sTrace
(
"vgId:%d, duplicate log entry received. index:
%"
PRId64
", term:
%"
PRId64
". log buffer: [%"
PRId64
sTrace
(
"vgId:%d, duplicate log entry received. index:
%"
PRId64
", term:
%"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -434,7 +434,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
// increase match index
pBuf
->
matchIndex
=
index
;
sTrace
(
"vgId:%d, log buffer proceed. start index:
%"
PRId64
", match index: %"
PRId64
", end index:
%"
PRId64
,
sTrace
(
"vgId:%d, log buffer proceed. start index:
%"
PRId64
", match index:%"
PRId64
", end index:
%"
PRId64
,
pNode
->
vgId
,
pBuf
->
startIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
// replicate on demand
...
...
@@ -475,7 +475,7 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
}
if
(
pEntry
->
originalRpcType
==
TDMT_VND_COMMIT
)
{
sInfo
(
"vgId:%d, fsm execute vnode commit. index:
%"
PRId64
", term:
%"
PRId64
""
,
pNode
->
vgId
,
pEntry
->
index
,
sInfo
(
"vgId:%d, fsm execute vnode commit. index:
%"
PRId64
", term:
%"
PRId64
""
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
);
}
...
...
@@ -528,7 +528,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
goto
_out
;
}
sTrace
(
"vgId:%d, commit. log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
"), role:
%d, term:
%"
PRId64
,
sTrace
(
"vgId:%d, commit. log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
"), role:
%d, term:
%"
PRId64
,
pNode
->
vgId
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
,
role
,
term
);
// execute in fsm
...
...
@@ -541,19 +541,19 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
// execute it
if
(
!
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
sInfo
(
"vgId:%d, commit sync barrier. index:
%"
PRId64
", term:%"
PRId64
", type:
%s"
,
vgId
,
pEntry
->
index
,
sInfo
(
"vgId:%d, commit sync barrier. index:
%"
PRId64
", term:%"
PRId64
", type:
%s"
,
vgId
,
pEntry
->
index
,
pEntry
->
term
,
TMSG_INFO
(
pEntry
->
originalRpcType
));
}
if
(
syncLogFsmExecute
(
pNode
,
pFsm
,
role
,
term
,
pEntry
,
0
)
!=
0
)
{
sError
(
"vgId:%d, failed to execute sync log entry. index:%"
PRId64
", term:%"
PRId64
", role:
%d, current term:
%"
PRId64
,
", role:
%d, current term:
%"
PRId64
,
vgId
,
pEntry
->
index
,
pEntry
->
term
,
role
,
term
);
goto
_out
;
}
pBuf
->
commitIndex
=
index
;
sTrace
(
"vgId:%d, committed index:
%"
PRId64
", term: %"
PRId64
", role: %d, current term:
%"
PRId64
""
,
pNode
->
vgId
,
sTrace
(
"vgId:%d, committed index:
%"
PRId64
", term:%"
PRId64
", role:%d, current term:
%"
PRId64
""
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
role
,
term
);
if
(
!
inBuf
)
{
...
...
@@ -614,7 +614,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SRaftId
*
pDestId
=
&
pNode
->
replicasId
[
pMgr
->
peerId
];
if
(
pMgr
->
retryBackoff
==
SYNC_MAX_RETRY_BACKOFF
)
{
syncLogReplMgrReset
(
pMgr
);
sWarn
(
"vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:
%"
PRIx64
,
pNode
->
vgId
,
sWarn
(
"vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%"
PRIx64
,
pNode
->
vgId
,
pDestId
->
addr
);
return
-
1
;
}
...
...
@@ -639,7 +639,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if
(
pMgr
->
states
[
pos
].
acked
)
{
if
(
pMgr
->
matchIndex
<
index
&&
pMgr
->
states
[
pos
].
timeMs
+
(
syncGetRetryMaxWaitMs
()
<<
3
)
<
nowMs
)
{
syncLogReplMgrReset
(
pMgr
);
sWarn
(
"vgId:%d, reset sync log repl mgr since stagnation. index:
%"
PRId64
", peer:
%"
PRIx64
,
pNode
->
vgId
,
sWarn
(
"vgId:%d, reset sync log repl mgr since stagnation. index:
%"
PRId64
", peer:
%"
PRIx64
,
pNode
->
vgId
,
index
,
pDestId
->
addr
);
goto
_out
;
}
...
...
@@ -648,7 +648,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
bool
barrier
=
false
;
if
(
syncLogReplMgrReplicateOneTo
(
pMgr
,
pNode
,
index
,
&
term
,
pDestId
,
&
barrier
)
<
0
)
{
sError
(
"vgId:%d, failed to replicate sync log entry since %s. index:
%"
PRId64
", dest:
%"
PRIx64
""
,
pNode
->
vgId
,
sError
(
"vgId:%d, failed to replicate sync log entry since %s. index:
%"
PRId64
", dest:
%"
PRIx64
""
,
pNode
->
vgId
,
terrstr
(),
index
,
pDestId
->
addr
);
goto
_out
;
}
...
...
@@ -670,8 +670,8 @@ _out:
if
(
retried
)
{
pMgr
->
retryBackoff
=
syncLogGetNextRetryBackoff
(
pMgr
);
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
sInfo
(
"vgId:%d, resend %d sync log entries. dest:
%"
PRIx64
", indexes:
%"
PRId64
" ..., terms: ... %"
PRId64
", retryWaitMs:
%"
PRId64
", mgr: [%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
sInfo
(
"vgId:%d, resend %d sync log entries. dest:
%"
PRIx64
", indexes:
%"
PRId64
" ..., terms: ... %"
PRId64
", retryWaitMs:%"
PRId64
", mgr: [%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
count
,
pDestId
->
addr
,
firstIndex
,
term
,
retryWaitMs
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -714,7 +714,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p
}
if
(
pMsg
->
success
==
false
&&
pMsg
->
matchIndex
>=
pMsg
->
lastSendIndex
)
{
sWarn
(
"vgId:%d, failed to rollback match index. peer: dnode:%d, match index:
%"
PRId64
", last sent:
%"
PRId64
,
sWarn
(
"vgId:%d, failed to rollback match index. peer: dnode:%d, match index:
%"
PRId64
", last sent:
%"
PRId64
,
pNode
->
vgId
,
DID
(
&
destId
),
pMsg
->
matchIndex
,
pMsg
->
lastSendIndex
);
if
(
syncNodeStartSnapshot
(
pNode
,
&
destId
)
<
0
)
{
sError
(
"vgId:%d, failed to start snapshot for peer dnode:%d"
,
pNode
->
vgId
,
DID
(
&
destId
));
...
...
@@ -761,7 +761,7 @@ int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pN
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
taosThreadMutexLock
(
&
pBuf
->
mutex
);
if
(
pMsg
->
startTime
!=
0
&&
pMsg
->
startTime
!=
pMgr
->
peerStartTime
)
{
sInfo
(
"vgId:%d, reset sync log repl mgr in heartbeat. peer:
%"
PRIx64
", start time:%"
PRId64
", old:%"
PRId64
""
,
sInfo
(
"vgId:%d, reset sync log repl mgr in heartbeat. peer:%"
PRIx64
", start time:%"
PRId64
", old:%"
PRId64
""
,
pNode
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
startTime
,
pMgr
->
peerStartTime
);
syncLogReplMgrReset
(
pMgr
);
pMgr
->
peerStartTime
=
pMsg
->
startTime
;
...
...
@@ -774,7 +774,7 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
taosThreadMutexLock
(
&
pBuf
->
mutex
);
if
(
pMsg
->
startTime
!=
pMgr
->
peerStartTime
)
{
sInfo
(
"vgId:%d, reset sync log repl mgr in appendlog reply. peer:
%"
PRIx64
", start time:%"
PRId64
sInfo
(
"vgId:%d, reset sync log repl mgr in appendlog reply. peer:%"
PRIx64
", start time:%"
PRId64
", old:%"
PRId64
,
pNode
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
startTime
,
pMgr
->
peerStartTime
);
syncLogReplMgrReset
(
pMgr
);
...
...
@@ -815,7 +815,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
bool
barrier
=
false
;
SyncTerm
term
=
-
1
;
if
(
syncLogReplMgrReplicateOneTo
(
pMgr
,
pNode
,
index
,
&
term
,
pDestId
,
&
barrier
)
<
0
)
{
sError
(
"vgId:%d, failed to replicate log entry since %s. index:
%"
PRId64
", dest: 0x%016"
PRIx64
""
,
pNode
->
vgId
,
sError
(
"vgId:%d, failed to replicate log entry since %s. index:%"
PRId64
", dest: 0x%016"
PRIx64
""
,
pNode
->
vgId
,
terrstr
(),
index
,
pDestId
->
addr
);
return
-
1
;
}
...
...
@@ -830,7 +830,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
pMgr
->
endIndex
=
index
+
1
;
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
sTrace
(
"vgId:%d, probe peer:%"
PRIx64
" with msg of index:%"
PRId64
" term:
%"
PRId64
". mgr (rs:%d): [%"
PRId64
sTrace
(
"vgId:%d, probe peer:%"
PRIx64
" with msg of index:%"
PRId64
" term:%"
PRId64
". mgr (rs:%d): [%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pDestId
->
addr
,
index
,
term
,
pMgr
->
restored
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -860,7 +860,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode)
bool
barrier
=
false
;
SyncTerm
term
=
-
1
;
if
(
syncLogReplMgrReplicateOneTo
(
pMgr
,
pNode
,
index
,
&
term
,
pDestId
,
&
barrier
)
<
0
)
{
sError
(
"vgId:%d, failed to replicate log entry since %s. index:
%"
PRId64
", dest: 0x%016"
PRIx64
""
,
pNode
->
vgId
,
sError
(
"vgId:%d, failed to replicate log entry since %s. index:%"
PRId64
", dest: 0x%016"
PRIx64
""
,
pNode
->
vgId
,
terrstr
(),
index
,
pDestId
->
addr
);
return
-
1
;
}
...
...
@@ -874,7 +874,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode)
pMgr
->
endIndex
=
index
+
1
;
if
(
barrier
)
{
sInfo
(
"vgId:%d, replicated sync barrier to dest:
%"
PRIx64
". index: %"
PRId64
", term:
%"
PRId64
sInfo
(
"vgId:%d, replicated sync barrier to dest:
%"
PRIx64
". index:%"
PRId64
", term:
%"
PRId64
", repl mgr: rs(%d) [%"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pDestId
->
addr
,
index
,
term
,
pMgr
->
restored
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
);
...
...
@@ -885,7 +885,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode)
syncLogReplMgrRetryOnNeed
(
pMgr
,
pNode
);
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
sTrace
(
"vgId:%d, replicated %d msgs to peer:
%"
PRIx64
". indexes:
%"
PRId64
"..., terms: ...%"
PRId64
sTrace
(
"vgId:%d, replicated %d msgs to peer:
%"
PRIx64
". indexes:
%"
PRId64
"..., terms: ...%"
PRId64
", mgr: (rs:%d) [%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
count
,
pDestId
->
addr
,
firstIndex
,
term
,
pMgr
->
restored
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
...
...
@@ -1028,7 +1028,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
return
0
;
}
sInfo
(
"vgId:%d, rollback sync log buffer. toindex:
%"
PRId64
", buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
sInfo
(
"vgId:%d, rollback sync log buffer. toindex:%"
PRId64
", buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
toIndex
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
@@ -1119,11 +1119,11 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
pEntry
=
syncLogBufferGetOneEntry
(
pBuf
,
pNode
,
index
,
&
inBuf
);
if
(
pEntry
==
NULL
)
{
sError
(
"vgId:%d, failed to get raft entry for index:
%"
PRId64
""
,
pNode
->
vgId
,
index
);
sError
(
"vgId:%d, failed to get raft entry for index:%"
PRId64
""
,
pNode
->
vgId
,
index
);
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pNode
,
pDestId
);
if
(
pMgr
)
{
sInfo
(
"vgId:%d, reset sync log repl mgr of peer:
%"
PRIx64
" since %s. index:
%"
PRId64
,
pNode
->
vgId
,
sInfo
(
"vgId:%d, reset sync log repl mgr of peer:
%"
PRIx64
" since %s. index:
%"
PRId64
,
pNode
->
vgId
,
pDestId
->
addr
,
terrstr
(),
index
);
(
void
)
syncLogReplMgrReset
(
pMgr
);
}
...
...
@@ -1134,7 +1134,7 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
prevLogTerm
=
syncLogReplMgrGetPrevLogTerm
(
pMgr
,
pNode
,
index
);
if
(
prevLogTerm
<
0
)
{
sError
(
"vgId:%d, failed to get prev log term since %s. index:
%"
PRId64
""
,
pNode
->
vgId
,
terrstr
(),
index
);
sError
(
"vgId:%d, failed to get prev log term since %s. index:%"
PRId64
""
,
pNode
->
vgId
,
terrstr
(),
index
);
goto
_err
;
}
if
(
pTerm
)
*
pTerm
=
pEntry
->
term
;
...
...
@@ -1147,7 +1147,7 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
(
void
)
syncNodeSendAppendEntries
(
pNode
,
pDestId
,
&
msgOut
);
sTrace
(
"vgId:%d, replicate one msg index:
%"
PRId64
" term: %"
PRId64
" prevterm:
%"
PRId64
" to dest: 0x%016"
PRIx64
,
sTrace
(
"vgId:%d, replicate one msg index:
%"
PRId64
" term:%"
PRId64
" prevterm:
%"
PRId64
" to dest: 0x%016"
PRIx64
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
prevLogTerm
,
pDestId
->
addr
);
if
(
!
inBuf
)
{
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
4d20469e
...
...
@@ -91,7 +91,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
void
syncEntryDestroy
(
SSyncRaftEntry
*
pEntry
)
{
if
(
pEntry
!=
NULL
)
{
sTrace
(
"free entry:
%p"
,
pEntry
);
sTrace
(
"free entry:%p"
,
pEntry
);
taosMemoryFree
(
pEntry
);
}
}
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
4d20469e
...
...
@@ -43,7 +43,7 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
bool
syncUtilNodeInfo2RaftId
(
const
SNodeInfo
*
pInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
)
{
uint32_t
ipv4
=
taosGetIpv4FromFqdn
(
pInfo
->
nodeFqdn
);
if
(
ipv4
==
0xFFFFFFFF
||
ipv4
==
1
)
{
sError
(
"failed to resolve ipv4 addr, fqdn:
%s"
,
pInfo
->
nodeFqdn
);
sError
(
"failed to resolve ipv4 addr, fqdn:%s"
,
pInfo
->
nodeFqdn
);
terrno
=
TSDB_CODE_TSC_INVALID_FQDN
;
return
false
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录