Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
573b0042
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
573b0042
编写于
3月 07, 2022
作者:
L
Li Minghao
提交者:
GitHub
3月 07, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10588 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
上级
dfb7ed06
b8aa420e
变更
14
隐藏空白更改
内联
并排
Showing
14 changed file
with
174 addition
and
16 deletion
+174
-16
source/libs/sync/inc/syncAppendEntries.h
source/libs/sync/inc/syncAppendEntries.h
+65
-0
source/libs/sync/inc/syncAppendEntriesReply.h
source/libs/sync/inc/syncAppendEntriesReply.h
+13
-0
source/libs/sync/inc/syncElection.h
source/libs/sync/inc/syncElection.h
+13
-2
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+3
-3
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+3
-3
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+25
-0
source/libs/sync/inc/syncRequestVote.h
source/libs/sync/inc/syncRequestVote.h
+22
-0
source/libs/sync/inc/syncRequestVoteReply.h
source/libs/sync/inc/syncRequestVoteReply.h
+17
-0
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+1
-0
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+6
-5
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+3
-3
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+1
-0
source/libs/sync/src/syncRequestVote.c
source/libs/sync/src/syncRequestVote.c
+1
-0
source/libs/sync/src/syncRequestVoteReply.c
source/libs/sync/src/syncRequestVoteReply.c
+1
-0
未找到文件。
source/libs/sync/inc/syncAppendEntries.h
浏览文件 @
573b0042
...
...
@@ -28,6 +28,71 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
// LET logOk == \/ m.mprevLogIndex = 0
// \/ /\ m.mprevLogIndex > 0
// /\ m.mprevLogIndex <= Len(log[i])
// /\ m.mprevLogTerm = log[i][m.mprevLogIndex].term
// IN /\ m.mterm <= currentTerm[i]
// /\ \/ /\ \* reject request
// \/ m.mterm < currentTerm[i]
// \/ /\ m.mterm = currentTerm[i]
// /\ state[i] = Follower
// /\ \lnot logOk
// /\ Reply([mtype |-> AppendEntriesResponse,
// mterm |-> currentTerm[i],
// msuccess |-> FALSE,
// mmatchIndex |-> 0,
// msource |-> i,
// mdest |-> j],
// m)
// /\ UNCHANGED <<serverVars, logVars>>
// \/ \* return to follower state
// /\ m.mterm = currentTerm[i]
// /\ state[i] = Candidate
// /\ state' = [state EXCEPT ![i] = Follower]
// /\ UNCHANGED <<currentTerm, votedFor, logVars, messages>>
// \/ \* accept request
// /\ m.mterm = currentTerm[i]
// /\ state[i] = Follower
// /\ logOk
// /\ LET index == m.mprevLogIndex + 1
// IN \/ \* already done with request
// /\ \/ m.mentries = << >>
// \/ /\ m.mentries /= << >>
// /\ Len(log[i]) >= index
// /\ log[i][index].term = m.mentries[1].term
// \* This could make our commitIndex decrease (for
// \* example if we process an old, duplicated request),
// \* but that doesn't really affect anything.
// /\ commitIndex' = [commitIndex EXCEPT ![i] =
// m.mcommitIndex]
// /\ Reply([mtype |-> AppendEntriesResponse,
// mterm |-> currentTerm[i],
// msuccess |-> TRUE,
// mmatchIndex |-> m.mprevLogIndex +
// Len(m.mentries),
// msource |-> i,
// mdest |-> j],
// m)
// /\ UNCHANGED <<serverVars, log>>
// \/ \* conflict: remove 1 entry
// /\ m.mentries /= << >>
// /\ Len(log[i]) >= index
// /\ log[i][index].term /= m.mentries[1].term
// /\ LET new == [index2 \in 1..(Len(log[i]) - 1) |->
// log[i][index2]]
// IN log' = [log EXCEPT ![i] = new]
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
// \/ \* no conflict: append entry
// /\ m.mentries /= << >>
// /\ Len(log[i]) = m.mprevLogIndex
// /\ log' = [log EXCEPT ![i] =
// Append(log[i], m.mentries[1])]
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<candidateVars, leaderVars>>
//
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
#ifdef __cplusplus
...
...
source/libs/sync/inc/syncAppendEntriesReply.h
浏览文件 @
573b0042
...
...
@@ -28,6 +28,19 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
// /\ m.mterm = currentTerm[i]
// /\ \/ /\ m.msuccess \* successful
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
// /\ matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
// \/ /\ \lnot m.msuccess \* not successful
// /\ nextIndex' = [nextIndex EXCEPT ![i][j] =
// Max({nextIndex[i][j] - 1, 1})]
// /\ UNCHANGED <<matchIndex>>
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
#ifdef __cplusplus
...
...
source/libs/sync/inc/syncElection.h
浏览文件 @
573b0042
...
...
@@ -26,10 +26,21 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
);
// TLA+ Spec
// RequestVote(i, j) ==
// /\ state[i] = Candidate
// /\ j \notin votesResponded[i]
// /\ Send([mtype |-> RequestVoteRequest,
// mterm |-> currentTerm[i],
// mlastLogTerm |-> LastTerm(log[i]),
// mlastLogIndex |-> Len(log[i]),
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeRequestVotePeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeRequestVote
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncRequestVote
*
pMsg
);
#ifdef __cplusplus
...
...
source/libs/sync/inc/syncIO.h
浏览文件 @
573b0042
...
...
@@ -31,11 +31,11 @@ extern "C" {
typedef
struct
SSyncIO
{
STaosQueue
*
pMsgQ
;
STaosQset
*
pQset
;
STaosQset
*
pQset
;
pthread_t
consumerTid
;
void
*
serverRpc
;
void
*
clientRpc
;
void
*
serverRpc
;
void
*
clientRpc
;
SEpSet
myAddr
;
void
*
ioTimerTickQ
;
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
573b0042
...
...
@@ -192,9 +192,9 @@ void syncNodeClose(SSyncNode* pSyncNode);
int32_t
syncNodeSendMsgById
(
const
SRaftId
*
destRaftId
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
int32_t
syncNodeSendMsgByInfo
(
const
SNodeInfo
*
nodeInfo
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
int32_t
syncNodePing
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SyncPing
*
pMsg
);
void
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
void
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingSelf
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStopPingTimer
(
SSyncNode
*
pSyncNode
);
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
573b0042
...
...
@@ -26,6 +26,31 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
// TLA+ Spec
// AppendEntries(i, j) ==
// /\ i /= j
// /\ state[i] = Leader
// /\ LET prevLogIndex == nextIndex[i][j] - 1
// prevLogTerm == IF prevLogIndex > 0 THEN
// log[i][prevLogIndex].term
// ELSE
// 0
// \* Send up to 1 entry, constrained by the end of the log.
// lastEntry == Min({Len(log[i]), nextIndex[i][j]})
// entries == SubSeq(log[i], nextIndex[i][j], lastEntry)
// IN Send([mtype |-> AppendEntriesRequest,
// mterm |-> currentTerm[i],
// mprevLogIndex |-> prevLogIndex,
// mprevLogTerm |-> prevLogTerm,
// mentries |-> entries,
// \* mlog is used as a history variable for the proof.
// \* It would not exist in a real implementation.
// mlog |-> log[i],
// mcommitIndex |-> Min({commitIndex[i], lastEntry}),
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
...
...
source/libs/sync/inc/syncRequestVote.h
浏览文件 @
573b0042
...
...
@@ -28,6 +28,28 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
// LET logOk == \/ m.mlastLogTerm > LastTerm(log[i])
// \/ /\ m.mlastLogTerm = LastTerm(log[i])
// /\ m.mlastLogIndex >= Len(log[i])
// grant == /\ m.mterm = currentTerm[i]
// /\ logOk
// /\ votedFor[i] \in {Nil, j}
// IN /\ m.mterm <= currentTerm[i]
// /\ \/ grant /\ votedFor' = [votedFor EXCEPT ![i] = j]
// \/ ~grant /\ UNCHANGED votedFor
// /\ Reply([mtype |-> RequestVoteResponse,
// mterm |-> currentTerm[i],
// mvoteGranted |-> grant,
// \* mlog is used just for the `elections' history variable for
// \* the proof. It would not exist in a real implementation.
// mlog |-> log[i],
// msource |-> i,
// mdest |-> j],
// m)
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
#ifdef __cplusplus
...
...
source/libs/sync/inc/syncRequestVoteReply.h
浏览文件 @
573b0042
...
...
@@ -28,6 +28,23 @@ extern "C" {
#include "syncRaft.h"
#include "taosdef.h"
// TLA+ Spec
// HandleRequestVoteResponse(i, j, m) ==
// \* This tallies votes even when the current state is not Candidate, but
// \* they won't be looked at, so it doesn't matter.
// /\ m.mterm = currentTerm[i]
// /\ votesResponded' = [votesResponded EXCEPT ![i] =
// votesResponded[i] \cup {j}]
// /\ \/ /\ m.mvoteGranted
// /\ votesGranted' = [votesGranted EXCEPT ![i] =
// votesGranted[i] \cup {j}]
// /\ voterLog' = [voterLog EXCEPT ![i] =
// voterLog[i] @@ (j :> m.mlog)]
// \/ /\ ~m.mvoteGranted
// /\ UNCHANGED <<votesGranted, voterLog>>
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
//
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
#ifdef __cplusplus
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
573b0042
...
...
@@ -27,4 +27,5 @@
// /\ UNCHANGED <<matchIndex>>
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{}
source/libs/sync/src/syncElection.c
浏览文件 @
573b0042
...
...
@@ -16,11 +16,6 @@
#include "syncElection.h"
#include "syncMessage.h"
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
)
{
// start election
syncNodeRequestVotePeers
(
pSyncNode
);
}
// TLA+ Spec
// RequestVote(i, j) ==
// /\ state[i] = Candidate
...
...
@@ -32,8 +27,14 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeRequestVotePeers
(
SSyncNode
*
pSyncNode
)
{}
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
)
{
// start election
syncNodeRequestVotePeers
(
pSyncNode
);
}
int32_t
syncNodeRequestVote
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncRequestVote
*
pMsg
)
{
sTrace
(
"syncNodeRequestVote pSyncNode:%p "
,
pSyncNode
);
int32_t
ret
=
0
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
573b0042
...
...
@@ -177,7 +177,7 @@ int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing*
return
ret
;
}
void
syncNodePingAll
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
)
{
sTrace
(
"syncNodePingAll pSyncNode:%p "
,
pSyncNode
);
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
syncCfg
.
replicaNum
;
++
i
)
{
...
...
@@ -190,7 +190,7 @@ void syncNodePingAll(SSyncNode* pSyncNode) {
}
}
void
syncNodePingPeers
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
destId
;
...
...
@@ -202,7 +202,7 @@ void syncNodePingPeers(SSyncNode* pSyncNode) {
}
}
void
syncNodePingSelf
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodePingSelf
(
SSyncNode
*
pSyncNode
)
{
int32_t
ret
;
SyncPing
*
pMsg
=
syncPingBuild3
(
&
pSyncNode
->
raftId
,
&
pSyncNode
->
raftId
);
ret
=
syncNodePing
(
pSyncNode
,
&
pMsg
->
destId
,
pMsg
);
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
573b0042
...
...
@@ -40,6 +40,7 @@
// msource |-> i,
// mdest |-> j])
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
)
{}
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
)
{
...
...
source/libs/sync/src/syncRequestVote.c
浏览文件 @
573b0042
...
...
@@ -36,4 +36,5 @@
// mdest |-> j],
// m)
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{}
source/libs/sync/src/syncRequestVoteReply.c
浏览文件 @
573b0042
...
...
@@ -31,4 +31,5 @@
// /\ UNCHANGED <<votesGranted, voterLog>>
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
//
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录