Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
84f8c767
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
84f8c767
编写于
2月 03, 2023
作者:
X
Xiaoyu Wang
提交者:
GitHub
2月 03, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19768 from taosdata/FIX/TD-22121-main
enh: improve logging msgs for sync snapshot repl
上级
a76dbe0b
561ee87e
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
78 addition
and
85 deletion
+78
-85
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+1
-1
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+1
-1
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+13
-13
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+63
-70
未找到文件。
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
84f8c767
...
...
@@ -24,7 +24,7 @@ extern "C" {
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_PRE
P
_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
84f8c767
...
...
@@ -830,7 +830,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy
pMgr
->
endIndex
=
index
+
1
;
SSyncLogBuffer
*
pBuf
=
pNode
->
pLogBuf
;
s
Info
(
"vgId:%d, probe peer:%"
PRIx64
" with msg of index:%"
PRId64
" term: %"
PRId64
". mgr (rs:%d): [%"
PRId64
s
Trace
(
"vgId:%d, probe peer:%"
PRIx64
" with msg of index:%"
PRId64
" term: %"
PRId64
". mgr (rs:%d): [%"
PRId64
" %"
PRId64
", %"
PRId64
"), buffer: [%"
PRId64
" %"
PRId64
" %"
PRId64
", %"
PRId64
")"
,
pNode
->
vgId
,
pDestId
->
addr
,
index
,
term
,
pMgr
->
restored
,
pMgr
->
startIndex
,
pMgr
->
matchIndex
,
pMgr
->
endIndex
,
pBuf
->
startIndex
,
pBuf
->
commitIndex
,
pBuf
->
matchIndex
,
pBuf
->
endIndex
);
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
84f8c767
...
...
@@ -112,7 +112,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) {
pMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
pMsg
->
lastConfig
=
pSender
->
lastConfig
;
pMsg
->
startTime
=
pSender
->
startTime
;
pMsg
->
seq
=
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
;
pMsg
->
seq
=
SYNC_SNAPSHOT_SEQ_PRE
P
_SNAPSHOT
;
// event log
syncLogSendSyncSnapshotSend
(
pSender
->
pSyncNode
,
pMsg
,
"snapshot sender start"
);
...
...
@@ -379,7 +379,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
}
pReceiver
->
start
=
true
;
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
;
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_PRE
P
_SNAPSHOT
;
pReceiver
->
term
=
pReceiver
->
pSyncNode
->
raftStore
.
currentTerm
;
pReceiver
->
fromId
=
pPreMsg
->
srcId
;
pReceiver
->
startTime
=
pPreMsg
->
startTime
;
...
...
@@ -527,7 +527,7 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) {
return
snapStart
;
}
static
int32_t
syncNodeOnSnapshotPre
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
static
int32_t
syncNodeOnSnapshotPre
p
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
SSyncSnapshotReceiver
*
pReceiver
=
pSyncNode
->
pNewNodeReceiver
;
int64_t
timeNow
=
taosGetTimestampMs
();
int32_t
code
=
0
;
...
...
@@ -565,7 +565,7 @@ _START_RECEIVER:
}
else
{
// waiting for clock match
while
(
timeNow
<
pMsg
->
startTime
)
{
sRInfo
(
pReceiver
,
"snapshot receiver pre waitting for true time, now:%"
PRId64
", stime:%"
PRId64
,
timeNow
,
sRInfo
(
pReceiver
,
"snapshot receiver pre waitting for true time, now:%"
PRId64
", st
artT
ime:%"
PRId64
,
timeNow
,
pMsg
->
startTime
);
taosMsleep
(
10
);
timeNow
=
taosGetTimestampMs
();
...
...
@@ -765,7 +765,7 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE
P
_SNAPSHOT
// if receiver already start
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
// if sender.start-time = receiver.start-time, maybe duplicate msg
...
...
@@ -809,9 +809,9 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
int32_t
code
=
0
;
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
if
(
pMsg
->
term
==
pSyncNode
->
raftStore
.
currentTerm
)
{
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
)
{
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_PRE
P
_SNAPSHOT
)
{
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"process seq pre-snapshot"
);
code
=
syncNodeOnSnapshotPre
(
pSyncNode
,
pMsg
);
code
=
syncNodeOnSnapshotPre
p
(
pSyncNode
,
pMsg
);
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_BEGIN
)
{
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"process seq begin"
);
code
=
syncNodeOnSnapshotBegin
(
pSyncNode
,
pMsg
);
...
...
@@ -848,7 +848,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return
code
;
}
static
int32_t
syncNodeOnSnapshotPreRsp
(
SSyncNode
*
pSyncNode
,
SSyncSnapshotSender
*
pSender
,
SyncSnapshotRsp
*
pMsg
)
{
static
int32_t
syncNodeOnSnapshotPre
p
Rsp
(
SSyncNode
*
pSyncNode
,
SSyncSnapshotSender
*
pSender
,
SyncSnapshotRsp
*
pMsg
)
{
SSnapshot
snapshot
=
{
0
};
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
...
...
@@ -945,8 +945,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if
(
pMsg
->
startTime
!=
pSender
->
startTime
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"snapshot sender and receiver time not match"
);
sSError
(
pSender
,
"sender:%"
PRId64
" receiver:%"
PRId64
" time not match,
code:
0x%x"
,
pMsg
->
startTime
,
pSender
->
startTime
,
pMsg
->
code
);
sSError
(
pSender
,
"sender:%"
PRId64
" receiver:%"
PRId64
" time not match,
error:%s
0x%x"
,
pMsg
->
startTime
,
pSender
->
startTime
,
tstrerror
(
pMsg
->
code
),
pMsg
->
code
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
goto
_ERROR
;
}
...
...
@@ -961,15 +961,15 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
if
(
pMsg
->
code
!=
0
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"receive error code"
);
sSError
(
pSender
,
"snapshot sender receive error
code:0x%x and stop sender"
,
pMsg
->
code
);
sSError
(
pSender
,
"snapshot sender receive error
:%s 0x%x and stop sender"
,
tstrerror
(
pMsg
->
code
)
,
pMsg
->
code
);
terrno
=
pMsg
->
code
;
goto
_ERROR
;
}
// prepare <begin, end>, send begin msg
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
)
{
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_PRE
P
_SNAPSHOT
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq pre-snapshot"
);
return
syncNodeOnSnapshotPreRsp
(
pSyncNode
,
pSender
,
pMsg
);
return
syncNodeOnSnapshotPre
p
Rsp
(
pSyncNode
,
pSender
,
pMsg
);
}
if
(
pSender
->
pReader
==
NULL
||
pSender
->
finish
)
{
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
84f8c767
...
...
@@ -141,20 +141,15 @@ static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bu
}
static
void
syncPeerState2Str
(
SSyncNode
*
pSyncNode
,
char
*
buf
,
int32_t
bufLen
)
{
int32_t
len
=
1
;
int32_t
len
=
0
;
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%s"
,
"{"
);
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
SPeerState
*
pState
=
syncNodeGetPeerState
(
pSyncNode
,
&
(
pSyncNode
->
replicasId
[
i
]));
if
(
pState
==
NULL
)
break
;
if
(
i
<
pSyncNode
->
replicaNum
-
1
)
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%d:%"
PRId64
" %"
PRId64
", "
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
}
else
{
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%d:%"
PRId64
" %"
PRId64
"}"
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
);
}
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%d:%"
PRId64
" %"
PRId64
"%s"
,
i
,
pState
->
lastSendIndex
,
pState
->
lastSendTime
,
(
i
<
pSyncNode
->
replicaNum
-
1
)
?
", "
:
""
);
}
len
+=
snprintf
(
buf
+
len
,
bufLen
-
len
,
"%s"
,
"}"
);
}
void
syncPrintNodeLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncNode
*
pNode
,
const
char
*
format
,
...)
{
...
...
@@ -245,7 +240,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
char
cfgStr
[
1024
]
=
""
;
syncCfg2SimpleStr
(
&
pNode
->
raftCfg
.
cfg
,
cfgStr
,
sizeof
(
cfgStr
));
char
peerStr
[
1024
]
=
"
{
"
;
char
peerStr
[
1024
]
=
""
;
syncPeerState2Str
(
pNode
,
peerStr
,
sizeof
(
peerStr
));
char
eventLog
[
512
];
// {0};
...
...
@@ -255,20 +250,21 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
va_end
(
argpointer
);
taosPrintLog
(
flags
,
level
,
dflag
,
"vgId:%d, %s, sync:%s, {%p s-param:%"
PRId64
" e-param:%"
PRId64
" laindex:%"
PRId64
" laterm:%"
PRIu64
" lcindex:%"
PRId64
" seq:%d ack:%d finish:%d replica-index:%d dnode:%d}"
", tm:%"
PRIu64
", cmt:%"
PRId64
", fst:%"
PRId64
", lst:%"
PRId64
", min:%"
PRId64
", snap:%"
PRId64
", snap-tm:%"
PRIu64
", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s"
,
"vgId:%d, %s, sync:%s, snap-sender:{%p start:%"
PRId64
" end:%"
PRId64
" last-index:%"
PRId64
" last-term:%"
PRIu64
" last-cfg:%"
PRId64
", seq:%d ack:%d finish:%d, as:%d dnode:%d}"
", term:%"
PRIu64
", commit-index:%"
PRId64
", firstver:%"
PRId64
", lastver:%"
PRId64
", min-match:%"
PRId64
", snap:{last-index:%"
PRId64
", term:%"
PRIu64
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%"
PRId64
", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%"
PRId64
", hb:%"
PRId64
"}, peer:%s, cfg:%s"
,
pNode
->
vgId
,
eventLog
,
syncStr
(
pNode
->
state
),
pSender
,
pSender
->
snapshotParam
.
start
,
pSender
->
snapshotParam
.
end
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
finish
,
pSender
->
replicaIndex
,
DID
(
&
pNode
->
replicasId
[
pSender
->
replicaIndex
]),
pNode
->
raftStore
.
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
raftCfg
.
isStandBy
,
pNode
->
raftCfg
.
snapshotStrategy
,
pNode
->
raftCfg
.
batchSize
,
pNode
->
replicaNum
,
pNode
->
raftCfg
.
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
syncNodeDynamicQuorum
(
pNode
)
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
pNode
->
raftCfg
.
isStandBy
,
pNode
->
raftCfg
.
batchSize
,
pNode
->
replicaNum
,
pNode
->
raftCfg
.
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
syncNodeDynamicQuorum
(
pNode
),
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
}
void
syncPrintSnapshotReceiverLog
(
const
char
*
flags
,
ELogLevel
level
,
int32_t
dflag
,
SSyncSnapshotReceiver
*
pReceiver
,
...
...
@@ -291,7 +287,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
char
cfgStr
[
1024
]
=
""
;
syncCfg2SimpleStr
(
&
pNode
->
raftCfg
.
cfg
,
cfgStr
,
sizeof
(
cfgStr
));
char
peerStr
[
1024
]
=
"
{
"
;
char
peerStr
[
1024
]
=
""
;
syncPeerState2Str
(
pNode
,
peerStr
,
sizeof
(
peerStr
));
char
eventLog
[
512
];
// {0};
...
...
@@ -300,22 +296,22 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
int32_t
writeLen
=
vsnprintf
(
eventLog
,
sizeof
(
eventLog
),
format
,
argpointer
);
va_end
(
argpointer
);
taosPrintLog
(
flags
,
level
,
dflag
,
"vgId:%d, %s, sync:%s,"
" {%p start:%d ack:%d term:%"
PRIu64
" start-time:%"
PRId64
" from dnode:%d s-param:%"
PRId64
" e-param:%"
PRId64
" laindex:%"
PRId64
" laterm:%"
PRIu64
" lcindex
:%"
PRId64
"}"
", tm:%"
PRIu64
", cmt:%"
PRId64
", fst:%"
PRId64
", lst:%"
PRId64
", min:%"
PRId64
", snap:%"
PRId64
", snap-tm:%"
PRIu64
", sby:%d, stgy:%d, bch:%d, r-num:%d, lcfg
:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s"
,
pNode
->
vgId
,
eventLog
,
syncStr
(
pNode
->
state
),
pReceiver
,
pReceiver
->
start
,
pReceiver
->
ack
,
pReceiver
->
term
,
pReceiver
->
startTime
,
DID
(
&
pReceiver
->
fromId
),
pReceiver
->
snapshotParam
.
start
,
pReceiver
->
snapshotParam
.
end
,
pReceiver
->
snapshot
.
lastApplyIndex
,
pReceiver
->
snapshot
.
lastApplyTerm
,
pReceiver
->
snapshot
.
lastConfigIndex
,
pNode
->
raftStore
.
currentTerm
,
pNode
->
commitIndex
,
logBegin
Index
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
raftCfg
.
isStandBy
,
pNode
->
raftCfg
.
snapshotStrategy
,
pNode
->
raftCfg
.
batchSize
,
pNode
->
replicaNum
,
pNode
->
raftCfg
.
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
syncNodeDynamicQuorum
(
pNode
)
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
taosPrintLog
(
flags
,
level
,
dflag
,
"vgId:%d, %s, sync:%s,"
" snap-receiver:{%p started:%d acked:%d term:%"
PRIu64
" start-time:%"
PRId64
" from-dnode:%d, start
:%"
PRId64
" end:%"
PRId64
" last-index:%"
PRId64
" last-term:%"
PRIu64
" last-cfg:%"
PRId64
"}"
", term:%"
PRIu64
", commit-index:%"
PRId64
", firstver:%"
PRId64
", lastver:%"
PRId64
", min-match
:%"
PRId64
", snap:{last-index:%"
PRId64
", last-term:%"
PRIu64
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%"
PRId64
", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%"
PRId64
", hb:%"
PRId64
"}, peer:%s, cfg:%s"
,
pNode
->
vgId
,
eventLog
,
syncStr
(
pNode
->
state
),
pReceiver
,
pReceiver
->
start
,
pReceiver
->
ack
,
pReceiver
->
term
,
pReceiver
->
startTime
,
DID
(
&
pReceiver
->
fromId
),
pReceiver
->
snapshotParam
.
start
,
pReceiver
->
snapshotParam
.
end
,
pReceiver
->
snapshot
.
lastApplyIndex
,
pReceiver
->
snapshot
.
lastApplyTerm
,
pReceiver
->
snapshot
.
lastConfig
Index
,
pNode
->
raftStore
.
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
raftCfg
.
isStandBy
,
pNode
->
raftCfg
.
batchSize
,
pNode
->
replicaNum
,
pNode
->
raftCfg
.
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
syncNodeDynamicQuorum
(
pNode
),
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
);
}
void
syncLogRecvTimer
(
SSyncNode
*
pSyncNode
,
const
SyncTimeout
*
pMsg
,
const
char
*
s
)
{
...
...
@@ -351,13 +347,13 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
int64_t
execTime
)
{
if
(
printX
)
{
sNTrace
(
pSyncNode
,
"send sync-heartbeat to dnode:%d {term:%"
PRId64
", c
mt:%"
PRId64
", min-match:%"
PRId64
", ts
:%"
PRId64
"}, x"
,
"send sync-heartbeat to dnode:%d {term:%"
PRId64
", c
ommit-index:%"
PRId64
", min-match
:%"
PRId64
"
, ts:%"
PRId64
"
}, x"
,
DID
(
&
pMsg
->
destId
),
pMsg
->
term
,
pMsg
->
commitIndex
,
pMsg
->
minMatchIndex
,
pMsg
->
timeStamp
);
}
else
{
sNTrace
(
pSyncNode
,
"send sync-heartbeat to dnode:%d {term:%"
PRId64
", c
mt:%"
PRId64
", min-match:%"
PRId64
", ts
:%"
PRId64
"}, timer-elapsed:%"
PRId64
", next-exec:%"
PRId64
,
"send sync-heartbeat to dnode:%d {term:%"
PRId64
", c
ommit-index:%"
PRId64
", min-match
:%"
PRId64
"
, ts:%"
PRId64
"
}, timer-elapsed:%"
PRId64
", next-exec:%"
PRId64
,
DID
(
&
pMsg
->
destId
),
pMsg
->
term
,
pMsg
->
commitIndex
,
pMsg
->
minMatchIndex
,
pMsg
->
timeStamp
,
timerElapsed
,
execTime
);
}
...
...
@@ -368,14 +364,14 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
pSyncNode
->
hbSlowNum
++
;
sNInfo
(
pSyncNode
,
"recv sync-heartbeat from dnode:%d slow {term:%"
PRId64
", c
mt
:%"
PRId64
", min-match:%"
PRId64
"recv sync-heartbeat from dnode:%d slow {term:%"
PRId64
", c
ommit-index
:%"
PRId64
", min-match:%"
PRId64
", ts:%"
PRId64
"}, %s, net elapsed:%"
PRId64
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
commitIndex
,
pMsg
->
minMatchIndex
,
pMsg
->
timeStamp
,
s
,
timeDiff
);
}
sNTrace
(
pSyncNode
,
"recv sync-heartbeat from dnode:%d {term:%"
PRId64
", c
mt:%"
PRId64
", min-match:%"
PRId64
", ts
:%"
PRId64
"}, %s, net elapsed:%"
PRId64
,
"recv sync-heartbeat from dnode:%d {term:%"
PRId64
", c
ommit-index:%"
PRId64
", min-match
:%"
PRId64
"
, ts:%"
PRId64
"
}, %s, net elapsed:%"
PRId64
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
commitIndex
,
pMsg
->
minMatchIndex
,
pMsg
->
timeStamp
,
s
,
timeDiff
);
}
...
...
@@ -400,67 +396,64 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
void
syncLogSendSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
)
{
sNDebug
(
pSyncNode
,
"send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%"
PRId64
", begin
:%"
PRId64
", end
:%"
PRId64
", l
term:%"
PRId64
", s
time:%"
PRId64
,
"send sync-snapshot-send to dnode:%d, %s, seq:%d, term:%"
PRId64
", begin
-index
:%"
PRId64
", l
ast-index:%"
PRId64
", last-term:%"
PRId64
", start-
time:%"
PRId64
,
DID
(
&
pMsg
->
destId
),
s
,
pMsg
->
seq
,
pMsg
->
term
,
pMsg
->
beginIndex
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
startTime
);
}
void
syncLogRecvSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
)
{
sNDebug
(
pSyncNode
,
"recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%"
PRId64
", begin
:%"
PRId64
", lst
:%"
PRId64
", l
term:%"
PRId64
", stime:%"
PRId64
",
len:%u"
,
"recv sync-snapshot-send from dnode:%d, %s, seq:%d, term:%"
PRId64
", begin
-index
:%"
PRId64
", l
ast-index:%"
PRId64
", last-term:%"
PRId64
", start-time:%"
PRId64
", data-
len:%u"
,
DID
(
&
pMsg
->
srcId
),
s
,
pMsg
->
seq
,
pMsg
->
term
,
pMsg
->
beginIndex
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
startTime
,
pMsg
->
dataLen
);
}
void
syncLogSendSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
)
{
sNDebug
(
pSyncNode
,
"send sync-snapshot-rsp to dnode:%d, %s, ack
:%d, term:%"
PRId64
", begin:%"
PRId64
", lst
:%"
PRId64
", l
term:%"
PRId64
", s
time:%"
PRId64
,
"send sync-snapshot-rsp to dnode:%d, %s, ack
ed:%d, term:%"
PRId64
", begin-index
:%"
PRId64
", l
ast-index:%"
PRId64
", last-term:%"
PRId64
", start-
time:%"
PRId64
,
DID
(
&
pMsg
->
destId
),
s
,
pMsg
->
ack
,
pMsg
->
term
,
pMsg
->
snapBeginIndex
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
startTime
);
}
void
syncLogRecvSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
)
{
sNDebug
(
pSyncNode
,
"recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%"
PRId64
", begin
:%"
PRId64
", lst
:%"
PRId64
", l
term:%"
PRId64
", s
time:%"
PRId64
,
"recv sync-snapshot-rsp from dnode:%d, %s, ack:%d, term:%"
PRId64
", begin
-index
:%"
PRId64
", l
ast-index:%"
PRId64
", last-term:%"
PRId64
", start-
time:%"
PRId64
,
DID
(
&
pMsg
->
srcId
),
s
,
pMsg
->
ack
,
pMsg
->
term
,
pMsg
->
snapBeginIndex
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
startTime
);
}
void
syncLogRecvAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntries
*
pMsg
,
const
char
*
s
)
{
sNTrace
(
pSyncNode
,
"recv sync-append-entries from dnode:%d {term:%"
PRId64
", pre-index:%"
PRId64
", pre-term:%"
PRId64
", cmt:%"
PRId64
", pterm:%"
PRId64
", datalen:%d}, %s"
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
commitIndex
,
pMsg
->
privateTerm
,
pMsg
->
dataLen
,
s
);
"recv sync-append-entries from dnode:%d {term:%"
PRId64
", prev-log:{index:%"
PRId64
", term:%"
PRId64
"}, commit-index:%"
PRId64
", datalen:%d}, %s"
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
commitIndex
,
pMsg
->
dataLen
,
s
);
}
void
syncLogSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntries
*
pMsg
,
const
char
*
s
)
{
sNTrace
(
pSyncNode
,
"send sync-append-entries to dnode:%d, {term:%"
PRId64
", pre
-index:%"
PRId64
", pre-
term:%"
PRId64
"
, lsend-index:%"
PRId64
", cmt
:%"
PRId64
", datalen:%d}, %s"
,
"send sync-append-entries to dnode:%d, {term:%"
PRId64
", pre
v-log:{index:%"
PRId64
",
term:%"
PRId64
"
}, index:%"
PRId64
", commit-index
:%"
PRId64
", datalen:%d}, %s"
,
DID
(
&
pMsg
->
destId
),
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
(
pMsg
->
prevLogIndex
+
1
),
pMsg
->
commitIndex
,
pMsg
->
dataLen
,
s
);
}
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
int32_t
voteGranted
,
const
char
*
s
)
{
if
(
voteGranted
==
-
1
)
{
sNInfo
(
pSyncNode
,
"recv sync-request-vote from dnode:%d, {term:%"
PRId64
", lindex:%"
PRId64
", lterm:%"
PRId64
"}, %s"
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
s
);
}
else
{
sNInfo
(
pSyncNode
,
"recv sync-request-vote from dnode:%d, {term:%"
PRId64
", lindex:%"
PRId64
", lterm:%"
PRId64
"}, granted:%d"
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
voteGranted
);
}
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
int32_t
voteGranted
,
const
char
*
errmsg
)
{
char
statusMsg
[
64
];
snprintf
(
statusMsg
,
sizeof
(
statusMsg
),
"granted:%d"
,
voteGranted
);
sNInfo
(
pSyncNode
,
"recv sync-request-vote from dnode:%d, {term:%"
PRId64
", last-index:%"
PRId64
", last-term:%"
PRId64
"}, %s"
,
DID
(
&
pMsg
->
srcId
),
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
(
voteGranted
!=
-
1
)
?
statusMsg
:
errmsg
);
}
void
syncLogSendRequestVote
(
SSyncNode
*
pNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
)
{
sNInfo
(
pNode
,
"send sync-request-vote to dnode:%d {term:%"
PRId64
", lindex:%"
PRId64
", lterm:%"
PRId64
"}, %s"
,
sNInfo
(
pNode
,
"send sync-request-vote to dnode:%d {term:%"
PRId64
", last-index:%"
PRId64
", last-term:%"
PRId64
"}, %s"
,
DID
(
&
pMsg
->
destId
),
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
s
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录