Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a7cd465a
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
a7cd465a
编写于
11月 29, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18544 from taosdata/feature/3.0_mhli
refactor(sync): add trace log for elect count, become leader count, c…
上级
10c8a601
198e18c3
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
58 addition
and
24 deletion
+58
-24
include/libs/sync/sync.h
include/libs/sync/sync.h
+4
-1
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+4
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+2
-0
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+2
-0
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+3
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+20
-6
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+3
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+2
-0
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+1
-1
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+17
-13
未找到文件。
include/libs/sync/sync.h
浏览文件 @
a7cd465a
...
...
@@ -162,6 +162,9 @@ typedef struct SSyncFSM {
// SWal implements it
typedef
struct
SSyncLogStore
{
SLRUCache
*
pCache
;
int32_t
cacheHit
;
int32_t
cacheMiss
;
void
*
data
;
int32_t
(
*
syncLogUpdateCommitIndex
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
a7cd465a
...
...
@@ -192,6 +192,10 @@ typedef struct SSyncNode {
int64_t
leaderTime
;
int64_t
lastReplicateTime
;
int32_t
electNum
;
int32_t
becomeLeaderNum
;
int32_t
configChangeNum
;
bool
isStart
;
}
SSyncNode
;
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
a7cd465a
...
...
@@ -205,9 +205,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pLocalEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
hLocal
);
code
=
0
;
ths
->
pLogStore
->
cacheHit
++
;
sNTrace
(
ths
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
appendIndex
,
pLocalEntry
->
bytes
,
pLocalEntry
);
}
else
{
ths
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
ths
,
"miss cache index:%"
PRId64
,
appendIndex
);
code
=
ths
->
pLogStore
->
syncLogGetEntry
(
ths
->
pLogStore
,
appendIndex
,
&
pLocalEntry
);
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
a7cd465a
...
...
@@ -117,9 +117,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if
(
h
)
{
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
pSyncNode
->
pLogStore
->
cacheHit
++
;
sNTrace
(
pSyncNode
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
index
,
pEntry
->
bytes
,
pEntry
);
}
else
{
pSyncNode
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
pSyncNode
,
"miss cache index:%"
PRId64
,
index
);
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
index
,
&
pEntry
);
...
...
source/libs/sync/src/syncElection.c
浏览文件 @
a7cd465a
...
...
@@ -61,7 +61,8 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
}
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
)
{
sNTrace
(
pSyncNode
,
"begin election"
);
sNInfo
(
pSyncNode
,
"begin election"
);
pSyncNode
->
electNum
++
;
int32_t
ret
=
0
;
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
...
...
@@ -98,7 +99,6 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeCandidate2Leader
(
pSyncNode
);
pSyncNode
->
pVotesGranted
->
toLeader
=
true
;
return
ret
;
}
ret
=
syncNodeRequestVotePeers
(
pSyncNode
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
a7cd465a
...
...
@@ -410,9 +410,11 @@ bool syncIsReadyForRead(int64_t rid) {
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
code
=
0
;
pSyncNode
->
pLogStore
->
cacheHit
++
;
sNTrace
(
pSyncNode
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
lastIndex
,
pEntry
->
bytes
,
pEntry
);
}
else
{
pSyncNode
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
pSyncNode
,
"miss cache index:%"
PRId64
,
lastIndex
);
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
lastIndex
,
&
pEntry
);
...
...
@@ -1008,6 +1010,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
atomic_store_64
(
&
pSyncNode
->
snapshottingIndex
,
SYNC_INDEX_INVALID
);
pSyncNode
->
isStart
=
true
;
pSyncNode
->
electNum
=
0
;
pSyncNode
->
becomeLeaderNum
=
0
;
pSyncNode
->
configChangeNum
=
0
;
sNTrace
(
pSyncNode
,
"sync open, node:%p"
,
pSyncNode
);
return
pSyncNode
;
...
...
@@ -1340,6 +1346,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
pSyncNode
->
pRaftCfg
->
cfg
=
*
pNewConfig
;
pSyncNode
->
pRaftCfg
->
lastConfigIndex
=
lastConfigChangeIndex
;
pSyncNode
->
configChangeNum
++
;
bool
IamInOld
=
syncNodeInConfig
(
pSyncNode
,
&
oldConfig
);
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewConfig
);
...
...
@@ -1363,7 +1371,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
char
newCfgStr
[
1024
]
=
{
0
};
syncCfg2SimpleStr
(
&
oldConfig
,
oldCfgStr
,
sizeof
(
oldCfgStr
));
syncCfg2SimpleStr
(
pNewConfig
,
oldCfgStr
,
sizeof
(
oldCfgStr
));
sN
Trace
(
pSyncNode
,
"begin do config change, from %s to %s"
,
oldCfgStr
,
oldCfgStr
);
sN
Info
(
pSyncNode
,
"begin do config change, from %s to %s"
,
oldCfgStr
,
oldCfgStr
);
if
(
IamInNew
)
{
pSyncNode
->
pRaftCfg
->
isStandBy
=
0
;
// change isStandBy to normal
...
...
@@ -1495,13 +1503,13 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
}
else
{
// persist cfg
raftCfgPersist
(
pSyncNode
->
pRaftCfg
);
sN
Trace
(
pSyncNode
,
"do not config change from %d to %d, index:%"
PRId64
", %s --> %s"
,
oldConfig
.
replicaNum
,
sN
Info
(
pSyncNode
,
"do not config change from %d to %d, index:%"
PRId64
", %s --> %s"
,
oldConfig
.
replicaNum
,
pNewConfig
->
replicaNum
,
lastConfigChangeIndex
,
oldCfgStr
,
newCfgStr
);
}
_END:
// log end config change
sN
Trace
(
pSyncNode
,
"end do config change, from %s to %s"
,
oldCfgStr
,
newCfgStr
);
sN
Info
(
pSyncNode
,
"end do config change, from %s to %s"
,
oldCfgStr
,
newCfgStr
);
}
// raft state change --------------
...
...
@@ -1598,6 +1606,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
pSyncNode
->
leaderTime
=
taosGetTimestampMs
();
pSyncNode
->
becomeLeaderNum
++
;
// reset restoreFinish
pSyncNode
->
restoreFinish
=
false
;
...
...
@@ -1666,7 +1676,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode
->
minMatchIndex
=
SYNC_INDEX_INVALID
;
// trace log
sN
Trace
(
pSyncNode
,
"become leader %s"
,
debugStr
);
sN
Info
(
pSyncNode
,
"become leader %s"
,
debugStr
);
}
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -1842,9 +1852,11 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
pPreEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
code
=
0
;
pSyncNode
->
pLogStore
->
cacheHit
++
;
sNTrace
(
pSyncNode
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
preIndex
,
pPreEntry
->
bytes
,
pPreEntry
);
}
else
{
pSyncNode
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
pSyncNode
,
"miss cache index:%"
PRId64
,
preIndex
);
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
preIndex
,
&
pPreEntry
);
...
...
@@ -1971,7 +1983,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return
;
}
sTrace
(
"
enqueue heartbeat timer"
);
sTrace
(
"
vgId:%d, enqueue heartbeat timer"
,
pNode
->
vgId
);
code
=
pNode
->
syncEqMsg
(
pNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"failed to enqueue heartbeat msg since %s"
,
terrstr
());
...
...
@@ -2526,9 +2538,11 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
if
(
h
)
{
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
ths
->
pLogStore
->
cacheHit
++
;
sNTrace
(
ths
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
i
,
pEntry
->
bytes
,
pEntry
);
}
else
{
ths
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
ths
,
"miss cache index:%"
PRId64
,
i
);
code
=
ths
->
pLogStore
->
syncLogGetEntry
(
ths
->
pLogStore
,
i
,
&
pEntry
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
a7cd465a
...
...
@@ -45,6 +45,9 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
return
NULL
;
}
pLogStore
->
cacheHit
=
0
;
pLogStore
->
cacheMiss
=
0
;
taosLRUCacheSetStrictCapacity
(
pLogStore
->
pCache
,
false
);
pLogStore
->
data
=
taosMemoryMalloc
(
sizeof
(
SSyncLogStoreData
));
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
a7cd465a
...
...
@@ -80,9 +80,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
pEntry
=
(
SSyncRaftEntry
*
)
taosLRUCacheValue
(
pCache
,
h
);
code
=
0
;
pSyncNode
->
pLogStore
->
cacheHit
++
;
sNTrace
(
pSyncNode
,
"hit cache index:%"
PRId64
", bytes:%u, %p"
,
nextIndex
,
pEntry
->
bytes
,
pEntry
);
}
else
{
pSyncNode
->
pLogStore
->
cacheMiss
++
;
sNTrace
(
pSyncNode
,
"miss cache index:%"
PRId64
,
nextIndex
);
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
nextIndex
,
&
pEntry
);
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
a7cd465a
...
...
@@ -52,7 +52,7 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
}
static
int32_t
syncNodeTimerRoutine
(
SSyncNode
*
ths
)
{
sN
Trace
(
ths
,
"timer routines"
);
sN
Info
(
ths
,
"timer routines"
);
// timer replicate
syncNodeReplicate
(
ths
);
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
a7cd465a
...
...
@@ -242,6 +242,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
logBeginIndex
=
pNode
->
pLogStore
->
syncLogBeginIndex
(
pNode
->
pLogStore
);
}
int32_t
cacheHit
=
pNode
->
pLogStore
->
cacheHit
;
int32_t
cacheMiss
=
pNode
->
pLogStore
->
cacheMiss
;
char
cfgStr
[
1024
];
if
(
pNode
->
pRaftCfg
!=
NULL
)
{
syncCfg2SimpleStr
(
&
(
pNode
->
pRaftCfg
->
cfg
),
cfgStr
,
sizeof
(
cfgStr
));
...
...
@@ -275,18 +278,18 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
terrno
=
errCode
;
if
(
pNode
!=
NULL
&&
pNode
->
pRaftCfg
!=
NULL
)
{
taosPrintLog
(
flags
,
level
,
dflag
,
taosPrintLog
(
flags
,
level
,
dflag
,
"vgId:%d, sync %s "
"%s"
", tm:%"
PRIu64
", cmt:%"
PRId64
", fst:%"
PRId64
", lst:%"
PRId64
", min:%"
PRId64
", snap:%"
PRId64
", snap-tm:%"
PRIu64
", sby:%d, aq:%d, snaping:%"
PRId64
", r-num:%d, lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s, %s, %s"
,
pNode
->
vgId
,
syncStr
(
pNode
->
state
),
eventLog
,
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
pRaftCfg
->
isStandBy
,
aqItems
,
pNode
->
snapshottingIndex
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
,
hbTimeStr
,
hbrTimeStr
);
", snap-tm:%"
PRIu64
", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, aq:%d, snaping:%"
PRId64
", r-num:%d, lcfg:%"
PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s, %s, %s, %s"
,
pNode
->
vgId
,
syncStr
(
pNode
->
state
),
eventLog
,
currentTerm
,
pNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
pNode
->
minMatchIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pNode
->
electNum
,
pNode
->
becomeLeaderNum
,
pNode
->
configChangeNum
,
cacheHit
,
cacheMiss
,
aqItems
,
pNode
->
snapshottingIndex
,
pNode
->
replicaNum
,
pNode
->
pRaftCfg
->
lastConfigIndex
,
pNode
->
changing
,
pNode
->
restoreFinish
,
quorum
,
pNode
->
electTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
,
peerStr
,
cfgStr
,
hbTimeStr
,
hbrTimeStr
);
}
}
...
...
@@ -438,7 +441,8 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
host
,
port
,
pMsg
->
term
,
pMsg
->
privateTerm
,
pMsg
->
success
,
pMsg
->
lastSendIndex
,
pMsg
->
matchIndex
,
s
);
}
void
syncLogSendHeartbeat
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeat
*
pMsg
,
bool
printX
,
int64_t
timerElapsed
,
int64_t
execTime
)
{
void
syncLogSendHeartbeat
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeat
*
pMsg
,
bool
printX
,
int64_t
timerElapsed
,
int64_t
execTime
)
{
if
(
!
(
sDebugFlag
&
DEBUG_TRACE
))
return
;
char
host
[
64
];
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录