Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
797cde53
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1179
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
797cde53
编写于
6月 10, 2022
作者:
L
Li Minghao
提交者:
GitHub
6月 10, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13692 from taosdata/feature/3.0_mhli
refactor(sync): add rpcMsg to reconfig callback
上级
b352f0d1
972fee7f
变更
19
隐藏空白更改
内联
并排
Showing
19 changed file
with
379 addition
and
192 deletion
+379
-192
include/libs/sync/sync.h
include/libs/sync/sync.h
+6
-3
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+9
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+10
-1
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+2
-2
source/libs/sync/inc/syncRaftStore.h
source/libs/sync/inc/syncRaftStore.h
+2
-2
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+11
-10
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+27
-11
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+29
-8
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+33
-11
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+1
-1
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+92
-72
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+4
-1
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+1
-2
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+142
-60
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+1
-1
source/libs/sync/test/syncConfigChangeTest.cpp
source/libs/sync/test/syncConfigChangeTest.cpp
+1
-1
source/libs/sync/test/syncSnapshotReceiverTest.cpp
source/libs/sync/test/syncSnapshotReceiverTest.cpp
+5
-1
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+2
-3
未找到文件。
include/libs/sync/sync.h
浏览文件 @
797cde53
...
...
@@ -83,8 +83,10 @@ typedef struct SReConfigCbMeta {
SyncTerm
term
;
SyncTerm
currentTerm
;
SSyncCfg
oldCfg
;
SSyncCfg
newCfg
;
bool
isDrop
;
uint64_t
flag
;
uint64_t
seqNum
;
}
SReConfigCbMeta
;
typedef
struct
SSnapshot
{
...
...
@@ -106,7 +108,7 @@ typedef struct SSyncFSM {
void
(
*
FpRollBackCb
)(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
(
*
FpRestoreFinishCb
)(
struct
SSyncFSM
*
pFsm
);
void
(
*
FpReConfigCb
)(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCf
g
,
SReConfigCbMeta
cbMeta
);
void
(
*
FpReConfigCb
)(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMs
g
,
SReConfigCbMeta
cbMeta
);
int32_t
(
*
FpGetSnapshot
)(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
...
...
@@ -184,7 +186,6 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo);
void
syncStart
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
int32_t
syncSetStandby
(
int64_t
rid
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pSyncCfg
);
ESyncState
syncGetMyRole
(
int64_t
rid
);
const
char
*
syncGetMyRoleStr
(
int64_t
rid
);
SyncTerm
syncGetMyTerm
(
int64_t
rid
);
...
...
@@ -194,8 +195,10 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
bool
syncEnvIsStart
();
const
char
*
syncStr
(
ESyncState
state
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
int32_t
syncGetSnapshotMeta
(
int64_t
rid
,
struct
SSnapshotMeta
*
sMeta
);
int32_t
syncGetSnapshotMeta
(
int64_t
rid
,
struct
SSnapshotMeta
*
sMeta
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
);
int32_t
syncReconfigRaw
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
);
// to be moved to static
void
syncStartNormal
(
int64_t
rid
);
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
797cde53
...
...
@@ -96,10 +96,18 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
}
}
void
mndReConfig
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCf
g
,
SReConfigCbMeta
cbMeta
)
{
void
mndReConfig
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMs
g
,
SReConfigCbMeta
cbMeta
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
#endif
pMgmt
->
errCode
=
cbMeta
.
code
;
mInfo
(
"trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%"
PRId64
" term:%"
PRId64
,
pMgmt
->
transId
,
cbMeta
.
code
,
cbMeta
.
index
,
cbMeta
.
term
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
797cde53
...
...
@@ -180,10 +180,18 @@ static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
return
0
;
}
static
void
vnodeSyncReconfig
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCf
g
,
SReConfigCbMeta
cbMeta
)
{
static
void
vnodeSyncReconfig
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMs
g
,
SReConfigCbMeta
cbMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vInfo
(
"vgId:%d, sync reconfig is confirmed"
,
TD_VID
(
pVnode
));
#if 0
// send response
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen, .conn.applyIndex = cbMeta.index};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
#endif
// todo rpc response here
// build rpc msg
// put into apply queue
...
...
@@ -213,6 +221,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
memcpy
(
rpcMsg
.
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
syncGetAndDelRespRpc
(
pVnode
->
sync
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
else
{
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
797cde53
...
...
@@ -201,8 +201,8 @@ void syncNodeRelease(SSyncNode* pNode);
// raft state change --------------
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
);
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
);
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
);
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
);
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
);
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
);
void
syncNodeFollower2Candidate
(
SSyncNode
*
pSyncNode
);
...
...
source/libs/sync/inc/syncRaftStore.h
浏览文件 @
797cde53
...
...
@@ -49,8 +49,8 @@ void raftStoreClearVote(SRaftStore *pRaftStore);
void
raftStoreNextTerm
(
SRaftStore
*
pRaftStore
);
void
raftStoreSetTerm
(
SRaftStore
*
pRaftStore
,
SyncTerm
term
);
int32_t
raftStoreFromJson
(
SRaftStore
*
pRaftStore
,
cJSON
*
pJson
);
cJSON
*
raftStore2Json
(
SRaftStore
*
pRaftStore
);
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
);
cJSON
*
raftStore2Json
(
SRaftStore
*
pRaftStore
);
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
);
// for debug -------------------
void
raftStorePrint
(
SRaftStore
*
pObj
);
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
797cde53
...
...
@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool
start
;
int32_t
seq
;
int32_t
ack
;
void
*
pReader
;
void
*
pCurrentBlock
;
void
*
pReader
;
void
*
pCurrentBlock
;
int32_t
blockLen
;
SSnapshot
snapshot
;
int64_t
sendingMS
;
...
...
@@ -58,28 +58,29 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender);
void
snapshotSenderStop
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotSend
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotReSend
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
typedef
struct
SSyncSnapshotReceiver
{
bool
start
;
int32_t
ack
;
void
*
pWriter
;
void
*
pWriter
;
SyncTerm
term
;
SyncTerm
privateTerm
;
SSyncNode
*
pSyncNode
;
int32_t
replicaIndex
;
SRaftId
fromId
;
}
SSyncSnapshotReceiver
;
SSyncSnapshotReceiver
*
snapshotReceiverCreate
(
SSyncNode
*
pSyncNode
,
int32_t
replicaIndex
);
SSyncSnapshotReceiver
*
snapshotReceiverCreate
(
SSyncNode
*
pSyncNode
,
SRaftId
fromId
);
void
snapshotReceiverDestroy
(
SSyncSnapshotReceiver
*
pReceiver
);
void
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
);
void
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SRaftId
fromId
);
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
);
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
,
bool
apply
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
int32_t
syncNodeOnSnapshotSendCb
(
SSyncNode
*
ths
,
SyncSnapshotSend
*
pMsg
);
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
ths
,
SyncSnapshotRsp
*
pMsg
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
797cde53
...
...
@@ -150,7 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
"ths->state:%d, logOK:%d"
,
pMsg
->
term
,
ths
->
pRaftStore
->
currentTerm
,
ths
->
state
,
logOK
);
syncNodeBecomeFollower
(
ths
);
syncNodeBecomeFollower
(
ths
,
"from candidate by append entries"
);
// ret or reply?
return
ret
;
...
...
@@ -380,17 +380,19 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// change isStandBy to normal
if
(
!
isDrop
)
{
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncNodeBecomeLeader
(
ths
);
syncNodeBecomeLeader
(
ths
,
"config change"
);
}
else
{
syncNodeBecomeFollower
(
ths
);
syncNodeBecomeFollower
(
ths
,
"config change"
);
}
}
char
*
sOld
=
syncCfg2Str
(
&
oldSyncCfg
);
char
*
sNew
=
syncCfg2Str
(
&
newSyncCfg
);
sInfo
(
"==config change== 0x11 old:%s new:%s isDrop:%d
\n
"
,
sOld
,
sNew
,
isDrop
);
taosMemoryFree
(
sOld
);
taosMemoryFree
(
sNew
);
if
(
gRaftDetailLog
)
{
char
*
sOld
=
syncCfg2Str
(
&
oldSyncCfg
);
char
*
sNew
=
syncCfg2Str
(
&
newSyncCfg
);
sInfo
(
"==config change== 0x11 old:%s new:%s isDrop:%d
\n
"
,
sOld
,
sNew
,
isDrop
);
taosMemoryFree
(
sOld
);
taosMemoryFree
(
sNew
);
}
}
// always call FpReConfigCb
...
...
@@ -399,10 +401,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta
.
currentTerm
=
ths
->
pRaftStore
->
currentTerm
;
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
newCfg
=
newSyncCfg
;
cbMeta
.
oldCfg
=
oldSyncCfg
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
flag
=
0x11
;
cbMeta
.
isDrop
=
isDrop
;
ths
->
pFsm
->
FpReConfigCb
(
ths
->
pFsm
,
newSyncCf
g
,
cbMeta
);
ths
->
pFsm
->
FpReConfigCb
(
ths
->
pFsm
,
&
rpcMs
g
,
cbMeta
);
}
}
...
...
@@ -469,7 +473,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
sInfo
(
"sync event
log truncate, from %ld to %ld"
,
delBegin
,
delEnd
);
sInfo
(
"sync event
vgId:%d log truncate, from %ld to %ld"
,
ths
->
vgId
,
delBegin
,
delEnd
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
return
code
;
...
...
@@ -571,7 +575,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if
(
condition
)
{
sTrace
(
"recv SyncAppendEntries, candidate to follower"
);
syncNodeBecomeFollower
(
ths
);
syncNodeBecomeFollower
(
ths
,
"from candidate by append entries"
);
// do not reply?
return
ret
;
}
...
...
@@ -742,6 +746,18 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
if
(
pMsg
->
commitIndex
>
ths
->
commitIndex
)
{
// has commit entry in local
if
(
pMsg
->
commitIndex
<=
ths
->
pLogStore
->
syncLogLastIndex
(
ths
->
pLogStore
))
{
// advance commit index to sanpshot first
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshot
(
ths
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>=
0
&&
snapshot
.
lastApplyIndex
>
ths
->
commitIndex
)
{
SyncIndex
commitBegin
=
ths
->
commitIndex
;
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sInfo
(
"sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s"
,
ths
->
vgId
,
commitBegin
,
commitEnd
,
syncUtilState2String
(
ths
->
state
));
}
SyncIndex
beginIndex
=
ths
->
commitIndex
+
1
;
SyncIndex
endIndex
=
pMsg
->
commitIndex
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
797cde53
...
...
@@ -121,7 +121,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2
(
"recv SyncAppendEntriesReply, before pNextIndex:"
,
ths
->
pNextIndex
);
syncIndexMgrLog2
(
"recv SyncAppendEntriesReply, before pMatchIndex:"
,
ths
->
pMatchIndex
);
{
if
(
gRaftDetailLog
)
{
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshot
(
ths
->
pFsm
,
&
snapshot
);
sTrace
(
"recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu"
,
...
...
@@ -147,7 +147,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if
(
pMsg
->
success
)
{
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
+
1
);
sTrace
(
"update next match, index:%ld, success:%d"
,
pMsg
->
matchIndex
+
1
,
pMsg
->
success
);
if
(
gRaftDetailLog
)
{
sTrace
(
"update next match, index:%ld, success:%d"
,
pMsg
->
matchIndex
+
1
,
pMsg
->
success
);
}
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
);
...
...
@@ -159,7 +162,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
else
{
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
sTrace
(
"update next not match, begin, index:%ld, success:%d"
,
nextIndex
,
pMsg
->
success
);
if
(
gRaftDetailLog
)
{
sTrace
(
"update next index not match, begin, index:%ld, success:%d"
,
nextIndex
,
pMsg
->
success
);
}
// notice! int64, uint64
if
(
nextIndex
>
SYNC_INDEX_BEGIN
)
{
...
...
@@ -178,9 +183,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
pMsg
->
privateTerm
<
pSender
->
privateTerm
)
{
snapshotSenderStart
(
pSender
);
char
*
s
=
snapshotSender2Str
(
pSender
);
sInfo
(
"sync event snapshot send start sender first time, sender:%s"
,
s
);
taosMemoryFree
(
s
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
].
addr
,
host
,
sizeof
(
host
),
&
port
);
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotSender2Str
(
pSender
);
sInfo
(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"sender:%s"
,
ths
->
vgId
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
s
);
taosMemoryFree
(
s
);
}
else
{
sInfo
(
"sync event vgId:%d snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu"
,
ths
->
vgId
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
);
}
}
SyncIndex
sentryIndex
=
pSender
->
snapshot
.
lastApplyIndex
+
1
;
...
...
@@ -195,12 +214,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
sTrace
(
"update next not match, end, index:%ld, success:%d"
,
nextIndex
,
pMsg
->
success
);
if
(
gRaftDetailLog
)
{
sTrace
(
"update next index not match, end, index:%ld, success:%d"
,
nextIndex
,
pMsg
->
success
);
}
}
syncIndexMgrLog2
(
"recv SyncAppendEntriesReply, after pNextIndex:"
,
ths
->
pNextIndex
);
syncIndexMgrLog2
(
"recv SyncAppendEntriesReply, after pMatchIndex:"
,
ths
->
pMatchIndex
);
{
if
(
gRaftDetailLog
)
{
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshot
(
ths
->
pFsm
,
&
snapshot
);
sTrace
(
"recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu"
,
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
797cde53
...
...
@@ -48,13 +48,28 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
syncIndexMgrLog2
(
"==syncNodeMaybeAdvanceCommitIndex== pNextIndex"
,
pSyncNode
->
pNextIndex
);
syncIndexMgrLog2
(
"==syncNodeMaybeAdvanceCommitIndex== pMatchIndex"
,
pSyncNode
->
pMatchIndex
);
// advance commit index to sanpshot first
SSnapshot
snapshot
;
pSyncNode
->
pFsm
->
FpGetSnapshot
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
>
0
&&
snapshot
.
lastApplyIndex
>
pSyncNode
->
commitIndex
)
{
SyncIndex
commitBegin
=
pSyncNode
->
commitIndex
;
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sInfo
(
"sync event vgId:%d commit by snapshot from index:%ld to index:%ld, %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
,
syncUtilState2String
(
pSyncNode
->
state
));
}
// update commit index
SyncIndex
newCommitIndex
=
pSyncNode
->
commitIndex
;
for
(
SyncIndex
index
=
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
);
index
>
pSyncNode
->
commitIndex
;
--
index
)
{
for
(
SyncIndex
index
=
syncNodeGetLastIndex
(
pSyncNode
);
index
>
pSyncNode
->
commitIndex
;
--
index
)
{
bool
agree
=
syncAgree
(
pSyncNode
,
index
);
sTrace
(
"syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld"
,
agree
,
index
,
pSyncNode
->
commitIndex
);
if
(
gRaftDetailLog
)
{
sTrace
(
"syncMaybeAdvanceCommitIndex syncAgree:%d, index:%ld, pSyncNode->commitIndex:%ld"
,
agree
,
index
,
pSyncNode
->
commitIndex
);
}
if
(
agree
)
{
// term
SSyncRaftEntry
*
pEntry
=
pSyncNode
->
pLogStore
->
getEntry
(
pSyncNode
->
pLogStore
,
index
);
...
...
@@ -64,16 +79,21 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if
(
pEntry
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
// update commit index
newCommitIndex
=
index
;
sTrace
(
"syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld"
,
newCommitIndex
,
pSyncNode
->
commitIndex
);
if
(
gRaftDetailLog
)
{
sTrace
(
"syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%ld commit, pSyncNode->commitIndex:%ld"
,
newCommitIndex
,
pSyncNode
->
commitIndex
);
}
syncEntryDestory
(
pEntry
);
break
;
}
else
{
sTrace
(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu"
,
pEntry
->
term
,
pSyncNode
->
pRaftStore
->
currentTerm
);
if
(
gRaftDetailLog
)
{
sTrace
(
"syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%lu, "
"pSyncNode->pRaftStore->currentTerm:%lu"
,
pEntry
->
term
,
pSyncNode
->
pRaftStore
->
currentTerm
);
}
}
syncEntryDestory
(
pEntry
);
...
...
@@ -84,7 +104,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex
beginIndex
=
pSyncNode
->
commitIndex
+
1
;
SyncIndex
endIndex
=
newCommitIndex
;
sTrace
(
"syncMaybeAdvanceCommitIndex sync commit %ld"
,
newCommitIndex
);
if
(
gRaftDetailLog
)
{
sTrace
(
"syncMaybeAdvanceCommitIndex sync commit %ld"
,
newCommitIndex
);
}
// update commit index
pSyncNode
->
commitIndex
=
newCommitIndex
;
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
797cde53
...
...
@@ -40,7 +40,7 @@ int32_t syncEnvStart() {
// gSyncEnv = doSyncEnvStart(gSyncEnv);
gSyncEnv
=
doSyncEnvStart
();
assert
(
gSyncEnv
!=
NULL
);
sTrace
(
"sync
EnvStart ok!
"
);
sTrace
(
"sync
env start ok
"
);
return
ret
;
}
...
...
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
797cde53
...
...
@@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
797cde53
...
...
@@ -87,7 +87,9 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
pSyncInfo
);
assert
(
pSyncNode
!=
NULL
);
syncNodeLog2
(
"syncNodeOpen open success"
,
pSyncNode
);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"syncNodeOpen open success"
,
pSyncNode
);
}
pSyncNode
->
rid
=
taosAddRef
(
tsNodeRefId
,
pSyncNode
);
if
(
pSyncNode
->
rid
<
0
)
{
...
...
@@ -173,20 +175,37 @@ int32_t syncSetStandby(int64_t rid) {
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pSyncCfg
)
{
int32_t
ret
=
0
;
char
*
configChange
=
syncCfg2Str
((
SSyncCfg
*
)
pSyncCfg
);
sInfo
(
"==syncReconfig== newconfig:%s"
,
configChange
);
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pSyncCfg
);
if
(
gRaftDetailLog
)
{
sInfo
(
"==syncReconfig== newconfig:%s"
,
newconfig
);
}
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
msgType
=
TDMT_SYNC_CONFIG_CHANGE
;
rpcMsg
.
info
.
noResp
=
1
;
rpcMsg
.
contLen
=
strlen
(
configChange
)
+
1
;
rpcMsg
.
contLen
=
strlen
(
newconfig
)
+
1
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
configChange
);
taosMemoryFree
(
configChange
);
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
newconfig
);
taosMemoryFree
(
newconfig
);
ret
=
syncPropose
(
rid
,
&
rpcMsg
,
false
);
return
ret
;
}
int32_t
syncReconfigRaw
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
ret
=
0
;
char
*
newconfig
=
syncCfg2Str
((
SSyncCfg
*
)
pNewCfg
);
pRpcMsg
->
msgType
=
TDMT_SYNC_CONFIG_CHANGE
;
pRpcMsg
->
info
.
noResp
=
1
;
pRpcMsg
->
contLen
=
strlen
(
newconfig
)
+
1
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
snprintf
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
"%s"
,
newconfig
);
taosMemoryFree
(
newconfig
);
return
ret
;
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
syncPropose
(
rid
,
pMsg
,
isWeak
);
return
ret
;
...
...
@@ -374,13 +393,14 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
}
int32_t
syncPropose
(
int64_t
rid
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
sTrace
(
"syncPropose msgType:%d "
,
pMsg
->
msgType
)
;
int32_t
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
int32_t
ret
=
TAOS_SYNC_PROPOSE_SUCCESS
;
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
return
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
if
(
pSyncNode
==
NULL
)
{
return
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
assert
(
rid
==
pSyncNode
->
rid
);
sTrace
(
"sync event vgId:%d propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
SRespStub
stub
;
...
...
@@ -411,6 +431,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pOldSyncInfo
)
{
SSyncInfo
*
pSyncInfo
=
(
SSyncInfo
*
)
pOldSyncInfo
;
sInfo
(
"sync event vgId:%d sync open"
,
pSyncInfo
->
vgId
);
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
assert
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
...
...
@@ -439,9 +461,11 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
assert
(
pSyncNode
->
pRaftCfg
!=
NULL
);
pSyncInfo
->
syncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
char
*
seralized
=
raftCfg2Str
(
pSyncNode
->
pRaftCfg
);
sInfo
(
"syncNodeOpen update config :%s"
,
seralized
);
taosMemoryFree
(
seralized
);
if
(
gRaftDetailLog
)
{
char
*
seralized
=
raftCfg2Str
(
pSyncNode
->
pRaftCfg
);
sInfo
(
"syncNodeOpen update config :%s"
,
seralized
);
taosMemoryFree
(
seralized
);
}
raftCfgClose
(
pSyncNode
->
pRaftCfg
);
}
...
...
@@ -612,7 +636,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
}
// snapshot receivers
pSyncNode
->
pNewNodeReceiver
=
snapshotReceiverCreate
(
pSyncNode
,
100
);
pSyncNode
->
pNewNodeReceiver
=
snapshotReceiverCreate
(
pSyncNode
,
EMPTY_RAFT_ID
);
// start in syncNodeStart
// start raft
...
...
@@ -628,9 +652,7 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// start raft
if
(
pSyncNode
->
replicaNum
==
1
)
{
raftStoreNextTerm
(
pSyncNode
->
pRaftStore
);
syncNodeBecomeLeader
(
pSyncNode
);
syncNodeLog2
(
"==state change become leader immediately=="
,
pSyncNode
);
syncNodeBecomeLeader
(
pSyncNode
,
"one replica start"
);
// Raft 3.6.2 Committing entries from previous terms
...
...
@@ -638,41 +660,22 @@ void syncNodeStart(SSyncNode* pSyncNode) {
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
// maybe only one replica
/*
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"==state change become leader immediately=="
,
pSyncNode
);
}
*/
sInfo
(
"==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d"
,
pSyncNode
,
pSyncNode
->
vgId
);
return
;
}
syncNodeBecomeFollower
(
pSyncNode
);
syncNodeBecomeFollower
(
pSyncNode
,
"first start"
);
// for test
int32_t
ret
=
0
;
// int32_t ret = 0;
// ret = syncNodeStartPingTimer(pSyncNode);
assert
(
ret
==
0
);
/*
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
*/
// assert(ret == 0);
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"==state change become leader immediately=="
,
pSyncNode
);
}
*/
sInfo
(
"==syncNodeStart== restoreFinish ok multi replica %p vgId:%d"
,
pSyncNode
,
pSyncNode
->
vgId
);
}
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -687,6 +690,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
sInfo
(
"sync event vgId:%d sync close"
,
pSyncNode
->
vgId
);
int32_t
ret
;
assert
(
pSyncNode
!=
NULL
);
...
...
@@ -1131,7 +1136,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, bool* isDro
}
raftCfgPersist
(
pSyncNode
->
pRaftCfg
);
syncNodeLog2
(
"==syncNodeUpdateConfig=="
,
pSyncNode
);
if
(
gRaftDetailLog
)
{
syncNodeLog2
(
"==syncNodeUpdateConfig=="
,
pSyncNode
);
}
}
SSyncNode
*
syncNodeAcquire
(
int64_t
rid
)
{
...
...
@@ -1149,12 +1157,14 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid)
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
)
{
if
(
term
>
pSyncNode
->
pRaftStore
->
currentTerm
)
{
raftStoreSetTerm
(
pSyncNode
->
pRaftStore
,
term
);
syncNodeBecomeFollower
(
pSyncNode
);
syncNodeBecomeFollower
(
pSyncNode
,
"update term"
);
raftStoreClearVote
(
pSyncNode
->
pRaftStore
);
}
}
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sInfo
(
"sync event vgId:%d become follower, %s"
,
pSyncNode
->
vgId
,
debugStr
);
// maybe clear leader cache
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
pSyncNode
->
leaderCache
=
EMPTY_RAFT_ID
;
...
...
@@ -1186,7 +1196,9 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode) {
// evoterLog |-> voterLog[i]]}
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
)
{
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sInfo
(
"sync event vgId:%d become leader, %s"
,
pSyncNode
->
vgId
,
debugStr
);
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_LEADER
;
...
...
@@ -1237,7 +1249,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode) {
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
assert
(
voteGrantedMajority
(
pSyncNode
->
pVotesGranted
));
syncNodeBecomeLeader
(
pSyncNode
);
syncNodeBecomeLeader
(
pSyncNode
,
"candidate to leader"
);
syncNodeLog2
(
"==state change syncNodeCandidate2Leader=="
,
pSyncNode
);
...
...
@@ -1260,14 +1272,14 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
void
syncNodeLeader2Follower
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
syncNodeBecomeFollower
(
pSyncNode
);
syncNodeBecomeFollower
(
pSyncNode
,
"leader to follower"
);
syncNodeLog2
(
"==state change syncNodeLeader2Follower=="
,
pSyncNode
);
}
void
syncNodeCandidate2Follower
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
syncNodeBecomeFollower
(
pSyncNode
);
syncNodeBecomeFollower
(
pSyncNode
,
"candidate to follower"
);
syncNodeLog2
(
"==state change syncNodeCandidate2Follower=="
,
pSyncNode
);
}
...
...
@@ -1467,9 +1479,11 @@ void syncNodeLog(SSyncNode* pObj) {
}
void
syncNodeLog2
(
char
*
s
,
SSyncNode
*
pObj
)
{
char
*
serialized
=
syncNode2Str
(
pObj
);
sTraceLong
(
"syncNodeLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncNode2Str
(
pObj
);
sTraceLong
(
"syncNodeLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
// ------ local funciton ---------
...
...
@@ -1724,17 +1738,19 @@ const char* syncStr(ESyncState state) {
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
code
=
0
;
ESyncState
state
=
flag
;
sInfo
(
"sync event
commit from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
beginIndex
,
end
Index
,
syncUtilState2String
(
state
));
sInfo
(
"sync event
vgId:%d commit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
ths
->
vgId
,
begin
Index
,
endIndex
,
syncUtilState2String
(
state
));
// maybe execute by leader, skip snapshot
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
};
if
(
ths
->
pFsm
->
FpGetSnapshot
!=
NULL
)
{
ths
->
pFsm
->
FpGetSnapshot
(
ths
->
pFsm
,
&
snapshot
);
}
if
(
beginIndex
<=
snapshot
.
lastApplyIndex
)
{
beginIndex
=
snapshot
.
lastApplyIndex
+
1
;
}
/*
// maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (ths->pFsm->FpGetSnapshot != NULL) {
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
}
if (beginIndex <= snapshot.lastApplyIndex) {
beginIndex = snapshot.lastApplyIndex + 1;
}
*/
// execute fsm
if
(
ths
->
pFsm
!=
NULL
)
{
...
...
@@ -1791,17 +1807,19 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
// change isStandBy to normal
if
(
!
isDrop
)
{
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncNodeBecomeLeader
(
ths
);
syncNodeBecomeLeader
(
ths
,
"config change"
);
}
else
{
syncNodeBecomeFollower
(
ths
);
syncNodeBecomeFollower
(
ths
,
"config change"
);
}
}
char
*
sOld
=
syncCfg2Str
(
&
oldSyncCfg
);
char
*
sNew
=
syncCfg2Str
(
&
newSyncCfg
);
sInfo
(
"==config change== 0x11 old:%s new:%s isDrop:%d
\n
"
,
sOld
,
sNew
,
isDrop
);
taosMemoryFree
(
sOld
);
taosMemoryFree
(
sNew
);
if
(
gRaftDetailLog
)
{
char
*
sOld
=
syncCfg2Str
(
&
oldSyncCfg
);
char
*
sNew
=
syncCfg2Str
(
&
newSyncCfg
);
sInfo
(
"==config change== 0x11 old:%s new:%s isDrop:%d
\n
"
,
sOld
,
sNew
,
isDrop
);
taosMemoryFree
(
sOld
);
taosMemoryFree
(
sNew
);
}
}
// always call FpReConfigCb
...
...
@@ -1810,10 +1828,12 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
cbMeta
.
currentTerm
=
ths
->
pRaftStore
->
currentTerm
;
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
newCfg
=
newSyncCfg
;
cbMeta
.
oldCfg
=
oldSyncCfg
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
flag
=
0x11
;
cbMeta
.
isDrop
=
isDrop
;
ths
->
pFsm
->
FpReConfigCb
(
ths
->
pFsm
,
newSyncCf
g
,
cbMeta
);
ths
->
pFsm
->
FpReConfigCb
(
ths
->
pFsm
,
&
rpcMs
g
,
cbMeta
);
}
}
...
...
@@ -1824,7 +1844,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
}
ths
->
restoreFinish
=
true
;
sInfo
(
"
restore finish %p vgId:%d"
,
ths
,
ths
->
vgId
);
sInfo
(
"
sync event vgId:%d restore finish"
,
ths
->
vgId
);
}
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
797cde53
...
...
@@ -14,6 +14,7 @@
*/
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n]
...
...
@@ -161,7 +162,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync
(
pWal
,
true
);
sTrace
(
"sync event write index:%"
PRId64
,
pEntry
->
index
);
sTrace
(
"sync event vgId:%d write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s"
,
pData
->
pSyncNode
->
vgId
,
pEntry
->
index
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
TMSG_INFO
(
pEntry
->
originalRpcType
));
return
code
;
}
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
797cde53
...
...
@@ -122,7 +122,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
syncIndexMgrLog2
(
"begin append entries peers pNextIndex:"
,
pSyncNode
->
pNextIndex
);
syncIndexMgrLog2
(
"begin append entries peers pMatchIndex:"
,
pSyncNode
->
pMatchIndex
);
logStoreSimpleLog2
(
"begin append entries peers LogStore:"
,
pSyncNode
->
pLogStore
);
{
if
(
gRaftDetailLog
)
{
SSnapshot
snapshot
;
pSyncNode
->
pFsm
->
FpGetSnapshot
(
pSyncNode
->
pFsm
,
&
snapshot
);
sTrace
(
"begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu"
,
...
...
@@ -201,7 +201,6 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
}
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
)
{
sTrace
(
"syncNodeAppendEntries pSyncNode:%p "
,
pSyncNode
);
int32_t
ret
=
0
;
SRpcMsg
rpcMsg
;
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
797cde53
...
...
@@ -20,7 +20,7 @@
#include "syncUtil.h"
#include "wal.h"
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
);
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SRaftId
fromId
);
//----------------------------------
SSyncSnapshotSender
*
snapshotSenderCreate
(
SSyncNode
*
pSyncNode
,
int32_t
replicaIndex
)
{
...
...
@@ -105,13 +105,23 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pMsg
->
destId
),
pSender
->
pSyncNode
,
&
rpcMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sTrace
(
"sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s"
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
);
}
syncSnapshotSendDestroy
(
pMsg
);
}
...
...
@@ -183,9 +193,11 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender) {
pSender
->
start
=
false
;
char
*
s
=
snapshotSender2Str
(
pSender
);
sInfo
(
"snapshotSenderStop %s"
,
s
);
taosMemoryFree
(
s
);
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotSender2Str
(
pSender
);
sInfo
(
"snapshotSenderStop %s"
,
s
);
taosMemoryFree
(
s
);
}
}
// when sender receiver ack, call this function to send msg from seq
...
...
@@ -225,20 +237,29 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pMsg
->
destId
),
pSender
->
pSyncNode
,
&
rpcMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
].
addr
,
host
,
sizeof
(
host
),
&
port
);
if
(
pSender
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
sTrace
(
"sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s"
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
);
}
}
else
{
sTrace
(
"sync event
snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s
"
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
msgStr
);
sTrace
(
"sync event
vgId:%d snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu
"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
);
}
taosMemoryFree
(
msgStr
);
syncSnapshotSendDestroy
(
pMsg
);
return
0
;
...
...
@@ -260,13 +281,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
syncSnapshotSend2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pMsg
->
destId
),
pSender
->
pSyncNode
,
&
rpcMsg
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sTrace
(
"sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s"
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
msgStr
);
taosMemoryFree
(
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d send msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot send to %s:%d resend seq:%d ack:%d"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
);
}
syncSnapshotSendDestroy
(
pMsg
);
}
...
...
@@ -331,7 +358,7 @@ char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
}
// -------------------------------------
SSyncSnapshotReceiver
*
snapshotReceiverCreate
(
SSyncNode
*
pSyncNode
,
int32_t
replicaIndex
)
{
SSyncSnapshotReceiver
*
snapshotReceiverCreate
(
SSyncNode
*
pSyncNode
,
SRaftId
fromId
)
{
bool
condition
=
(
pSyncNode
->
pFsm
->
FpSnapshotStartWrite
!=
NULL
)
&&
(
pSyncNode
->
pFsm
->
FpSnapshotStopWrite
!=
NULL
)
&&
(
pSyncNode
->
pFsm
->
FpSnapshotDoWrite
!=
NULL
);
...
...
@@ -345,7 +372,7 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
pReceiver
->
pWriter
=
NULL
;
pReceiver
->
pSyncNode
=
pSyncNode
;
pReceiver
->
replicaIndex
=
replicaIndex
;
pReceiver
->
fromId
=
fromId
;
pReceiver
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pReceiver
->
privateTerm
=
0
;
...
...
@@ -365,10 +392,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
)
{
return
pReceiver
->
start
;
}
// begin receive snapshot msg (current term, seq begin)
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
)
{
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SRaftId
fromId
)
{
pReceiver
->
term
=
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pReceiver
->
privateTerm
=
privateTerm
;
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
pReceiver
->
fromId
=
fromId
;
ASSERT
(
pReceiver
->
pWriter
==
NULL
);
int32_t
ret
=
pReceiver
->
pSyncNode
->
pFsm
->
FpSnapshotStartWrite
(
pReceiver
->
pSyncNode
->
pFsm
,
&
(
pReceiver
->
pWriter
));
...
...
@@ -377,14 +405,15 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
void
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
)
{
void
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SRaftId
fromId
)
{
if
(
!
snapshotReceiverIsStart
(
pReceiver
))
{
// start
snapshotReceiverDoStart
(
pReceiver
,
privateTerm
);
snapshotReceiverDoStart
(
pReceiver
,
privateTerm
,
fromId
);
pReceiver
->
start
=
true
;
}
else
{
// already start
sInfo
(
"snapshot recv, receiver already start"
);
// force close, abandon incomplete data
int32_t
ret
=
...
...
@@ -393,15 +422,15 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateTer
pReceiver
->
pWriter
=
NULL
;
// start again
snapshotReceiverDoStart
(
pReceiver
,
privateTerm
);
snapshotReceiverDoStart
(
pReceiver
,
privateTerm
,
fromId
);
pReceiver
->
start
=
true
;
ASSERT
(
0
);
}
char
*
s
=
snapshotReceiver2Str
(
pReceiver
);
sInfo
(
"snapshotReceiverStart %s"
,
s
);
taosMemoryFree
(
s
);
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotReceiver2Str
(
pReceiver
);
sInfo
(
"snapshotReceiverStart %s"
,
s
);
taosMemoryFree
(
s
);
}
}
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
,
bool
apply
)
{
...
...
@@ -418,9 +447,11 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
++
(
pReceiver
->
privateTerm
);
}
char
*
s
=
snapshotReceiver2Str
(
pReceiver
);
sInfo
(
"snapshotReceiverStop %s"
,
s
);
taosMemoryFree
(
s
);
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotReceiver2Str
(
pReceiver
);
sInfo
(
"snapshotReceiverStop %s"
,
s
);
taosMemoryFree
(
s
);
}
}
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
)
{
...
...
@@ -436,7 +467,22 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pReceiver
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaIndex"
,
pReceiver
->
replicaIndex
);
cJSON
*
pFromId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pReceiver
->
fromId
.
addr
);
cJSON_AddStringToObject
(
pFromId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pReceiver
->
fromId
.
addr
;
cJSON
*
pTmp
=
pFromId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pFromId
,
"vgId"
,
pReceiver
->
fromId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"fromId"
,
pFromId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pReceiver
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
...
...
@@ -468,17 +514,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
pMsg
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_BEGIN
)
{
// begin
snapshotReceiverStart
(
pReceiver
,
pMsg
->
privateTerm
);
snapshotReceiverStart
(
pReceiver
,
pMsg
->
privateTerm
,
pMsg
->
srcId
);
pReceiver
->
ack
=
pMsg
->
seq
;
needRsp
=
true
;
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sTrace
(
"sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
);
}
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
// end, finish FSM
...
...
@@ -486,29 +538,46 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
ASSERT
(
writeCode
==
0
);
pSyncNode
->
pFsm
->
FpSnapshotStopWrite
(
pSyncNode
->
pFsm
,
pReceiver
->
pWriter
,
true
);
pSyncNode
->
pLogStore
->
syncLogSetBeginIndex
(
pSyncNode
->
pLogStore
,
pMsg
->
lastIndex
+
1
);
char
*
logSimpleStr
=
logStoreSimple2Str
(
pSyncNode
->
pLogStore
);
SSnapshot
snapshot
;
pSyncNode
->
pFsm
->
FpGetSnapshot
(
pSyncNode
->
pFsm
,
&
snapshot
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sInfo
(
"sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s"
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
logSimpleStr
);
taosMemoryFree
(
logSimpleStr
);
if
(
gRaftDetailLog
)
{
char
*
logSimpleStr
=
logStoreSimple2Str
(
pSyncNode
->
pLogStore
);
sInfo
(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, raft log:%s"
,
pSyncNode
->
vgId
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
logSimpleStr
);
taosMemoryFree
(
logSimpleStr
);
}
else
{
sInfo
(
"sync event vgId:%d snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu"
,
pSyncNode
->
vgId
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
);
}
pReceiver
->
pWriter
=
NULL
;
snapshotReceiverStop
(
pReceiver
,
true
);
pReceiver
->
ack
=
pMsg
->
seq
;
needRsp
=
true
;
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
);
}
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_FORCE_CLOSE
)
{
pSyncNode
->
pFsm
->
FpSnapshotStopWrite
(
pSyncNode
->
pFsm
,
pReceiver
->
pWriter
,
false
);
...
...
@@ -519,11 +588,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv "
"msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
);
}
}
else
if
(
pMsg
->
seq
>
SYNC_SNAPSHOT_SEQ_BEGIN
&&
pMsg
->
seq
<
SYNC_SNAPSHOT_SEQ_END
)
{
// transfering
...
...
@@ -535,13 +610,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
}
needRsp
=
true
;
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
sTrace
(
"sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sTrace
(
"sync event vgId:%d snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
);
}
}
else
{
ASSERT
(
0
);
...
...
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
浏览文件 @
797cde53
...
...
@@ -146,7 +146,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void
RestoreFinishCb
(
struct
SSyncFSM
*
pFsm
)
{
sTrace
(
"==callback== ==RestoreFinishCb=="
);
}
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCf
g
,
SReConfigCbMeta
cbMeta
)
{
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMs
g
,
SReConfigCbMeta
cbMeta
)
{
sTrace
(
"==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu"
,
cbMeta
.
flag
,
cbMeta
.
isDrop
,
cbMeta
.
index
,
cbMeta
.
code
,
cbMeta
.
currentTerm
,
cbMeta
.
term
);
}
...
...
source/libs/sync/test/syncConfigChangeTest.cpp
浏览文件 @
797cde53
...
...
@@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void
RestoreFinishCb
(
struct
SSyncFSM
*
pFsm
)
{
sTrace
(
"==callback== ==RestoreFinishCb=="
);
}
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCf
g
,
SReConfigCbMeta
cbMeta
)
{
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMs
g
,
SReConfigCbMeta
cbMeta
)
{
sTrace
(
"==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu"
,
cbMeta
.
flag
,
cbMeta
.
isDrop
,
cbMeta
.
index
,
cbMeta
.
code
,
cbMeta
.
currentTerm
,
cbMeta
.
term
);
}
...
...
source/libs/sync/test/syncSnapshotReceiverTest.cpp
浏览文件 @
797cde53
...
...
@@ -41,7 +41,11 @@ SSyncSnapshotReceiver* createReceiver() {
pSyncNode
->
pFsm
->
FpSnapshotStopWrite
=
SnapshotStopWrite
;
pSyncNode
->
pFsm
->
FpSnapshotDoWrite
=
SnapshotDoWrite
;
SSyncSnapshotReceiver
*
pReceiver
=
snapshotReceiverCreate
(
pSyncNode
,
2
);
SRaftId
id
;
id
.
addr
=
syncUtilAddr2U64
(
"1.2.3.4"
,
99
);
id
.
vgId
=
100
;
SSyncSnapshotReceiver
*
pReceiver
=
snapshotReceiverCreate
(
pSyncNode
,
id
);
pReceiver
->
start
=
true
;
pReceiver
->
ack
=
20
;
pReceiver
->
pWriter
=
(
void
*
)
0x11
;
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
797cde53
...
...
@@ -146,8 +146,8 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void
RestoreFinishCb
(
struct
SSyncFSM
*
pFsm
)
{
sTrace
(
"==callback== ==RestoreFinishCb== pFsm:%p"
,
pFsm
);
}
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCf
g
,
SReConfigCbMeta
cbMeta
)
{
char
*
s
=
syncCfg2Str
(
&
newCfg
);
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMs
g
,
SReConfigCbMeta
cbMeta
)
{
char
*
s
=
syncCfg2Str
(
&
(
cbMeta
.
newCfg
)
);
sTrace
(
"==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu, newCfg:%s"
,
cbMeta
.
flag
,
cbMeta
.
isDrop
,
cbMeta
.
index
,
cbMeta
.
code
,
cbMeta
.
currentTerm
,
cbMeta
.
term
,
s
);
taosMemoryFree
(
s
);
...
...
@@ -235,7 +235,6 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
}
}
int64_t
rid
=
syncOpen
(
&
syncInfo
);
assert
(
rid
>
0
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录