Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3fdd0c30
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3fdd0c30
编写于
6月 16, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): add trace log
上级
46c09f2c
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
167 addition
and
121 deletion
+167
-121
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+6
-4
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+4
-4
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+3
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+54
-41
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+10
-8
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+17
-12
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+73
-49
未找到文件。
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
3fdd0c30
...
...
@@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu log truncate, from %ld to %ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
delBegin
,
delEnd
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu log truncate, from %ld to %ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
delBegin
,
delEnd
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
return
code
;
...
...
@@ -995,8 +995,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
commitBegin
,
commitEnd
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
commitBegin
,
commitEnd
);
}
SyncIndex
beginIndex
=
ths
->
commitIndex
+
1
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
3fdd0c30
...
...
@@ -190,21 +190,21 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotSender2Str
(
pSender
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, "
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
s
);
taosMemoryFree
(
s
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d start sender first time, "
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
3fdd0c30
...
...
@@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d sync event %s c
urrentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
)
,
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
);
}
// update commit index
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
3fdd0c30
...
...
@@ -573,9 +573,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
)
,
pMsg
->
msgType
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
ret
=
syncNodePropose
(
pSyncNode
,
pMsg
,
isWeak
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -584,9 +584,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
)
,
pMsg
->
msgType
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
SRespStub
stub
;
...
...
@@ -832,8 +832,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug
(
"vgId:%d sync event %s c
urrentTerm:%lu sync open"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
)
,
pSyncNode
->
pRaftStore
->
currentTerm
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld currentTerm:%lu sync open"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
return
pSyncNode
;
}
...
...
@@ -880,8 +880,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
sDebug
(
"vgId:%d sync event %s c
urrentTerm:%lu sync close"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
)
,
pSyncNode
->
pRaftStore
->
currentTerm
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld currentTerm:%lu sync close"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
);
int32_t
ret
;
assert
(
pSyncNode
!=
NULL
);
...
...
@@ -1320,9 +1320,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
sDebug
(
"vgId:%d sync event %s c
urrentTerm:%lu save senders %d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
i
,
oldSenders
[
i
]
,
oldSenders
[
i
]
->
privateTerm
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
i
,
oldSenders
[
i
],
oldSenders
[
i
]
->
privateTerm
);
if
(
gRaftDetailLog
)
{
;
}
...
...
@@ -1374,9 +1374,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
((
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
],
oldSenders
[
j
]
->
privateTerm
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
"privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
],
oldSenders
[
j
]
->
privateTerm
);
(
pSyncNode
->
senders
)[
i
]
=
oldSenders
[
j
];
oldSenders
[
j
]
=
NULL
;
reset
=
true
;
...
...
@@ -1384,9 +1387,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex
int32_t
oldreplicaIndex
=
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
;
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
=
i
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
"reset:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
}
}
}
...
...
@@ -1395,9 +1400,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
senders
)[
i
],
i
,
(
pSyncNode
->
senders
)[
i
]
->
privateTerm
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
senders
)[
i
],
i
,
(
pSyncNode
->
senders
)[
i
]
->
privateTerm
);
}
}
...
...
@@ -1405,8 +1411,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
snapshotSenderDestroy
(
oldSenders
[
i
]);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu delete old sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
oldSenders
[
i
],
i
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldSenders
[
i
],
i
);
oldSenders
[
i
]
=
NULL
;
}
}
...
...
@@ -1477,10 +1484,11 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
// maybe clear leader cache
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
...
...
@@ -1517,9 +1525,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish
pSyncNode
->
restoreFinish
=
false
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_LEADER
;
...
...
@@ -2093,14 +2104,15 @@ const char* syncStr(ESyncState state) {
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
SyncLeaderTransfer
*
pSyncLeaderTransfer
=
syncLeaderTransferFromRpcMsg2
(
pRpcMsg
);
sDebug
(
"vgId:%d sync event %s c
urrentTerm:%lu begin leader transfer"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
)
,
ths
->
pRaftStore
->
currentTerm
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld currentTerm:%lu begin leader transfer"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
);
if
(
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
==
ths
->
myNodeInfo
.
nodePort
)
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
// reset elect timer now!
int32_t
electMS
=
1
;
...
...
@@ -2238,9 +2250,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
code
=
0
;
ESyncState
state
=
flag
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu commit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
beginIndex
,
endIndex
,
syncUtilState2String
(
state
));
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
beginIndex
,
endIndex
,
syncUtilState2String
(
state
));
// execute fsm
if
(
ths
->
pFsm
!=
NULL
)
{
...
...
@@ -2288,9 +2301,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
}
ths
->
restoreFinish
=
true
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu restore finish, %s, index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
)
,
pEntry
->
index
);
sDebug
(
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu restore finish, %s, index:%ld"
,
ths
->
vgId
,
syncUtilState2String
(
ths
->
state
),
ths
->
commitIndex
,
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
),
pEntry
->
index
);
}
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
3fdd0c30
...
...
@@ -163,10 +163,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync
(
pWal
,
true
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
return
code
;
}
...
...
@@ -323,11 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync
(
pWal
,
true
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"vgId:%d sync event %s c
ommitIndex:%ld c
urrentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
"originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
p
Entry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
pData
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
commitIndex
,
p
Data
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
return
code
;
}
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
3fdd0c30
...
...
@@ -46,10 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut
(
pObj
->
pRespHash
,
&
keyCode
,
sizeof
(
keyCode
),
pStub
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
return
keyCode
;
...
...
@@ -72,10 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy
(
pStub
,
pTmp
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
return
1
;
// get one object
...
...
@@ -92,10 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy
(
pStub
,
pTmp
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d sync event %s currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
"ahandle:%p"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
taosHashRemove
(
pObj
->
pRespHash
,
&
index
,
sizeof
(
index
));
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
3fdd0c30
...
...
@@ -141,21 +141,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
...
...
@@ -285,31 +287,34 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
...
...
@@ -344,16 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
);
sDebug
(
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
"privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSender
->
pSyncNode
->
state
),
pSender
->
pSyncNode
->
commitIndex
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
);
}
syncSnapshotSendDestroy
(
pMsg
);
...
...
@@ -586,19 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
...
...
@@ -636,18 +648,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool
isDrop
;
if
(
IamInNew
)
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld "
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
p
SyncNode
->
pRaftStore
->
currentTerm
,
p
Msg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
syncNodeUpdateConfig
(
pSyncNode
,
&
newSyncCfg
,
pMsg
->
lastConfigIndex
,
&
isDrop
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld "
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
p
SyncNode
->
pRaftStore
->
currentTerm
,
p
Msg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
}
// change isStandBy to normal
...
...
@@ -680,21 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
logSimpleStr
=
logStoreSimple2Str
(
pSyncNode
->
pLogStore
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
p
Msg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfig
Index
,
pReceiver
->
privateTerm
,
logSimpleStr
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
p
SyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApply
Index
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfigIndex
,
pReceiver
->
privateTerm
,
logSimpleStr
);
taosMemoryFree
(
logSimpleStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
p
Msg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfig
Index
,
pReceiver
->
privateTerm
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
p
SyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApply
Index
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
pReceiver
->
pWriter
=
NULL
;
...
...
@@ -705,17 +721,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
...
...
@@ -732,20 +750,22 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pReceiver
->
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
...
...
@@ -767,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event %s currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
pSyncNode
->
commitIndex
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录