Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b08cdf2f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b08cdf2f
编写于
3月 16, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync refactor
上级
acfe73ed
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
34 addition
and
3 deletion
+34
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+20
-2
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+1
-1
source/libs/sync/src/syncRequestVoteReply.c
source/libs/sync/src/syncRequestVoteReply.c
+13
-0
未找到文件。
source/libs/sync/src/syncMain.c
浏览文件 @
b08cdf2f
...
@@ -510,15 +510,17 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
...
@@ -510,15 +510,17 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
}
}
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
)
{
// maybe clear leader cache
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
pSyncNode
->
leaderCache
=
EMPTY_RAFT_ID
;
pSyncNode
->
leaderCache
=
EMPTY_RAFT_ID
;
}
}
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_FOLLOWER
;
pSyncNode
->
state
=
TAOS_SYNC_STATE_FOLLOWER
;
syncNodeStopHeartbeatTimer
(
pSyncNode
);
syncNodeStopHeartbeatTimer
(
pSyncNode
);
int32_t
electMS
=
syncUtilElectRandomMS
();
// reset elect timer
syncNodeRes
tartElectTimer
(
pSyncNode
,
electMS
);
syncNodeRes
etElectTimer
(
pSyncNode
);
}
}
// TLA+ Spec
// TLA+ Spec
...
@@ -540,19 +542,31 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
...
@@ -540,19 +542,31 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
//
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
)
{
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_LEADER
;
pSyncNode
->
state
=
TAOS_SYNC_STATE_LEADER
;
// set leader cache
pSyncNode
->
leaderCache
=
pSyncNode
->
myRaftId
;
pSyncNode
->
leaderCache
=
pSyncNode
->
myRaftId
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
pNextIndex
->
replicaNum
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pSyncNode
->
pNextIndex
->
replicaNum
;
++
i
)
{
// maybe overwrite myself, no harm
// just do it!
pSyncNode
->
pNextIndex
->
index
[
i
]
=
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
)
+
1
;
pSyncNode
->
pNextIndex
->
index
[
i
]
=
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
)
+
1
;
}
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
pMatchIndex
->
replicaNum
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pSyncNode
->
pMatchIndex
->
replicaNum
;
++
i
)
{
// maybe overwrite myself, no harm
// just do it!
pSyncNode
->
pMatchIndex
->
index
[
i
]
=
SYNC_INDEX_INVALID
;
pSyncNode
->
pMatchIndex
->
index
[
i
]
=
SYNC_INDEX_INVALID
;
}
}
// stop elect timer
syncNodeStopElectTimer
(
pSyncNode
);
syncNodeStopElectTimer
(
pSyncNode
);
// start heartbeat timer
syncNodeStartHeartbeatTimer
(
pSyncNode
);
syncNodeStartHeartbeatTimer
(
pSyncNode
);
// start replicate right now!
syncNodeReplicate
(
pSyncNode
);
syncNodeReplicate
(
pSyncNode
);
}
}
...
@@ -578,6 +592,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
...
@@ -578,6 +592,9 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
}
}
// raft vote --------------
// raft vote --------------
// just called by syncNodeVoteForSelf
// need assert
void
syncNodeVoteForTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
,
SRaftId
*
pRaftId
)
{
void
syncNodeVoteForTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
,
SRaftId
*
pRaftId
)
{
assert
(
term
==
pSyncNode
->
pRaftStore
->
currentTerm
);
assert
(
term
==
pSyncNode
->
pRaftStore
->
currentTerm
);
assert
(
!
raftStoreHasVoted
(
pSyncNode
->
pRaftStore
));
assert
(
!
raftStoreHasVoted
(
pSyncNode
->
pRaftStore
));
...
@@ -585,6 +602,7 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
...
@@ -585,6 +602,7 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId)
raftStoreVote
(
pSyncNode
->
pRaftStore
,
pRaftId
);
raftStoreVote
(
pSyncNode
->
pRaftStore
,
pRaftId
);
}
}
// simulate get vote from outside
void
syncNodeVoteForSelf
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeVoteForSelf
(
SSyncNode
*
pSyncNode
)
{
syncNodeVoteForTerm
(
pSyncNode
,
pSyncNode
->
pRaftStore
->
currentTerm
,
&
(
pSyncNode
->
myRaftId
));
syncNodeVoteForTerm
(
pSyncNode
,
pSyncNode
->
pRaftStore
->
currentTerm
,
&
(
pSyncNode
->
myRaftId
));
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
b08cdf2f
...
@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
...
@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
)
{
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
)
{
cJSON
*
pJson
=
raftStore2Json
(
pRaftStore
);
cJSON
*
pJson
=
raftStore2Json
(
pRaftStore
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
return
serialized
;
}
}
...
...
source/libs/sync/src/syncRequestVoteReply.c
浏览文件 @
b08cdf2f
...
@@ -45,6 +45,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
...
@@ -45,6 +45,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
return
ret
;
return
ret
;
}
}
assert
(
!
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
));
// no need this code, because if I receive reply.term, then I must have sent for that term.
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// syncNodeUpdateTerm(ths, pMsg->term);
...
@@ -52,17 +53,29 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
...
@@ -52,17 +53,29 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
assert
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
assert
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
// This tallies votes even when the current state is not Candidate,
// but they won't be looked at, so it doesn't matter.
if
(
ths
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
if
(
ths
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
votesRespondAdd
(
ths
->
pVotesRespond
,
pMsg
);
votesRespondAdd
(
ths
->
pVotesRespond
,
pMsg
);
if
(
pMsg
->
voteGranted
)
{
if
(
pMsg
->
voteGranted
)
{
// add vote
voteGrantedVote
(
ths
->
pVotesGranted
,
pMsg
);
voteGrantedVote
(
ths
->
pVotesGranted
,
pMsg
);
// maybe to leader
if
(
voteGrantedMajority
(
ths
->
pVotesGranted
))
{
if
(
voteGrantedMajority
(
ths
->
pVotesGranted
))
{
if
(
!
ths
->
pVotesGranted
->
toLeader
)
{
if
(
!
ths
->
pVotesGranted
->
toLeader
)
{
syncNodeCandidate2Leader
(
ths
);
syncNodeCandidate2Leader
(
ths
);
// prevent to leader again!
ths
->
pVotesGranted
->
toLeader
=
true
;
ths
->
pVotesGranted
->
toLeader
=
true
;
}
}
}
}
}
else
{
;
// do nothing
// UNCHANGED <<votesGranted, voterLog>>
}
}
}
}
return
ret
;
return
ret
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录