Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a880f22b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a880f22b
编写于
11月 25, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor: adjust syncLog
上级
39ddf9fa
变更
4
显示空白变更内容
内联
并排
Showing
4 changed file
with
59 addition
and
19 deletion
+59
-19
source/libs/sync/inc/syncUtil.h
source/libs/sync/inc/syncUtil.h
+2
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+1
-3
source/libs/sync/src/syncRequestVote.c
source/libs/sync/src/syncRequestVote.c
+2
-8
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+54
-6
未找到文件。
source/libs/sync/inc/syncUtil.h
浏览文件 @
a880f22b
...
...
@@ -98,7 +98,7 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
void
syncLogRecvHeartbeat
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeat
*
pMsg
,
int64_t
timeDiff
);
void
syncLogSendHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
const
char
*
s
);
void
syncLogRecvHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
const
char
*
s
);
void
syncLogRecvHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
int64_t
timeDiff
);
void
syncLogSendSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
);
void
syncLogRecvSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
);
...
...
@@ -115,7 +115,7 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
void
syncLogRecvAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntries
*
pMsg
,
const
char
*
s
);
void
syncLogSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntries
*
pMsg
,
const
char
*
s
);
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
int32_t
voteGranted
,
const
char
*
s
);
void
syncLogSendRequestVote
(
SSyncNode
*
pNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
void
syncLogRecvRequestVoteReply
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVoteReply
*
pMsg
,
const
char
*
s
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
a880f22b
...
...
@@ -2237,9 +2237,7 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int64_t
tsMs
=
taosGetTimestampMs
();
int64_t
timeDiff
=
tsMs
-
pMsg
->
timeStamp
;
char
buf
[
128
];
snprintf
(
buf
,
sizeof
(
buf
),
"net elapsed:%"
PRId64
,
timeDiff
);
syncLogRecvHeartbeatReply
(
ths
,
pMsg
,
buf
);
syncLogRecvHeartbeatReply
(
ths
,
pMsg
,
timeDiff
);
// update last reply time, make decision whether the other node is alive or not
syncIndexMgrSetRecvTime
(
ths
->
pMatchIndex
,
&
pMsg
->
srcId
,
tsMs
);
...
...
source/libs/sync/src/syncRequestVote.c
浏览文件 @
a880f22b
...
...
@@ -94,7 +94,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvRequestVote
(
ths
,
pMsg
,
"not in my config"
);
syncLogRecvRequestVote
(
ths
,
pMsg
,
-
1
,
"not in my config"
);
return
-
1
;
}
...
...
@@ -133,13 +133,7 @@ int32_t syncNodeOnRequestVote(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pReply
->
voteGranted
=
grant
;
// trace log
do
{
char
logBuf
[
32
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"grant:%d"
,
pReply
->
voteGranted
);
syncLogRecvRequestVote
(
ths
,
pMsg
,
logBuf
);
syncLogSendRequestVoteReply
(
ths
,
pReply
,
""
);
}
while
(
0
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
return
0
;
}
\ No newline at end of file
source/libs/sync/src/syncUtil.c
浏览文件 @
a880f22b
...
...
@@ -396,6 +396,8 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
}
void
syncLogRecvTimer
(
SSyncNode
*
pSyncNode
,
const
SyncTimeout
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
int64_t
tsNow
=
taosGetTimestampMs
();
int64_t
timeDIff
=
tsNow
-
pMsg
->
timeStamp
;
sNTrace
(
...
...
@@ -404,11 +406,15 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char*
}
void
syncLogRecvLocalCmd
(
SSyncNode
*
pSyncNode
,
const
SyncLocalCmd
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
sNTrace
(
pSyncNode
,
"recv sync-local-cmd {cmd:%d-%s, sd-new-term:%"
PRId64
", fc-index:%"
PRId64
"}, %s"
,
pMsg
->
cmd
,
syncLocalCmdGetStr
(
pMsg
->
cmd
),
pMsg
->
sdNewTerm
,
pMsg
->
fcIndex
,
s
);
}
void
syncLogSendAppendEntriesReply
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntriesReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -420,6 +426,8 @@ void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
}
void
syncLogRecvAppendEntriesReply
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntriesReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -464,6 +472,8 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
}
void
syncLogSendHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -472,15 +482,19 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
pMsg
->
term
,
pMsg
->
timeStamp
,
s
);
}
void
syncLogRecvHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
const
char
*
s
)
{
void
syncLogRecvHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
int64_t
timeDiff
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sNTrace
(
pSyncNode
,
"recv sync-heartbeat-reply from %s:%d {term:%"
PRId64
", ts:%"
PRId64
"},
%s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
timeStamp
,
s
);
sNTrace
(
pSyncNode
,
"recv sync-heartbeat-reply from %s:%d {term:%"
PRId64
", ts:%"
PRId64
"},
net elapsed:%"
PRId64
,
host
,
port
,
pMsg
->
term
,
pMsg
->
timeStamp
,
timeDiff
);
}
void
syncLogSendSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -488,6 +502,8 @@ void syncLogSendSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
}
void
syncLogRecvSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -495,6 +511,8 @@ void syncLogRecvSyncPreSnapshot(SSyncNode* pSyncNode, const SyncPreSnapshot* pMs
}
void
syncLogSendSyncPreSnapshotReply
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshotReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -503,6 +521,8 @@ void syncLogSendSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
}
void
syncLogRecvSyncPreSnapshotReply
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshotReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -511,6 +531,8 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot
}
void
syncLogSendSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -522,6 +544,8 @@ void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
}
void
syncLogRecvSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -534,6 +558,8 @@ void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* p
}
void
syncLogSendSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -545,6 +571,8 @@ void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
}
void
syncLogRecvSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -556,6 +584,8 @@ void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMs
}
void
syncLogRecvAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntries
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -568,6 +598,8 @@ void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
}
void
syncLogSendAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SyncAppendEntries
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -578,16 +610,28 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
pMsg
->
dataLen
,
s
);
}
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
)
{
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
int32_t
voteGranted
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
logBuf
[
256
];
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sNTrace
(
pSyncNode
,
"recv sync-request-vote from %s:%d, {term:%"
PRId64
", lindex:%"
PRId64
", lterm:%"
PRId64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
s
);
if
(
voteGranted
==
-
1
)
{
sNTrace
(
pSyncNode
,
"recv sync-request-vote from %s:%d, {term:%"
PRId64
", lindex:%"
PRId64
", lterm:%"
PRId64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
s
);
}
else
{
sNTrace
(
pSyncNode
,
"recv sync-request-vote from %s:%d, {term:%"
PRId64
", lindex:%"
PRId64
", lterm:%"
PRId64
"}, granted:%d"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
lastLogIndex
,
pMsg
->
lastLogTerm
,
voteGranted
);
}
}
void
syncLogSendRequestVote
(
SSyncNode
*
pNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -596,6 +640,8 @@ void syncLogSendRequestVote(SSyncNode* pNode, const SyncRequestVote* pMsg, const
}
void
syncLogRecvRequestVoteReply
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVoteReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -604,6 +650,8 @@ void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteRepl
}
void
syncLogSendRequestVoteReply
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVoteReply
*
pMsg
,
const
char
*
s
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录