Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b05c65cd
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b05c65cd
编写于
12月 29, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refact: rename variables
上级
4b7ab0a8
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
41 addition
and
43 deletion
+41
-43
source/libs/sync/inc/syncVoteMgr.h
source/libs/sync/inc/syncVoteMgr.h
+6
-6
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+7
-9
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+1
-1
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+27
-27
未找到文件。
source/libs/sync/inc/syncVoteMgr.h
浏览文件 @
b05c65cd
...
...
@@ -30,12 +30,12 @@ typedef struct SVotesGranted {
SyncTerm
term
;
int32_t
quorum
;
bool
toLeader
;
SSyncNode
*
p
Sync
Node
;
SSyncNode
*
pNode
;
}
SVotesGranted
;
SVotesGranted
*
voteGrantedCreate
(
SSyncNode
*
p
Sync
Node
);
SVotesGranted
*
voteGrantedCreate
(
SSyncNode
*
pNode
);
void
voteGrantedDestroy
(
SVotesGranted
*
pVotesGranted
);
void
voteGrantedUpdate
(
SVotesGranted
*
pVotesGranted
,
SSyncNode
*
p
Sync
Node
);
void
voteGrantedUpdate
(
SVotesGranted
*
pVotesGranted
,
SSyncNode
*
pNode
);
bool
voteGrantedMajority
(
SVotesGranted
*
pVotesGranted
);
void
voteGrantedVote
(
SVotesGranted
*
pVotesGranted
,
SyncRequestVoteReply
*
pMsg
);
void
voteGrantedReset
(
SVotesGranted
*
pVotesGranted
,
SyncTerm
term
);
...
...
@@ -45,12 +45,12 @@ typedef struct SVotesRespond {
bool
isRespond
[
TSDB_MAX_REPLICA
];
int32_t
replicaNum
;
SyncTerm
term
;
SSyncNode
*
p
Sync
Node
;
SSyncNode
*
pNode
;
}
SVotesRespond
;
SVotesRespond
*
votesRespondCreate
(
SSyncNode
*
p
Sync
Node
);
SVotesRespond
*
votesRespondCreate
(
SSyncNode
*
pNode
);
void
votesRespondDestory
(
SVotesRespond
*
pVotesRespond
);
void
votesRespondUpdate
(
SVotesRespond
*
pVotesRespond
,
SSyncNode
*
p
Sync
Node
);
void
votesRespondUpdate
(
SVotesRespond
*
pVotesRespond
,
SSyncNode
*
pNode
);
bool
votesResponded
(
SVotesRespond
*
pVotesRespond
,
const
SRaftId
*
pRaftId
);
void
votesRespondAdd
(
SVotesRespond
*
pVotesRespond
,
const
SyncRequestVoteReply
*
pMsg
);
void
votesRespondReset
(
SVotesRespond
*
pVotesRespond
,
SyncTerm
term
);
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
b05c65cd
...
...
@@ -118,12 +118,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SRespStub
*
pStub
=
(
SRespStub
*
)
taosHashIterate
(
pObj
->
pRespHash
,
NULL
);
int
cnt
=
0
;
int
sum
=
0
;
SSyncNode
*
p
Sync
Node
=
pObj
->
data
;
SSyncNode
*
pNode
=
pObj
->
data
;
SArray
*
delIndexArray
=
taosArrayInit
(
4
,
sizeof
(
uint64_t
));
if
(
delIndexArray
==
NULL
)
return
;
sDebug
(
"vgId:%d, resp manager begin clean by ttl"
,
p
Sync
Node
->
vgId
);
sDebug
(
"vgId:%d, resp manager begin clean by ttl"
,
pNode
->
vgId
);
while
(
pStub
)
{
size_t
len
;
void
*
key
=
taosHashGetKey
(
pStub
,
&
len
);
...
...
@@ -140,20 +140,18 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
.
lastConfigIndex
=
SYNC_INDEX_INVALID
,
.
isWeak
=
false
,
.
code
=
TSDB_CODE_SYN_TIMEOUT
,
.
state
=
p
Sync
Node
->
state
,
.
state
=
pNode
->
state
,
.
seqNum
=
*
pSeqNum
,
.
term
=
SYNC_TERM_INVALID
,
.
currentTerm
=
p
Sync
Node
->
pRaftStore
->
currentTerm
,
.
currentTerm
=
pNode
->
pRaftStore
->
currentTerm
,
.
flag
=
0
,
};
pStub
->
rpcMsg
.
pCont
=
NULL
;
pStub
->
rpcMsg
.
contLen
=
0
;
// TODO: and make rpcMsg body, call commit cb
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
SRpcMsg
rpcMsg
=
{.
info
=
pStub
->
rpcMsg
.
info
,
.
code
=
TSDB_CODE_SYN_TIMEOUT
};
sInfo
(
"vgId:%d, message handle:%p expired, type:%s ahandle:%p"
,
p
Sync
Node
->
vgId
,
rpcMsg
.
info
.
handle
,
sInfo
(
"vgId:%d, message handle:%p expired, type:%s ahandle:%p"
,
pNode
->
vgId
,
rpcMsg
.
info
.
handle
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
rpcMsg
.
info
.
ahandle
);
rpcSendResponse
(
&
rpcMsg
);
}
...
...
@@ -162,12 +160,12 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
}
int32_t
arraySize
=
taosArrayGetSize
(
delIndexArray
);
sDebug
(
"vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d"
,
p
Sync
Node
->
vgId
,
sum
,
cnt
,
arraySize
);
sDebug
(
"vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d"
,
pNode
->
vgId
,
sum
,
cnt
,
arraySize
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
uint64_t
*
pSeqNum
=
taosArrayGet
(
delIndexArray
,
i
);
taosHashRemove
(
pObj
->
pRespHash
,
pSeqNum
,
sizeof
(
uint64_t
));
sDebug
(
"vgId:%d, resp manager clean by ttl, seq:%"
PRId64
,
p
Sync
Node
->
vgId
,
*
pSeqNum
);
sDebug
(
"vgId:%d, resp manager clean by ttl, seq:%"
PRId64
,
pNode
->
vgId
,
*
pSeqNum
);
}
taosArrayDestroy
(
delIndexArray
);
}
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
b05c65cd
...
...
@@ -406,7 +406,7 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p
}
// just set start = false
// FpSnapshotStopWrite should not be called
, assert writer == NULL
// FpSnapshotStopWrite should not be called
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
)
{
sRInfo
(
pReceiver
,
"snapshot receiver stop, not apply, writer:%p"
,
pReceiver
->
pWriter
);
...
...
source/libs/sync/src/syncVoteMgr.c
浏览文件 @
b05c65cd
...
...
@@ -23,21 +23,21 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
pVotesGranted
->
votes
=
0
;
}
SVotesGranted
*
voteGrantedCreate
(
SSyncNode
*
p
Sync
Node
)
{
SVotesGranted
*
voteGrantedCreate
(
SSyncNode
*
pNode
)
{
SVotesGranted
*
pVotesGranted
=
taosMemoryCalloc
(
1
,
sizeof
(
SVotesGranted
));
if
(
pVotesGranted
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pVotesGranted
->
replicas
=
&
(
pSyncNode
->
replicasId
)
;
pVotesGranted
->
replicaNum
=
p
Sync
Node
->
replicaNum
;
pVotesGranted
->
replicas
=
&
pNode
->
replicasId
;
pVotesGranted
->
replicaNum
=
pNode
->
replicaNum
;
voteGrantedClearVotes
(
pVotesGranted
);
pVotesGranted
->
term
=
0
;
pVotesGranted
->
quorum
=
p
Sync
Node
->
quorum
;
pVotesGranted
->
quorum
=
pNode
->
quorum
;
pVotesGranted
->
toLeader
=
false
;
pVotesGranted
->
p
SyncNode
=
pSync
Node
;
pVotesGranted
->
p
Node
=
p
Node
;
return
pVotesGranted
;
}
...
...
@@ -48,33 +48,33 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
}
}
void
voteGrantedUpdate
(
SVotesGranted
*
pVotesGranted
,
SSyncNode
*
p
Sync
Node
)
{
pVotesGranted
->
replicas
=
&
(
pSyncNode
->
replicasId
)
;
pVotesGranted
->
replicaNum
=
p
Sync
Node
->
replicaNum
;
void
voteGrantedUpdate
(
SVotesGranted
*
pVotesGranted
,
SSyncNode
*
pNode
)
{
pVotesGranted
->
replicas
=
&
pNode
->
replicasId
;
pVotesGranted
->
replicaNum
=
pNode
->
replicaNum
;
voteGrantedClearVotes
(
pVotesGranted
);
pVotesGranted
->
term
=
0
;
pVotesGranted
->
quorum
=
p
Sync
Node
->
quorum
;
pVotesGranted
->
quorum
=
pNode
->
quorum
;
pVotesGranted
->
toLeader
=
false
;
pVotesGranted
->
p
SyncNode
=
pSync
Node
;
pVotesGranted
->
p
Node
=
p
Node
;
}
bool
voteGrantedMajority
(
SVotesGranted
*
pVotesGranted
)
{
return
pVotesGranted
->
votes
>=
pVotesGranted
->
quorum
;
}
void
voteGrantedVote
(
SVotesGranted
*
pVotesGranted
,
SyncRequestVoteReply
*
pMsg
)
{
if
(
!
pMsg
->
voteGranted
)
{
sNFatal
(
pVotesGranted
->
p
Sync
Node
,
"vote granted should be true"
);
sNFatal
(
pVotesGranted
->
pNode
,
"vote granted should be true"
);
return
;
}
if
(
pMsg
->
term
!=
pVotesGranted
->
term
)
{
sNTrace
(
pVotesGranted
->
p
SyncNode
,
"vote grant term:%"
PRId64
" not matched with msg term:%"
PRId64
,
p
VotesGranted
->
term
,
p
Msg
->
term
);
sNTrace
(
pVotesGranted
->
p
Node
,
"vote grant term:%"
PRId64
" not matched with msg term:%"
PRId64
,
pVotesGranted
->
term
,
pMsg
->
term
);
return
;
}
if
(
!
syncUtilSameId
(
&
pVotesGranted
->
p
Sync
Node
->
myRaftId
,
&
pMsg
->
destId
))
{
sNFatal
(
pVotesGranted
->
p
Sync
Node
,
"vote granted raftId not matched with msg"
);
if
(
!
syncUtilSameId
(
&
pVotesGranted
->
pNode
->
myRaftId
,
&
pMsg
->
destId
))
{
sNFatal
(
pVotesGranted
->
pNode
,
"vote granted raftId not matched with msg"
);
return
;
}
...
...
@@ -86,7 +86,7 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
}
}
if
((
j
==
-
1
)
||
!
(
j
>=
0
&&
j
<
pVotesGranted
->
replicaNum
))
{
sNFatal
(
pVotesGranted
->
p
Sync
Node
,
"invalid msg srcId, index:%d"
,
j
);
sNFatal
(
pVotesGranted
->
pNode
,
"invalid msg srcId, index:%d"
,
j
);
return
;
}
...
...
@@ -96,7 +96,7 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
}
if
(
pVotesGranted
->
votes
>
pVotesGranted
->
replicaNum
)
{
sNFatal
(
pVotesGranted
->
p
Sync
Node
,
"votes:%d not matched with replicaNum:%d"
,
pVotesGranted
->
votes
,
sNFatal
(
pVotesGranted
->
pNode
,
"votes:%d not matched with replicaNum:%d"
,
pVotesGranted
->
votes
,
pVotesGranted
->
replicaNum
);
return
;
}
...
...
@@ -108,17 +108,17 @@ void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted
->
toLeader
=
false
;
}
SVotesRespond
*
votesRespondCreate
(
SSyncNode
*
p
Sync
Node
)
{
SVotesRespond
*
votesRespondCreate
(
SSyncNode
*
pNode
)
{
SVotesRespond
*
pVotesRespond
=
taosMemoryCalloc
(
1
,
sizeof
(
SVotesRespond
));
if
(
pVotesRespond
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
pVotesRespond
->
replicas
=
&
(
pSyncNode
->
replicasId
)
;
pVotesRespond
->
replicaNum
=
p
Sync
Node
->
replicaNum
;
pVotesRespond
->
replicas
=
&
pNode
->
replicasId
;
pVotesRespond
->
replicaNum
=
pNode
->
replicaNum
;
pVotesRespond
->
term
=
0
;
pVotesRespond
->
p
SyncNode
=
pSync
Node
;
pVotesRespond
->
p
Node
=
p
Node
;
return
pVotesRespond
;
}
...
...
@@ -129,11 +129,11 @@ void votesRespondDestory(SVotesRespond *pVotesRespond) {
}
}
void
votesRespondUpdate
(
SVotesRespond
*
pVotesRespond
,
SSyncNode
*
p
Sync
Node
)
{
pVotesRespond
->
replicas
=
&
(
pSyncNode
->
replicasId
)
;
pVotesRespond
->
replicaNum
=
p
Sync
Node
->
replicaNum
;
void
votesRespondUpdate
(
SVotesRespond
*
pVotesRespond
,
SSyncNode
*
pNode
)
{
pVotesRespond
->
replicas
=
&
pNode
->
replicasId
;
pVotesRespond
->
replicaNum
=
pNode
->
replicaNum
;
pVotesRespond
->
term
=
0
;
pVotesRespond
->
p
SyncNode
=
pSync
Node
;
pVotesRespond
->
p
Node
=
p
Node
;
}
bool
votesResponded
(
SVotesRespond
*
pVotesRespond
,
const
SRaftId
*
pRaftId
)
{
...
...
@@ -149,7 +149,7 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
void
votesRespondAdd
(
SVotesRespond
*
pVotesRespond
,
const
SyncRequestVoteReply
*
pMsg
)
{
if
(
pVotesRespond
->
term
!=
pMsg
->
term
)
{
sNTrace
(
pVotesRespond
->
p
Sync
Node
,
"vote respond add error"
);
sNTrace
(
pVotesRespond
->
pNode
,
"vote respond add error"
);
return
;
}
...
...
@@ -160,7 +160,7 @@ void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *p
}
}
sNFatal
(
pVotesRespond
->
p
Sync
Node
,
"votes respond not found"
);
sNFatal
(
pVotesRespond
->
pNode
,
"votes respond not found"
);
}
void
votesRespondReset
(
SVotesRespond
*
pVotesRespond
,
SyncTerm
term
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录