Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f87b9fa7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f87b9fa7
编写于
11月 29, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): add interface syncSnapshotSending, syncSnapshotRecving
上级
198e18c3
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
84 addition
and
5 deletion
+84
-5
include/libs/sync/sync.h
include/libs/sync/sync.h
+5
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+4
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+46
-0
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+29
-5
未找到文件。
include/libs/sync/sync.h
浏览文件 @
f87b9fa7
...
...
@@ -43,6 +43,9 @@ extern "C" {
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
#define SYNC_HEART_TIMEOUT_MS 1000 * 8
#define SYNC_HEARTBEAT_SLOW_MS 1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
...
...
@@ -226,6 +229,8 @@ int32_t syncEndSnapshot(int64_t rid);
int32_t
syncLeaderTransfer
(
int64_t
rid
);
int32_t
syncStepDown
(
int64_t
rid
,
SyncTerm
newTerm
);
bool
syncIsReadyForRead
(
int64_t
rid
);
bool
syncSnapshotSending
(
int64_t
rid
);
bool
syncSnapshotRecving
(
int64_t
rid
);
SSyncState
syncGetState
(
int64_t
rid
);
void
syncGetRetryEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
f87b9fa7
...
...
@@ -195,6 +195,8 @@ typedef struct SSyncNode {
int32_t
electNum
;
int32_t
becomeLeaderNum
;
int32_t
configChangeNum
;
int32_t
hbSlowNum
;
int32_t
hbrSlowNum
;
bool
isStart
;
...
...
@@ -239,6 +241,8 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode,
SyncIndex
syncMinMatchIndex
(
SSyncNode
*
pSyncNode
);
int32_t
syncCacheEntry
(
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
,
LRUHandle
**
h
);
bool
syncNodeHeartbeatReplyTimeout
(
SSyncNode
*
pSyncNode
);
bool
syncNodeSnapshotSending
(
SSyncNode
*
pSyncNode
);
bool
syncNodeSnapshotRecving
(
SSyncNode
*
pSyncNode
);
// raft state change --------------
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
f87b9fa7
...
...
@@ -447,6 +447,28 @@ bool syncIsReadyForRead(int64_t rid) {
return
ready
;
}
bool
syncSnapshotSending
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
bool
b
=
syncNodeSnapshotSending
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
b
;
}
bool
syncSnapshotRecving
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
bool
b
=
syncNodeSnapshotRecving
(
pSyncNode
);
syncNodeRelease
(
pSyncNode
);
return
b
;
}
int32_t
syncNodeLeaderTransfer
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
peersNum
==
0
)
{
sDebug
(
"vgId:%d, only one replica, cannot leader transfer"
,
pSyncNode
->
vgId
);
...
...
@@ -1013,6 +1035,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
pSyncNode
->
electNum
=
0
;
pSyncNode
->
becomeLeaderNum
=
0
;
pSyncNode
->
configChangeNum
=
0
;
pSyncNode
->
hbSlowNum
=
0
;
pSyncNode
->
hbrSlowNum
=
0
;
sNTrace
(
pSyncNode
,
"sync open, node:%p"
,
pSyncNode
);
...
...
@@ -1563,6 +1587,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode
->
leaderCache
=
EMPTY_RAFT_ID
;
}
pSyncNode
->
hbSlowNum
=
0
;
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_FOLLOWER
;
syncNodeStopHeartbeatTimer
(
pSyncNode
);
...
...
@@ -1607,6 +1633,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode
->
leaderTime
=
taosGetTimestampMs
();
pSyncNode
->
becomeLeaderNum
++
;
pSyncNode
->
hbrSlowNum
=
0
;
// reset restoreFinish
pSyncNode
->
restoreFinish
=
false
;
...
...
@@ -2167,6 +2194,25 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
return
b
;
}
bool
syncNodeSnapshotSending
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
==
NULL
)
return
false
;
bool
b
=
false
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
if
(
pSyncNode
->
senders
[
i
]
!=
NULL
&&
pSyncNode
->
senders
[
i
]
->
start
)
{
b
=
true
;
break
;
}
}
return
b
;
}
bool
syncNodeSnapshotRecving
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
==
NULL
)
return
false
;
if
(
pSyncNode
->
pNewNodeReceiver
==
NULL
)
return
false
;
if
(
pSyncNode
->
pNewNodeReceiver
->
start
)
return
true
;
return
false
;
}
static
int32_t
syncNodeAppendNoop
(
SSyncNode
*
ths
)
{
int32_t
ret
=
0
;
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
f87b9fa7
...
...
@@ -283,13 +283,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
"vgId:%d, sync %s "
"%s"
", tm:%"
PRIu64
", cmt:%"
PRId64
", fst:%"
PRId64
", lst:%"
PRId64
", min:%"
PRId64
", snap:%"
PRId64
", snap-tm:%"
PRIu64
", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, aq:%d, snaping:%"
PRId64
", snap-tm:%"
PRIu64
", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, aq:%d, snaping:%"
PRId64
", r-num:%d, lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s, %s, %s"
,
pNode
->
vgId
,
syncStr
(
pNode
->
state
),
eventLog
,
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
electNum
,
pNode
->
becomeLeaderNum
,
pNode
->
configChangeNum
,
cacheHit
,
cacheMiss
,
aqItems
,
pNode
->
snapshottingIndex
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
,
hbTimeStr
,
hbrTimeStr
);
pNode
->
configChangeNum
,
cacheHit
,
cacheMiss
,
pNode
->
hbSlowNum
,
pNode
->
hbrSlowNum
,
aqItems
,
pNode
->
snapshottingIndex
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
,
hbTimeStr
,
hbrTimeStr
);
}
}
...
...
@@ -463,12 +465,23 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
}
void
syncLogRecvHeartbeat
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeat
*
pMsg
,
int64_t
timeDiff
)
{
if
(
timeDiff
>
SYNC_HEARTBEAT_SLOW_MS
)
{
pSyncNode
->
hbSlowNum
++
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sNInfo
(
pSyncNode
,
"recv sync-heartbeat from %s:%d slow {term:%"
PRId64
", cmt:%"
PRId64
", min-match:%"
PRId64
", ts:%"
PRId64
"}, net elapsed:%"
PRId64
,
host
,
port
,
pMsg
->
term
,
pMsg
->
commitIndex
,
pMsg
->
minMatchIndex
,
pMsg
->
timeStamp
,
timeDiff
);
}
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sNTrace
(
pSyncNode
,
"recv sync-heartbeat from %s:%d {term:%"
PRId64
", cmt:%"
PRId64
", min-match:%"
PRId64
", ts:%"
PRId64
"}, net elapsed:%"
PRId64
,
...
...
@@ -487,6 +500,17 @@ void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
}
void
syncLogRecvHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
int64_t
timeDiff
)
{
if
(
timeDiff
>
SYNC_HEARTBEAT_REPLY_SLOW_MS
)
{
pSyncNode
->
hbrSlowNum
++
;
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sNTrace
(
pSyncNode
,
"recv sync-heartbeat-reply from %s:%d slow {term:%"
PRId64
", ts:%"
PRId64
"}, net elapsed:%"
PRId64
,
host
,
port
,
pMsg
->
term
,
pMsg
->
timeStamp
,
timeDiff
);
}
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录