Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8069066a
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8069066a
编写于
8月 05, 2022
作者:
L
Li Minghao
提交者:
GitHub
8月 05, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #15765 from taosdata/feature/3.0_mhli
refactor(sync): make leader life longer
上级
84e58ca2
32830ca9
变更
6
隐藏空白更改
内联
并排
Showing
6 changed files
with
61 additions
and
20 deletions
+61
-20
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+5
-4
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+6
-6
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+40
-6
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+2
-1
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+4
-2
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+4
-1
未找到文件。
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
8069066a
...
...
@@ -448,7 +448,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
}
}
vTrace
(
"vgId:%d, sync msg:%p is processed, type:%s code:0x%x"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
code
);
vTrace
(
"vgId:%d, sync msg:%p is processed, type:%s code:0x%x"
,
pVnode
->
config
.
vgId
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
code
);
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
&&
terrno
==
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
...
...
@@ -629,8 +630,8 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
static
int32_t
vnodeSnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
,
SSnapshot
*
pSnapshot
)
{
#ifdef USE_TSDB_SNAPSHOT
SVnode
*
pVnode
=
pFsm
->
data
;
vInfo
(
"vgId:%d, stop write vnode snapshot, apply:%d, index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
,
pVnode
->
config
.
vgId
,
isApply
,
pSnapshot
->
lastApplyIndex
,
pSnapshot
->
lastApplyTerm
,
pSnapshot
->
lastConfigIndex
);
vInfo
(
"vgId:%d, stop write vnode snapshot, apply:%d, index:%"
PRId64
" term:%"
PRIu64
" config:%"
PRId64
,
p
Vnode
->
config
.
vgId
,
isApply
,
p
Snapshot
->
lastApplyIndex
,
pSnapshot
->
lastApplyTerm
,
pSnapshot
->
lastConfigIndex
);
int32_t
code
=
vnodeSnapWriterClose
(
pWriter
,
!
isApply
,
pSnapshot
);
vInfo
(
"vgId:%d, apply vnode snapshot finished, code:0x%x"
,
pVnode
->
config
.
vgId
,
code
);
...
...
@@ -707,7 +708,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
}
setPingTimerMS
(
pVnode
->
sync
,
5000
);
setElectTimerMS
(
pVnode
->
sync
,
13
00
);
setElectTimerMS
(
pVnode
->
sync
,
28
00
);
setHeartbeatTimerMS
(
pVnode
->
sync
,
900
);
return
0
;
}
...
...
source/libs/sync/inc/syncEnv.h
浏览文件 @
8069066a
...
...
@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 5000
#define ELECT_TIMER_MS_MIN
13
00
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define TIMER_MAX_MS
0x7FFFFFFF
#define ENV_TICK_TIMER_MS
1000
#define PING_TIMER_MS
5000
#define ELECT_TIMER_MS_MIN
50
00
#define ELECT_TIMER_MS_MAX
(ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 900
#define HEARTBEAT_TIMER_MS
900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
8069066a
...
...
@@ -730,7 +730,8 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
do
{
char
eventLog
[
128
];
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose message, type:%s batch:%d"
,
TMSG_INFO
(
pMsgPArr
[
i
]
->
msgType
),
arrSize
);
snprintf
(
eventLog
,
sizeof
(
eventLog
),
"propose message, type:%s batch:%d"
,
TMSG_INFO
(
pMsgPArr
[
i
]
->
msgType
),
arrSize
);
syncNodeEventLog
(
pSyncNode
,
eventLog
);
}
while
(
0
);
...
...
@@ -834,7 +835,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
rpcFreeCont
(
rpcMsg
.
pCont
);
syncRespMgrDel
(
pSyncNode
->
pSyncRespMgr
,
seqNum
);
ret
=
1
;
sDebug
(
"vgId:%d, sync optimize index:%"
PRId64
", type:%s"
,
pSyncNode
->
vgId
,
retIndex
,
TMSG_INFO
(
pMsg
->
msgType
));
sDebug
(
"vgId:%d, sync optimize index:%"
PRId64
", type:%s"
,
pSyncNode
->
vgId
,
retIndex
,
TMSG_INFO
(
pMsg
->
msgType
));
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
...
...
@@ -1114,7 +1116,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
}
int32_t
ret
=
0
;
ret
=
syncNodeStartPingTimer
(
pSyncNode
);
//
ret = syncNodeStartPingTimer(pSyncNode);
ASSERT
(
ret
==
0
);
}
...
...
@@ -1250,6 +1252,13 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
taosTmrReset
(
pSyncNode
->
FpElectTimerCB
,
pSyncNode
->
electTimerMS
,
pSyncNode
,
gSyncEnv
->
pTimerManager
,
&
pSyncNode
->
pElectTimer
);
atomic_store_64
(
&
pSyncNode
->
electTimerLogicClock
,
pSyncNode
->
electTimerLogicClockUser
);
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"elect timer reset, ms:%d"
,
ms
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
}
else
{
sError
(
"vgId:%d, start elect timer error, sync env is stop"
,
pSyncNode
->
vgId
);
}
...
...
@@ -1281,6 +1290,14 @@ int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
electMS
=
syncUtilElectRandomMS
(
pSyncNode
->
electBaseLine
,
2
*
pSyncNode
->
electBaseLine
);
}
ret
=
syncNodeRestartElectTimer
(
pSyncNode
,
electMS
);
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"reset elect timer, min:%d, max:%d, ms:%d"
,
pSyncNode
->
electBaseLine
,
2
*
pSyncNode
->
electBaseLine
,
electMS
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
return
ret
;
}
...
...
@@ -1293,6 +1310,13 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
}
else
{
sError
(
"vgId:%d, start heartbeat timer error, sync env is stop"
,
pSyncNode
->
vgId
);
}
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"start heartbeat timer, ms:%d"
,
pSyncNode
->
heartbeatTimerMS
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
return
ret
;
}
...
...
@@ -1304,6 +1328,13 @@ int32_t syncNodeStartNowHeartbeatTimer(SSyncNode* pSyncNode) {
}
else
{
sError
(
"vgId:%d, start heartbeat timer error, sync env is stop"
,
pSyncNode
->
vgId
);
}
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"start heartbeat timer, ms:%d"
,
1
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
return
ret
;
}
...
...
@@ -1312,6 +1343,8 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_add_fetch_64
(
&
pSyncNode
->
heartbeatTimerLogicClockUser
,
1
);
taosTmrStop
(
pSyncNode
->
pHeartbeatTimer
);
pSyncNode
->
pHeartbeatTimer
=
NULL
;
sTrace
(
"vgId:%d, stop heartbeat timer"
,
pSyncNode
->
vgId
);
return
ret
;
}
...
...
@@ -1559,12 +1592,13 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, %s"
,
"lcfg:%"
PRId64
", chging:%d, rsto:%d,
elt:%"
PRId64
", hb:%"
PRId64
",
%s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
printStr
);
pSyncNode
->
restoreFinish
,
pSyncNode
->
electTimerLogicClockUser
,
pSyncNode
->
heartbeatTimerLogicClockUser
,
printStr
);
}
else
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"%s"
,
str
);
}
...
...
@@ -1894,7 +1928,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
// Raft 3.6.2 Committing entries from previous terms
syncNodeAppendNoop
(
pSyncNode
);
#if 0 // simon
#if 0
// simon
syncNodeReplicate(pSyncNode);
#endif
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
8069066a
...
...
@@ -141,7 +141,8 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
", match-index:%d, raftid:%"
PRId64
,
pSyncNode
->
vgId
,
nextIndex
,
newNextIndex
,
SYNC_INDEX_INVALID
,
pDestId
->
addr
);
syncNodeRestartNowHeartbeatTimer
(
pSyncNode
);
// syncNodeRestartNowHeartbeatTimer(pSyncNode);
syncNodeStartNowHeartbeatTimer
(
pSyncNode
);
return
-
1
;
}
...
...
source/libs/sync/src/syncTimeout.c
浏览文件 @
8069066a
...
...
@@ -48,14 +48,16 @@ int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_ELECTION
)
{
if
(
atomic_load_64
(
&
ths
->
electTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
++
(
ths
->
electTimerCounter
);
sInfo
(
"vgId:%d, sync timeout, type:election count:%d"
,
ths
->
vgId
,
ths
->
electTimerCounter
);
sInfo
(
"vgId:%d, sync timeout, type:election count:%d, electTimerLogicClockUser:%ld"
,
ths
->
vgId
,
ths
->
electTimerCounter
,
ths
->
electTimerLogicClockUser
);
syncNodeElect
(
ths
);
}
}
else
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_HEARTBEAT
)
{
if
(
atomic_load_64
(
&
ths
->
heartbeatTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
++
(
ths
->
heartbeatTimerCounter
);
sInfo
(
"vgId:%d, sync timeout, type:replicate count:%d"
,
ths
->
vgId
,
ths
->
heartbeatTimerCounter
);
sInfo
(
"vgId:%d, sync timeout, type:replicate count:%d, heartbeatTimerLogicClockUser:%ld"
,
ths
->
vgId
,
ths
->
heartbeatTimerCounter
,
ths
->
heartbeatTimerLogicClockUser
);
syncNodeReplicate
(
ths
);
}
}
else
{
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
8069066a
...
...
@@ -125,7 +125,10 @@ int32_t syncUtilRand(int32_t max) { return taosRand() % max; }
// Pick a randomized election timeout, in milliseconds, starting at min.
//
// Parameters:
//   min - lower bound of the timeout; must be > 0.
//   max - upper bound used to size the random span; must be > 0 and >= min.
//
// Returns min plus syncUtilRand(max - min). When max == min the span is zero
// and min is returned as-is, without calling syncUtilRand().
int32_t syncUtilElectRandomMS(int32_t min, int32_t max) {
  ASSERT(min > 0 && max > 0 && max >= min);

  int32_t span = max - min;
  if (span == 0) {
    // syncUtilRand() takes its argument as a modulus; a zero span would be a
    // division by zero, so short-circuit the degenerate range here.
    return min;
  }

  int32_t rdm = min + syncUtilRand(span);
  return rdm;
}
// Number of replicas needed for a strict majority out of replicaNum:
// half of the replica count (integer division) plus one.
int32_t syncUtilQuorum(int32_t replicaNum) {
  int32_t half = replicaNum / 2;
  return half + 1;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录