Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
da469149
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
da469149
编写于
11月 17, 2022
作者:
B
Benguang Zhao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: debug to info
上级
051b73c1
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
41 addition
and
57 deletion
+41
-57
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+6
-6
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+13
-13
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+6
-6
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+16
-32
未找到文件。
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
da469149
...
...
@@ -307,10 +307,10 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const S
rpcMsg
.
info
.
conn
.
applyTerm
=
pMeta
->
term
;
const
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
vG
Trace
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
vG
Info
(
"vgId:%d, commit-cb is excuted, fsm:%p, index:%"
PRId64
", term:%"
PRIu64
", msg-index:%"
PRId64
", weak:%d, code:%d, state:%d %s, type:%s"
,
pVnode
->
config
.
vgId
,
pFsm
,
pMeta
->
index
,
pMeta
->
term
,
rpcMsg
.
info
.
conn
.
applyIndex
,
pMeta
->
isWeak
,
pMeta
->
code
,
pMeta
->
state
,
syncStr
(
pMeta
->
state
),
TMSG_INFO
(
pMsg
->
msgType
));
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
else
{
...
...
@@ -558,12 +558,12 @@ bool vnodeIsLeader(SVnode *pVnode) {
}
else
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
v
Debug
(
"vgId:%d, vnode not ready, state:%s, restore:%d"
,
pVnode
->
config
.
vgId
,
syncStr
(
state
.
state
),
state
.
restored
);
v
Info
(
"vgId:%d, vnode not ready, state:%s, restore:%d"
,
pVnode
->
config
.
vgId
,
syncStr
(
state
.
state
),
state
.
restored
);
return
false
;
}
if
(
!
pVnode
->
restored
)
{
v
Debug
(
"vgId:%d, vnode not restored"
,
pVnode
->
config
.
vgId
);
v
Info
(
"vgId:%d, vnode not restored"
,
pVnode
->
config
.
vgId
);
terrno
=
TSDB_CODE_APP_NOT_READY
;
return
false
;
}
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
da469149
...
...
@@ -245,19 +245,19 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
SyncTerm
lastMatchTerm
=
syncLogBufferGetLastMatchTerm
(
pBuf
);
if
(
index
<=
pBuf
->
commitIndex
)
{
s
Debug
(
"vgId:%d, raft entry already committed. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
s
Info
(
"vgId:%d, raft entry already committed. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
ret
=
0
;
goto
_out
;
}
if
(
index
-
pBuf
->
startIndex
>=
pBuf
->
size
)
{
s
Debug
(
"vgId:%d, raft entry out of buffer capacity. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
s
Info
(
"vgId:%d, raft entry out of buffer capacity. index: %"
PRId64
", term: %"
PRId64
". log buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
goto
_out
;
}
...
...
@@ -382,8 +382,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
// increase match index
pBuf
->
matchIndex
=
index
;
s
Debug
(
"vgId:%d, log buffer proceed. start index: %"
PRId64
", match index: %"
PRId64
", end index: %"
PRId64
,
pNode
->
vgId
,
pBuf
->
startIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
s
Info
(
"vgId:%d, log buffer proceed. start index: %"
PRId64
", match index: %"
PRId64
", end index: %"
PRId64
,
pNode
->
vgId
,
pBuf
->
startIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
// replicate on demand
(
void
)
syncNodeReplicate
(
pNode
);
...
...
@@ -586,9 +586,9 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
goto
_IGNORE
;
}
s
Debug
(
"vgId:%d, recv append entries msg. index:%"
PRId64
", term:%"
PRId64
", preLogIndex:%"
PRId64
", prevLogTerm:%"
PRId64
" commitIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
prevLogIndex
+
1
,
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
commitIndex
);
s
Info
(
"vgId:%d, recv append entries msg. index:%"
PRId64
", term:%"
PRId64
", preLogIndex:%"
PRId64
", prevLogTerm:%"
PRId64
" commitIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
prevLogIndex
+
1
,
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
commitIndex
);
// accept
if
(
syncLogBufferAccept
(
ths
->
pLogBuf
,
ths
,
pEntry
,
pMsg
->
prevLogTerm
)
<
0
)
{
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
da469149
...
...
@@ -50,8 +50,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) {
if
(
indexLikely
>
ths
->
commitIndex
&&
syncNodeAgreedUpon
(
ths
,
indexLikely
))
{
SyncIndex
commitIndex
=
indexLikely
;
syncNodeUpdateCommitIndex
(
ths
,
commitIndex
);
s
Debug
(
"vgId:%d, agreed upon. role:%d, term:%"
PRId64
", index: %"
PRId64
""
,
ths
->
vgId
,
ths
->
state
,
ths
->
pRaftStore
->
currentTerm
,
commitIndex
);
s
Info
(
"vgId:%d, agreed upon. role:%d, term:%"
PRId64
", index: %"
PRId64
""
,
ths
->
vgId
,
ths
->
state
,
ths
->
pRaftStore
->
currentTerm
,
commitIndex
);
}
return
ths
->
commitIndex
;
}
...
...
@@ -113,8 +113,8 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn
(
void
)
syncNodeSendAppendEntries
(
pNode
,
pDestId
,
&
msgOut
);
ret
=
0
;
s
Debug
(
"vgId:%d, replicate one msg index: %"
PRId64
" term: %"
PRId64
" prevterm: %"
PRId64
" to dest: 0x%016"
PRIx64
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
prevLogTerm
,
pDestId
->
addr
);
s
Info
(
"vgId:%d, replicate one msg index: %"
PRId64
" term: %"
PRId64
" prevterm: %"
PRId64
" to dest: 0x%016"
PRIx64
,
pNode
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
prevLogTerm
,
pDestId
->
addr
);
if
(
!
inBuf
)
{
syncEntryDestroy
(
pEntry
);
...
...
@@ -157,8 +157,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
s
Debug
(
"vgId:%d received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
term
,
pMsg
->
matchIndex
);
s
Info
(
"vgId:%d received append entries reply. srcId:0x%016"
PRIx64
", term:%"
PRId64
", matchIndex:%"
PRId64
""
,
pMsg
->
vgId
,
pMsg
->
srcId
.
addr
,
pMsg
->
term
,
pMsg
->
matchIndex
);
if
(
pMsg
->
success
)
{
SyncIndex
oldMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
da469149
...
...
@@ -93,13 +93,13 @@ int32_t syncStart(int64_t rid) {
goto
_err
;
}
if
(
syncNodeStart
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to start sync node since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_err
;
}
if
(
syncNodeStart
(
pSyncNode
)
<
0
)
{
sError
(
"vgId:%d, failed to start sync node since %s"
,
pSyncNode
->
vgId
,
terrstr
());
goto
_err
;
}
syncNodeRelease
(
pSyncNode
);
return
0
;
syncNodeRelease
(
pSyncNode
);
return
0
;
_err:
syncNodeRelease
(
pSyncNode
);
...
...
@@ -524,13 +524,8 @@ int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapsho
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
<<<<<<< HEAD
syncEntryDestroy(pEntry);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
=======
syncEntryDestroy(pEntry);
syncNodeRelease(pSyncNode);
>>>>>>> 3.0
return 0;
}
...
...
@@ -771,7 +766,7 @@ _out:
int32_t
syncLogReplMgrProcessReplyInRecoveryMode
(
SSyncLogReplMgr
*
pMgr
,
SSyncNode
*
pNode
,
SyncAppendEntriesReply
*
pMsg
)
{
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
SRaftId
destId
=
pMsg
->
srcId
;
SRaftId
destId
=
pMsg
->
srcId
;
ASSERT
(
pMgr
->
restored
==
false
);
char
host
[
64
];
uint16_t
port
;
...
...
@@ -2508,10 +2503,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// proceed match index, with replicating on needed
SyncIndex
matchIndex
=
syncLogBufferProceed
(
ths
->
pLogBuf
,
ths
);
s
Debug
(
"vgId:%d, append raft entry. index: %"
PRId64
", term: %"
PRId64
" pBuf: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
ths
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
ths
->
pLogBuf
->
startIndex
,
ths
->
pLogBuf
->
commitIndex
,
ths
->
pLogBuf
->
matchIndex
,
ths
->
pLogBuf
->
endIndex
);
s
Info
(
"vgId:%d, append raft entry. index: %"
PRId64
", term: %"
PRId64
" pBuf: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
ths
->
vgId
,
pEntry
->
index
,
pEntry
->
term
,
ths
->
pLogBuf
->
startIndex
,
ths
->
pLogBuf
->
commitIndex
,
ths
->
pLogBuf
->
matchIndex
,
ths
->
pLogBuf
->
endIndex
);
// multi replica
if
(
ths
->
replicaNum
>
1
)
{
...
...
@@ -2646,7 +2641,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t
syncNodeOnHeartbeatReply
(
SSyncNode
*
ths
,
const
SRpcMsg
*
pRpcMsg
)
{
SyncHeartbeatReply
*
pMsg
=
pRpcMsg
->
pCont
;
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
ths
,
&
pMsg
->
srcId
);
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
ths
,
&
pMsg
->
srcId
);
if
(
pMgr
==
NULL
)
{
sError
(
"vgId:%d, failed to get log repl mgr for the peer at addr 0x016%"
PRIx64
""
,
ths
->
vgId
,
pMsg
->
srcId
.
addr
);
return
-
1
;
...
...
@@ -2797,17 +2792,6 @@ int32_t syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTer
return
0
;
}
#if 0
void syncLogReplicateAppendEntries(SSyncNode* pNode, SyncAppendEntries* pMsg) {
for (int i = 0; i < pNode->replicaNum; i++) {
SRaftId* pDestId = &pNode->peersId[i];
if (!syncUtilSameId(pDestId, &pNode->myRaftId)) {
(void)syncNodeSendAppendEntries(pNode, pDestId, pMsg);
}
}
}
#endif
// TLA+ Spec
// ClientRequest(i, v) ==
// /\ state[i] = Leader
...
...
@@ -2824,9 +2808,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
int32_t
code
=
0
;
SyncIndex
index
=
syncLogBufferGetEndIndex
(
ths
->
pLogBuf
);
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
NULL
;
SyncIndex
index
=
syncLogBufferGetEndIndex
(
ths
->
pLogBuf
);
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncRaftEntry
*
pEntry
=
NULL
;
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
pEntry
=
syncEntryBuildFromClientRequest
(
pMsg
->
pCont
,
term
,
index
);
}
else
{
...
...
@@ -2841,7 +2825,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
return
syncNodeAppend
(
ths
,
pEntry
);
}
return
0
;
return
-
1
;
}
int32_t
syncNodeOnClientRequestOld
(
SSyncNode
*
ths
,
SRpcMsg
*
pMsg
,
SyncIndex
*
pRetIndex
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录