Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0d608172
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
0d608172
编写于
11月 02, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 02, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #17837 from taosdata/feature/3.0_mhli
refactor(sync): add pre snapshot
上级
fb933494
70aff845
变更
29
显示空白变更内容
内联
并排
Showing
29 changed file
with
1098 addition
and
210 deletion
+1098
-210
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-0
include/libs/sync/sync.h
include/libs/sync/sync.h
+1
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+2
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+3
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+12
-1
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+14
-11
source/libs/sync/inc/syncTools.h
source/libs/sync/inc/syncTools.h
+69
-2
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+0
-1
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+2
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+51
-0
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+304
-4
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+2
-2
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+373
-173
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+28
-0
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+2
-0
source/libs/sync/test/syncConfigChangeTest.cpp
source/libs/sync/test/syncConfigChangeTest.cpp
+6
-4
source/libs/sync/test/syncEnvTest.cpp
source/libs/sync/test/syncEnvTest.cpp
+2
-2
source/libs/sync/test/syncPreSnapshotReplyTest.cpp
source/libs/sync/test/syncPreSnapshotReplyTest.cpp
+99
-0
source/libs/sync/test/syncPreSnapshotTest.cpp
source/libs/sync/test/syncPreSnapshotTest.cpp
+97
-0
source/libs/sync/test/syncRaftLogTest2.cpp
source/libs/sync/test/syncRaftLogTest2.cpp
+1
-1
source/libs/sync/test/syncRaftLogTest3.cpp
source/libs/sync/test/syncRaftLogTest3.cpp
+1
-1
source/libs/sync/test/syncReplicateTest.cpp
source/libs/sync/test/syncReplicateTest.cpp
+4
-0
source/libs/sync/test/syncSnapshotReceiverTest.cpp
source/libs/sync/test/syncSnapshotReceiverTest.cpp
+3
-1
source/libs/sync/test/syncSnapshotRspTest.cpp
source/libs/sync/test/syncSnapshotRspTest.cpp
+1
-1
source/libs/sync/test/syncSnapshotSendTest.cpp
source/libs/sync/test/syncSnapshotSendTest.cpp
+0
-1
source/libs/sync/test/syncSnapshotSenderTest.cpp
source/libs/sync/test/syncSnapshotSenderTest.cpp
+5
-1
source/libs/sync/test/syncSnapshotTest.cpp
source/libs/sync/test/syncSnapshotTest.cpp
+8
-4
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+2
-0
source/libs/sync/test/syncWriteTest.cpp
source/libs/sync/test/syncWriteTest.cpp
+4
-0
未找到文件。
include/common/tmsgdef.h
浏览文件 @
0d608172
...
...
@@ -272,6 +272,8 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SYNC_HEARTBEAT
,
"sync-heartbeat"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_HEARTBEAT_REPLY
,
"sync-heartbeat-reply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_LOCAL_CMD
,
"sync-local-cmd"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_PRE_SNAPSHOT
,
"sync-pre-snapshot"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_PRE_SNAPSHOT_REPLY
,
"sync-pre-snapshot-reply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_MAX_MSG
,
"sync-max"
,
NULL
,
NULL
)
TD_NEW_MSG_SEG
(
TDMT_VND_STREAM_MSG
)
...
...
include/libs/sync/sync.h
浏览文件 @
0d608172
...
...
@@ -37,6 +37,7 @@ extern "C" {
#define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION 100
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
0d608172
...
...
@@ -195,6 +195,8 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PRE_SNAPSHOT
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PRE_SNAPSHOT_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_HEARTBEAT
,
mmPutMsgToSyncCtrlQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_HEARTBEAT_REPLY
,
mmPutMsgToSyncCtrlQueue
,
1
)
==
NULL
)
goto
_OVER
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
0d608172
...
...
@@ -435,6 +435,9 @@ SArray *vmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PRE_SNAPSHOT
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PRE_SNAPSHOT_REPLY
,
vmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_HEARTBEAT
,
vmPutMsgToSyncCtrlQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_HEARTBEAT_REPLY
,
vmPutMsgToSyncCtrlQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
0d608172
...
...
@@ -314,6 +314,7 @@ int32_t syncNodePeerStateInit(SSyncNode* pSyncNode);
// trace log
void
syncLogRecvTimer
(
SSyncNode
*
pSyncNode
,
const
SyncTimeout
*
pMsg
,
const
char
*
s
);
void
syncLogRecvLocalCmd
(
SSyncNode
*
pSyncNode
,
const
SyncLocalCmd
*
pMsg
,
const
char
*
s
);
void
syncLogSendRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
...
...
@@ -336,7 +337,17 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const
void
syncLogSendHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
const
char
*
s
);
void
syncLogRecvHeartbeatReply
(
SSyncNode
*
pSyncNode
,
const
SyncHeartbeatReply
*
pMsg
,
const
char
*
s
);
void
syncLogRecvLocalCmd
(
SSyncNode
*
pSyncNode
,
const
SyncLocalCmd
*
pMsg
,
const
char
*
s
);
void
syncLogSendSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
);
void
syncLogRecvSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
);
void
syncLogSendSyncPreSnapshotReply
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshotReply
*
pMsg
,
const
char
*
s
);
void
syncLogRecvSyncPreSnapshotReply
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshotReply
*
pMsg
,
const
char
*
s
);
void
syncLogSendSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
);
void
syncLogRecvSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
);
void
syncLogSendSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
);
void
syncLogRecvSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
);
// for debug --------------
void
syncNodePrint
(
SSyncNode
*
pObj
);
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
0d608172
...
...
@@ -28,8 +28,9 @@ extern "C" {
#include "syncMessage.h"
#include "taosdef.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -1
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -2
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
...
...
@@ -47,19 +48,19 @@ typedef struct SSyncSnapshotSender {
SSnapshot
snapshot
;
SSyncCfg
lastConfig
;
int64_t
sendingMS
;
SSyncNode
*
pSyncNode
;
int32_t
replicaIndex
;
SyncTerm
term
;
SyncTerm
privateTerm
;
int64_t
startTime
;
bool
finish
;
// init when create
SSyncNode
*
pSyncNode
;
int32_t
replicaIndex
;
}
SSyncSnapshotSender
;
SSyncSnapshotSender
*
snapshotSenderCreate
(
SSyncNode
*
pSyncNode
,
int32_t
replicaIndex
);
void
snapshotSenderDestroy
(
SSyncSnapshotSender
*
pSender
);
bool
snapshotSenderIsStart
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
,
SSnapshotParam
snapshotParam
,
SSnapshot
snapshot
,
void
*
pReader
);
int32_t
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotSenderStop
(
SSyncSnapshotSender
*
pSender
,
bool
finish
);
int32_t
snapshotSend
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotReSend
(
SSyncSnapshotSender
*
pSender
);
...
...
@@ -76,10 +77,12 @@ typedef struct SSyncSnapshotReceiver {
int32_t
ack
;
void
*
pWriter
;
SyncTerm
term
;
SyncTerm
privateTerm
;
SSnapshotParam
snapshotParam
;
SSnapshot
snapshot
;
SRaftId
fromId
;
int64_t
startTime
;
// init when create
SSyncNode
*
pSyncNode
;
}
SSyncSnapshotReceiver
;
...
...
source/libs/sync/inc/syncTools.h
浏览文件 @
0d608172
...
...
@@ -496,6 +496,69 @@ void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg);
void
syncHeartbeatReplyLog
(
const
SyncHeartbeatReply
*
pMsg
);
void
syncHeartbeatReplyLog2
(
char
*
s
,
const
SyncHeartbeatReply
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncPreSnapshot
{
uint32_t
bytes
;
int32_t
vgId
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
}
SyncPreSnapshot
;
SyncPreSnapshot
*
syncPreSnapshotBuild
(
int32_t
vgId
);
void
syncPreSnapshotDestroy
(
SyncPreSnapshot
*
pMsg
);
void
syncPreSnapshotSerialize
(
const
SyncPreSnapshot
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPreSnapshotDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPreSnapshot
*
pMsg
);
char
*
syncPreSnapshotSerialize2
(
const
SyncPreSnapshot
*
pMsg
,
uint32_t
*
len
);
SyncPreSnapshot
*
syncPreSnapshotDeserialize2
(
const
char
*
buf
,
uint32_t
len
);
void
syncPreSnapshot2RpcMsg
(
const
SyncPreSnapshot
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPreSnapshotFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPreSnapshot
*
pMsg
);
SyncPreSnapshot
*
syncPreSnapshotFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
);
cJSON
*
syncPreSnapshot2Json
(
const
SyncPreSnapshot
*
pMsg
);
char
*
syncPreSnapshot2Str
(
const
SyncPreSnapshot
*
pMsg
);
// for debug ----------------------
void
syncPreSnapshotPrint
(
const
SyncPreSnapshot
*
pMsg
);
void
syncPreSnapshotPrint2
(
char
*
s
,
const
SyncPreSnapshot
*
pMsg
);
void
syncPreSnapshotLog
(
const
SyncPreSnapshot
*
pMsg
);
void
syncPreSnapshotLog2
(
char
*
s
,
const
SyncPreSnapshot
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncPreSnapshotReply
{
uint32_t
bytes
;
int32_t
vgId
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
SyncIndex
snapStart
;
}
SyncPreSnapshotReply
;
SyncPreSnapshotReply
*
syncPreSnapshotReplyBuild
(
int32_t
vgId
);
void
syncPreSnapshotReplyDestroy
(
SyncPreSnapshotReply
*
pMsg
);
void
syncPreSnapshotReplySerialize
(
const
SyncPreSnapshotReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPreSnapshotReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPreSnapshotReply
*
pMsg
);
char
*
syncPreSnapshotReplySerialize2
(
const
SyncPreSnapshotReply
*
pMsg
,
uint32_t
*
len
);
SyncPreSnapshotReply
*
syncPreSnapshotReplyDeserialize2
(
const
char
*
buf
,
uint32_t
len
);
void
syncPreSnapshotReply2RpcMsg
(
const
SyncPreSnapshotReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPreSnapshotReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPreSnapshotReply
*
pMsg
);
SyncPreSnapshotReply
*
syncPreSnapshotReplyFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
);
cJSON
*
syncPreSnapshotReply2Json
(
const
SyncPreSnapshotReply
*
pMsg
);
char
*
syncPreSnapshotReply2Str
(
const
SyncPreSnapshotReply
*
pMsg
);
// for debug ----------------------
void
syncPreSnapshotReplyPrint
(
const
SyncPreSnapshotReply
*
pMsg
);
void
syncPreSnapshotReplyPrint2
(
char
*
s
,
const
SyncPreSnapshotReply
*
pMsg
);
void
syncPreSnapshotReplyLog
(
const
SyncPreSnapshotReply
*
pMsg
);
void
syncPreSnapshotReplyLog2
(
char
*
s
,
const
SyncPreSnapshotReply
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncApplyMsg
{
uint32_t
bytes
;
...
...
@@ -541,7 +604,7 @@ typedef struct SyncSnapshotSend {
SyncTerm
lastTerm
;
// snapshot.lastTerm
SyncIndex
lastConfigIndex
;
// snapshot.lastConfigIndex
SSyncCfg
lastConfig
;
SyncTerm
privateTerm
;
int64_t
startTime
;
int32_t
seq
;
uint32_t
dataLen
;
char
data
[];
...
...
@@ -576,9 +639,10 @@ typedef struct SyncSnapshotRsp {
SyncTerm
term
;
SyncIndex
lastIndex
;
SyncTerm
lastTerm
;
SyncTerm
privateTerm
;
int64_t
startTime
;
int32_t
ack
;
int32_t
code
;
SyncIndex
snapBeginIndex
;
// when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
}
SyncSnapshotRsp
;
SyncSnapshotRsp
*
syncSnapshotRspBuild
(
int32_t
vgId
);
...
...
@@ -709,6 +773,9 @@ int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t
syncNodeOnAppendEntries
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeOnAppendEntriesReply
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncNodeOnPreSnapshot
(
SSyncNode
*
ths
,
SyncPreSnapshot
*
pMsg
);
int32_t
syncNodeOnPreSnapshotReply
(
SSyncNode
*
ths
,
SyncPreSnapshotReply
*
pMsg
);
int32_t
syncNodeOnSnapshot
(
SSyncNode
*
ths
,
SyncSnapshotSend
*
pMsg
);
int32_t
syncNodeOnSnapshotReply
(
SSyncNode
*
ths
,
SyncSnapshotRsp
*
pMsg
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
0d608172
...
...
@@ -142,7 +142,6 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) {
// pReply->matchIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
lastSendIndex
=
pMsg
->
prevLogIndex
+
1
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
startTime
=
ths
->
startTime
;
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
0d608172
...
...
@@ -73,6 +73,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t
code
=
ths
->
pFsm
->
FpSnapshotStartRead
(
ths
->
pFsm
,
&
readerParam
,
&
pReader
);
ASSERT
(
code
==
0
);
#if 0
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
...
...
@@ -82,6 +83,7 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
}
}
#endif
}
int32_t
syncNodeOnAppendEntriesReply
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
0d608172
...
...
@@ -2236,6 +2236,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// init peer mgr
syncNodePeerStateInit
(
pSyncNode
);
#if 0
// update sender private term
SSyncSnapshotSender* pMySender = syncNodeGetSnapshotSender(pSyncNode, &(pSyncNode->myRaftId));
if (pMySender != NULL) {
...
...
@@ -2246,6 +2247,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
}
(pMySender->privateTerm) += 100;
}
#endif
// close receiver
if
(
snapshotReceiverIsStart
(
pSyncNode
->
pNewNodeReceiver
))
{
...
...
@@ -3561,3 +3563,52 @@ void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const c
syncLocalCmdGetStr
(
pMsg
->
cmd
),
pMsg
->
sdNewTerm
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
void
syncLogSendSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
)
{
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"send sync-pre-snapshot to %s:%d {term:%"
PRIu64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
void
syncLogRecvSyncPreSnapshot
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshot
*
pMsg
,
const
char
*
s
)
{
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-pre-snapshot from %s:%d {term:%"
PRIu64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
void
syncLogSendSyncPreSnapshotReply
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshotReply
*
pMsg
,
const
char
*
s
)
{
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"send sync-pre-snapshot-reply to %s:%d {term:%"
PRIu64
", snap-start:%"
PRId64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
snapStart
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
void
syncLogRecvSyncPreSnapshotReply
(
SSyncNode
*
pSyncNode
,
const
SyncPreSnapshotReply
*
pMsg
,
const
char
*
s
)
{
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pMsg
->
srcId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-pre-snapshot-reply from %s:%d {term:%"
PRIu64
", snap-start:%"
PRId64
"}, %s"
,
host
,
port
,
pMsg
->
term
,
pMsg
->
snapStart
,
s
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
void
syncLogSendSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
)
{}
void
syncLogRecvSyncSnapshotSend
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotSend
*
pMsg
,
const
char
*
s
)
{}
void
syncLogSendSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
)
{}
void
syncLogRecvSyncSnapshotRsp
(
SSyncNode
*
pSyncNode
,
const
SyncSnapshotRsp
*
pMsg
,
const
char
*
s
)
{}
source/libs/sync/src/syncMessage.c
浏览文件 @
0d608172
...
...
@@ -2315,6 +2315,303 @@ void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg) {
}
}
// ---- message process SyncPreSnapshot----
SyncPreSnapshot
*
syncPreSnapshotBuild
(
int32_t
vgId
)
{
uint32_t
bytes
=
sizeof
(
SyncPreSnapshot
);
SyncPreSnapshot
*
pMsg
=
taosMemoryMalloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
vgId
=
vgId
;
pMsg
->
msgType
=
TDMT_SYNC_PRE_SNAPSHOT
;
return
pMsg
;
}
void
syncPreSnapshotDestroy
(
SyncPreSnapshot
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
taosMemoryFree
(
pMsg
);
}
}
void
syncPreSnapshotSerialize
(
const
SyncPreSnapshot
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
ASSERT
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncPreSnapshotDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPreSnapshot
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
ASSERT
(
len
==
pMsg
->
bytes
);
}
char
*
syncPreSnapshotSerialize2
(
const
SyncPreSnapshot
*
pMsg
,
uint32_t
*
len
)
{
char
*
buf
=
taosMemoryMalloc
(
pMsg
->
bytes
);
ASSERT
(
buf
!=
NULL
);
syncPreSnapshotSerialize
(
pMsg
,
buf
,
pMsg
->
bytes
);
if
(
len
!=
NULL
)
{
*
len
=
pMsg
->
bytes
;
}
return
buf
;
}
SyncPreSnapshot
*
syncPreSnapshotDeserialize2
(
const
char
*
buf
,
uint32_t
len
)
{
uint32_t
bytes
=
*
((
uint32_t
*
)
buf
);
SyncPreSnapshot
*
pMsg
=
taosMemoryMalloc
(
bytes
);
ASSERT
(
pMsg
!=
NULL
);
syncPreSnapshotDeserialize
(
buf
,
len
,
pMsg
);
ASSERT
(
len
==
pMsg
->
bytes
);
return
pMsg
;
}
void
syncPreSnapshot2RpcMsg
(
const
SyncPreSnapshot
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncPreSnapshotSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncPreSnapshotFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPreSnapshot
*
pMsg
)
{
syncPreSnapshotDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
SyncPreSnapshot
*
syncPreSnapshotFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
)
{
SyncPreSnapshot
*
pMsg
=
syncPreSnapshotDeserialize2
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
ASSERT
(
pMsg
!=
NULL
);
return
pMsg
;
}
cJSON
*
syncPreSnapshot2Json
(
const
SyncPreSnapshot
*
pMsg
)
{
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pMsg
->
vgId
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPreSnapshot"
,
pRoot
);
return
pJson
;
}
char
*
syncPreSnapshot2Str
(
const
SyncPreSnapshot
*
pMsg
)
{
cJSON
*
pJson
=
syncPreSnapshot2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
void
syncPreSnapshotPrint
(
const
SyncPreSnapshot
*
pMsg
)
{
char
*
serialized
=
syncPreSnapshot2Str
(
pMsg
);
printf
(
"syncPreSnapshotPrint | len:%d | %s
\n
"
,
(
int32_t
)
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncPreSnapshotPrint2
(
char
*
s
,
const
SyncPreSnapshot
*
pMsg
)
{
char
*
serialized
=
syncPreSnapshot2Str
(
pMsg
);
printf
(
"syncPreSnapshotPrint2 | len:%d | %s | %s
\n
"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncPreSnapshotLog
(
const
SyncPreSnapshot
*
pMsg
)
{
char
*
serialized
=
syncPreSnapshot2Str
(
pMsg
);
sTrace
(
"syncPreSnapshotLog | len:%d | %s"
,
(
int32_t
)
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
}
void
syncPreSnapshotLog2
(
char
*
s
,
const
SyncPreSnapshot
*
pMsg
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncPreSnapshot2Str
(
pMsg
);
sTrace
(
"syncPreSnapshotLog2 | len:%d | %s | %s"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
// ---- message process SyncPreSnapshotReply----
SyncPreSnapshotReply
*
syncPreSnapshotReplyBuild
(
int32_t
vgId
)
{
uint32_t
bytes
=
sizeof
(
SyncPreSnapshotReply
);
SyncPreSnapshotReply
*
pMsg
=
taosMemoryMalloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
vgId
=
vgId
;
pMsg
->
msgType
=
TDMT_SYNC_PRE_SNAPSHOT_REPLY
;
return
pMsg
;
}
void
syncPreSnapshotReplyDestroy
(
SyncPreSnapshotReply
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
taosMemoryFree
(
pMsg
);
}
}
void
syncPreSnapshotReplySerialize
(
const
SyncPreSnapshotReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
ASSERT
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncPreSnapshotReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPreSnapshotReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
ASSERT
(
len
==
pMsg
->
bytes
);
}
char
*
syncPreSnapshotReplySerialize2
(
const
SyncPreSnapshotReply
*
pMsg
,
uint32_t
*
len
)
{
char
*
buf
=
taosMemoryMalloc
(
pMsg
->
bytes
);
ASSERT
(
buf
!=
NULL
);
syncPreSnapshotReplySerialize
(
pMsg
,
buf
,
pMsg
->
bytes
);
if
(
len
!=
NULL
)
{
*
len
=
pMsg
->
bytes
;
}
return
buf
;
}
SyncPreSnapshotReply
*
syncPreSnapshotReplyDeserialize2
(
const
char
*
buf
,
uint32_t
len
)
{
uint32_t
bytes
=
*
((
uint32_t
*
)
buf
);
SyncPreSnapshotReply
*
pMsg
=
taosMemoryMalloc
(
bytes
);
ASSERT
(
pMsg
!=
NULL
);
syncPreSnapshotReplyDeserialize
(
buf
,
len
,
pMsg
);
ASSERT
(
len
==
pMsg
->
bytes
);
return
pMsg
;
}
void
syncPreSnapshotReply2RpcMsg
(
const
SyncPreSnapshotReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncPreSnapshotReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncPreSnapshotReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPreSnapshotReply
*
pMsg
)
{
syncPreSnapshotReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
SyncPreSnapshotReply
*
syncPreSnapshotReplyFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
)
{
SyncPreSnapshotReply
*
pMsg
=
syncPreSnapshotReplyDeserialize2
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
ASSERT
(
pMsg
!=
NULL
);
return
pMsg
;
}
cJSON
*
syncPreSnapshotReply2Json
(
const
SyncPreSnapshotReply
*
pMsg
)
{
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pMsg
->
vgId
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
snapStart
);
cJSON_AddStringToObject
(
pRoot
,
"snap-start"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPreSnapshotReply"
,
pRoot
);
return
pJson
;
}
char
*
syncPreSnapshotReply2Str
(
const
SyncPreSnapshotReply
*
pMsg
)
{
cJSON
*
pJson
=
syncPreSnapshotReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
void
syncPreSnapshotReplyPrint
(
const
SyncPreSnapshotReply
*
pMsg
)
{
char
*
serialized
=
syncPreSnapshotReply2Str
(
pMsg
);
printf
(
"syncPreSnapshotReplyPrint | len:%d | %s
\n
"
,
(
int32_t
)
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncPreSnapshotReplyPrint2
(
char
*
s
,
const
SyncPreSnapshotReply
*
pMsg
)
{
char
*
serialized
=
syncPreSnapshotReply2Str
(
pMsg
);
printf
(
"syncPreSnapshotReplyPrint2 | len:%d | %s | %s
\n
"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncPreSnapshotReplyLog
(
const
SyncPreSnapshotReply
*
pMsg
)
{
char
*
serialized
=
syncPreSnapshotReply2Str
(
pMsg
);
sTrace
(
"syncPreSnapshotReplyLog | len:%d | %s"
,
(
int32_t
)
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
}
void
syncPreSnapshotReplyLog2
(
char
*
s
,
const
SyncPreSnapshotReply
*
pMsg
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncPreSnapshotReply2Str
(
pMsg
);
sTrace
(
"syncPreSnapshotReplyLog2 | len:%d | %s | %s"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
// ---- message process SyncApplyMsg----
SyncApplyMsg
*
syncApplyMsgBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
sizeof
(
SyncApplyMsg
)
+
dataLen
;
...
...
@@ -2576,8 +2873,8 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRI
u64
,
pMsg
->
privateTerm
);
cJSON_AddStringToObject
(
pRoot
,
"
privateTerm
"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRI
d64
,
pMsg
->
startTime
);
cJSON_AddStringToObject
(
pRoot
,
"
startTime
"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
beginIndex
);
cJSON_AddStringToObject
(
pRoot
,
"beginIndex"
,
u64buf
);
...
...
@@ -2751,8 +3048,8 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRI
u64
,
pMsg
->
privateTerm
);
cJSON_AddStringToObject
(
pRoot
,
"
privateTerm
"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRI
d64
,
pMsg
->
startTime
);
cJSON_AddStringToObject
(
pRoot
,
"
startTime
"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
lastIndex
);
cJSON_AddStringToObject
(
pRoot
,
"lastIndex"
,
u64buf
);
...
...
@@ -2762,6 +3059,9 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) {
cJSON_AddNumberToObject
(
pRoot
,
"ack"
,
pMsg
->
ack
);
cJSON_AddNumberToObject
(
pRoot
,
"code"
,
pMsg
->
code
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
snapBeginIndex
);
cJSON_AddStringToObject
(
pRoot
,
"snap-begin"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
0d608172
...
...
@@ -62,8 +62,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) {
syncNodeEventLog
(
pSyncNode
,
logBuf
);
// start snapshot
int32_t
code
=
syncNodeStartSnapshot
(
pSyncNode
,
pDestId
);
ASSERT
(
code
==
0
);
//
int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
return
0
;
}
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
0d608172
...
...
@@ -41,8 +41,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
}
memset
(
pSender
,
0
,
sizeof
(
*
pSender
));
int64_t
timeNow
=
taosGetTimestampMs
();
pSender
->
start
=
false
;
pSender
->
seq
=
SYNC_SNAPSHOT_SEQ_INVALID
;
pSender
->
ack
=
SYNC_SNAPSHOT_SEQ_INVALID
;
...
...
@@ -53,8 +51,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender
->
pSyncNode
=
pSyncNode
;
pSender
->
replicaIndex
=
replicaIndex
;
pSender
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pSender
->
privateTerm
=
timeNow
+
100
;
pSender
->
startTime
=
timeNow
;
pSender
->
startTime
=
0
;
pSender
->
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSender
->
pSyncNode
->
pFsm
,
&
(
pSender
->
snapshot
));
pSender
->
finish
=
false
;
}
else
{
...
...
@@ -88,88 +85,30 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool
snapshotSenderIsStart
(
SSyncSnapshotSender
*
pSender
)
{
return
pSender
->
start
;
}
// begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
int32_t
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
,
SSnapshotParam
snapshotParam
,
SSnapshot
snapshot
,
void
*
pReader
)
{
int32_t
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
)
{
ASSERT
(
!
snapshotSenderIsStart
(
pSender
));
// init snapshot, parm, reader
ASSERT
(
pSender
->
pReader
==
NULL
);
pSender
->
pReader
=
pReader
;
pSender
->
snapshot
=
snapshot
;
pSender
->
snapshotParam
=
snapshotParam
;
// init current block
if
(
pSender
->
pCurrentBlock
!=
NULL
)
{
taosMemoryFree
(
pSender
->
pCurrentBlock
);
}
pSender
->
blockLen
=
0
;
// update term
pSender
->
term
=
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
;
++
(
pSender
->
privateTerm
);
// increase private term
// update state
pSender
->
finish
=
false
;
pSender
->
start
=
true
;
pSender
->
seq
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
pSender
->
ack
=
SYNC_SNAPSHOT_SEQ_INVALID
;
pSender
->
pReader
=
NULL
;
pSender
->
pCurrentBlock
=
NULL
;
pSender
->
blockLen
=
0
;
// init last config
if
(
pSender
->
snapshot
.
lastConfigIndex
!=
SYNC_INDEX_INVALID
)
{
int32_t
code
=
0
;
SSyncRaftEntry
*
pEntry
=
NULL
;
bool
getLastConfig
=
false
;
code
=
pSender
->
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSender
->
pSyncNode
->
pLogStore
,
pSender
->
snapshot
.
lastConfigIndex
,
&
pEntry
);
if
(
code
==
0
&&
pEntry
!=
NULL
)
{
SRpcMsg
rpcMsg
;
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
SSyncCfg
lastConfig
;
int32_t
ret
=
syncCfgFromStr
(
rpcMsg
.
pCont
,
&
lastConfig
);
ASSERT
(
ret
==
0
);
pSender
->
lastConfig
=
lastConfig
;
getLastConfig
=
true
;
rpcFreeCont
(
rpcMsg
.
pCont
);
syncEntryDestory
(
pEntry
);
}
else
{
if
(
pSender
->
snapshot
.
lastConfigIndex
==
pSender
->
pSyncNode
->
pRaftCfg
->
lastConfigIndex
)
{
sTrace
(
"vgId:%d, sync sender get cfg from local"
,
pSender
->
pSyncNode
->
vgId
);
pSender
->
lastConfig
=
pSender
->
pSyncNode
->
pRaftCfg
->
cfg
;
getLastConfig
=
true
;
}
}
pSender
->
snapshotParam
.
start
=
SYNC_INDEX_INVALID
;
pSender
->
snapshotParam
.
end
=
SYNC_INDEX_INVALID
;
// last config not found in wal, update to -1
if
(
!
getLastConfig
)
{
SyncIndex
oldLastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
SyncIndex
newLastConfigIndex
=
SYNC_INDEX
_INVALID
;
pSender
->
snapshot
.
data
=
NULL
;
pSender
->
snapshotParam
.
end
=
SYNC_INDEX_INVALID
;
pSender
->
snapshot
.
lastApplyIndex
=
SYNC_INDEX_INVALID
;
pSender
->
snapshot
.
lastApplyTerm
=
SYNC_TERM
_INVALID
;
pSender
->
snapshot
.
lastConfigIndex
=
SYNC_INDEX_INVALID
;
memset
(
&
(
pSender
->
lastConfig
),
0
,
sizeof
(
SSyncCfg
));
// event log
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"snapshot sender update lcindex from %"
PRId64
" to %"
PRId64
,
oldLastConfigIndex
,
newLastConfigIndex
);
char
*
eventLog
=
snapshotSender2SimpleStr
(
pSender
,
logBuf
);
syncNodeEventLog
(
pSender
->
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
while
(
0
);
}
}
else
{
// no last config
memset
(
&
(
pSender
->
lastConfig
),
0
,
sizeof
(
SSyncCfg
));
}
memset
(
&
(
pSender
->
lastConfig
),
0
,
sizeof
(
pSender
->
lastConfig
));
pSender
->
sendingMS
=
0
;
pSender
->
term
=
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pSender
->
startTime
=
taosGetTimestampMs
();
pSender
->
finish
=
false
;
// build begin msg
SyncSnapshotSend
*
pMsg
=
syncSnapshotSendBuild
(
0
,
pSender
->
pSyncNode
->
vgId
);
...
...
@@ -181,8 +120,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
pMsg
->
lastTerm
=
pSender
->
snapshot
.
lastApplyTerm
;
pMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
pMsg
->
lastConfig
=
pSender
->
lastConfig
;
pMsg
->
s
eq
=
pSender
->
seq
;
// SYNC_SNAPSHOT_SEQ_BEGIN
pMsg
->
privateTerm
=
pSender
->
privateTerm
;
pMsg
->
s
tartTime
=
pSender
->
startTime
;
pMsg
->
seq
=
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
;
// send msg
SRpcMsg
rpcMsg
;
...
...
@@ -201,6 +140,10 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
}
int32_t
snapshotSenderStop
(
SSyncSnapshotSender
*
pSender
,
bool
finish
)
{
// update flag
pSender
->
start
=
false
;
pSender
->
finish
=
finish
;
// close reader
if
(
pSender
->
pReader
!=
NULL
)
{
int32_t
ret
=
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSender
->
pSyncNode
->
pFsm
,
pSender
->
pReader
);
...
...
@@ -215,12 +158,6 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender
->
blockLen
=
0
;
}
// update flag
pSender
->
start
=
false
;
pSender
->
finish
=
finish
;
// do not update term, maybe print
// event log
do
{
char
*
eventLog
=
snapshotSender2SimpleStr
(
pSender
,
"snapshot sender stop"
);
...
...
@@ -263,7 +200,9 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
pMsg
->
lastConfig
=
pSender
->
lastConfig
;
pMsg
->
seq
=
pSender
->
seq
;
pMsg
->
privateTerm
=
pSender
->
privateTerm
;
// pMsg->privateTerm = pSender->privateTerm;
memcpy
(
pMsg
->
data
,
pSender
->
pCurrentBlock
,
pSender
->
blockLen
);
// send msg
...
...
@@ -302,7 +241,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
pMsg
->
lastConfig
=
pSender
->
lastConfig
;
pMsg
->
seq
=
pSender
->
seq
;
pMsg
->
privateTerm
=
pSender
->
privateTerm
;
// pMsg->privateTerm = pSender->privateTerm;
memcpy
(
pMsg
->
data
,
pSender
->
pCurrentBlock
,
pSender
->
blockLen
);
// send msg
...
...
@@ -367,8 +308,10 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
cJSON_AddNumberToObject
(
pRoot
,
"replicaIndex"
,
pSender
->
replicaIndex
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pSender
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pSender
->
privateTerm
);
cJSON_AddStringToObject
(
pRoot
,
"privateTerm"
,
u64buf
);
// snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pSender->privateTerm);
// cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
cJSON_AddNumberToObject
(
pRoot
,
"finish"
,
pSender
->
finish
);
}
...
...
@@ -395,30 +338,38 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
snprintf
(
s
,
len
,
"%s {%p s-param:%"
PRId64
" e-param:%"
PRId64
" laindex:%"
PRId64
" laterm:%"
PRIu64
" lcindex:%"
PRId64
" seq:%d ack:%d finish:%d pterm:%"
PRIu64
" "
"replica-index:%d %s:%d}"
,
" seq:%d ack:%d finish:%d replica-index:%d %s:%d}"
,
event
,
pSender
,
pSender
->
snapshotParam
.
start
,
pSender
->
snapshotParam
.
end
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
finish
,
pSender
->
privateTerm
,
pSender
->
replicaIndex
,
host
,
port
);
pSender
->
finish
,
pSender
->
replicaIndex
,
host
,
port
);
return
s
;
}
int32_t
syncNodeStartSnapshot
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
)
{
// calculate <start, end> index
syncNodeEventLog
(
pSyncNode
,
"start snapshot ..."
);
syncNodeEventLog
(
pSyncNode
,
"starting snapshot ..."
);
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
pDestId
);
if
(
pSender
==
NULL
)
{
// create sender
}
else
{
// if <start, end> is same
// return 0;
syncNodeErrorLog
(
pSyncNode
,
"start snapshot error, sender is null"
);
return
-
1
;
}
// send begin msg
int32_t
code
=
0
;
if
(
snapshotSenderIsStart
(
pSender
))
{
code
=
snapshotSenderStop
(
pSender
,
false
);
if
(
code
!=
0
)
{
syncNodeErrorLog
(
pSyncNode
,
"snapshot sender stop error"
);
return
-
1
;
}
}
code
=
snapshotSenderStart
(
pSender
);
if
(
code
!=
0
)
{
syncNodeErrorLog
(
pSyncNode
,
"snapshot sender start error"
);
return
-
1
;
}
return
0
;
}
...
...
@@ -440,7 +391,6 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from
pReceiver
->
pSyncNode
=
pSyncNode
;
pReceiver
->
fromId
=
fromId
;
pReceiver
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pReceiver
->
privateTerm
=
0
;
pReceiver
->
snapshot
.
data
=
NULL
;
pReceiver
->
snapshot
.
lastApplyIndex
=
SYNC_INDEX_INVALID
;
pReceiver
->
snapshot
.
lastApplyTerm
=
0
;
...
...
@@ -474,25 +424,27 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceive
// receive first snapshot data
// write first block data
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
)
{
// update state
pReceiver
->
term
=
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pReceiver
->
privateTerm
=
pBeginMsg
->
privateTerm
;
pReceiver
->
start
=
true
;
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
// start writer
ASSERT
(
pReceiver
->
pWriter
==
NULL
);
int32_t
ret
=
pReceiver
->
pSyncNode
->
pFsm
->
FpSnapshotStartWrite
(
pReceiver
->
pSyncNode
->
pFsm
,
&
(
pReceiver
->
snapshotParam
),
&
(
pReceiver
->
pWriter
));
ASSERT
(
ret
==
0
);
pReceiver
->
term
=
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pReceiver
->
snapshotParam
.
start
=
pBeginMsg
->
beginIndex
;
pReceiver
->
snapshotParam
.
end
=
pBeginMsg
->
lastIndex
;
pReceiver
->
fromId
=
pBeginMsg
->
srcId
;
pReceiver
->
start
=
true
;
// update snapshot
pReceiver
->
snapshot
.
lastApplyIndex
=
pBeginMsg
->
lastIndex
;
pReceiver
->
snapshot
.
lastApplyTerm
=
pBeginMsg
->
lastTerm
;
pReceiver
->
snapshot
.
lastConfigIndex
=
pBeginMsg
->
lastConfigIndex
;
pReceiver
->
snapshotParam
.
start
=
pBeginMsg
->
beginIndex
;
pReceiver
->
snapshotParam
.
end
=
pBeginMsg
->
lastIndex
;
// start writer
ASSERT
(
pReceiver
->
pWriter
==
NULL
);
int32_t
ret
=
pReceiver
->
pSyncNode
->
pFsm
->
FpSnapshotStartWrite
(
pReceiver
->
pSyncNode
->
pFsm
,
&
(
pReceiver
->
snapshotParam
),
&
(
pReceiver
->
pWriter
));
ASSERT
(
ret
==
0
);
pReceiver
->
startTime
=
pBeginMsg
->
startTime
;
// event log
do
{
...
...
@@ -523,23 +475,10 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
}
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
int32_t
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
)
{
if
(
!
snapshotReceiverIsStart
(
pReceiver
))
{
// first start
ASSERT
(
!
snapshotReceiverIsStart
(
pReceiver
));
snapshotReceiverDoStart
(
pReceiver
,
pBeginMsg
);
}
else
{
// already start
sInfo
(
"vgId:%d, snapshot recv, receiver already start"
,
pReceiver
->
pSyncNode
->
vgId
);
// force close, abandon incomplete data
snapshotReceiverForceStop
(
pReceiver
);
// start again
snapshotReceiverDoStart
(
pReceiver
,
pBeginMsg
);
}
return
0
;
}
...
...
@@ -698,8 +637,8 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pReceiver
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRI
u64
,
pReceiver
->
privateTerm
);
cJSON_AddStringToObject
(
pRoot
,
"
privateTerm
"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRI
d64
,
pReceiver
->
startTime
);
cJSON_AddStringToObject
(
pRoot
,
"
startTime
"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
...
...
@@ -724,38 +663,204 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
syncUtilU642Addr
(
fromId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
s
,
len
,
"%s {%p start:%d ack:%d term:%"
PRIu64
"
pterm:%"
PRIu64
" from:%s:%d s-param:%"
PRId64
" e
-param:%"
PRId64
" laindex:%"
PRId64
" laterm:%"
PRIu64
"%s {%p start:%d ack:%d term:%"
PRIu64
"
start-time:%"
PRId64
" from:%s:%d s
-param:%"
PRId64
"
e-param:%"
PRId64
"
laindex:%"
PRId64
" laterm:%"
PRIu64
" "
"lcindex:%"
PRId64
"}"
,
event
,
pReceiver
,
pReceiver
->
start
,
pReceiver
->
ack
,
pReceiver
->
term
,
pReceiver
->
privateTerm
,
host
,
port
,
event
,
pReceiver
,
pReceiver
->
start
,
pReceiver
->
ack
,
pReceiver
->
term
,
pReceiver
->
startTime
,
host
,
port
,
pReceiver
->
snapshotParam
.
start
,
pReceiver
->
snapshotParam
.
end
,
pReceiver
->
snapshot
.
lastApplyIndex
,
pReceiver
->
snapshot
.
lastApplyTerm
,
pReceiver
->
snapshot
.
lastConfigIndex
);
return
s
;
}
SyncIndex
syncNodeGetSnapBeginIndex
(
SSyncNode
*
ths
)
{
SyncIndex
snapStart
=
SYNC_INDEX_INVALID
;
if
(
syncNodeIsMnode
(
ths
))
{
snapStart
=
SYNC_INDEX_BEGIN
;
}
else
{
SSyncLogStoreData
*
pData
=
ths
->
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
bool
isEmpty
=
ths
->
pLogStore
->
syncLogIsEmpty
(
ths
->
pLogStore
);
int64_t
walCommitVer
=
walGetCommittedVer
(
pWal
);
if
(
!
isEmpty
&&
ths
->
commitIndex
!=
walCommitVer
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"commit not same, wal-commit:%"
PRId64
", commit:%"
PRId64
", ignore"
,
walCommitVer
,
ths
->
commitIndex
);
syncNodeErrorLog
(
ths
,
logBuf
);
snapStart
=
walCommitVer
+
1
;
}
else
{
snapStart
=
ths
->
commitIndex
+
1
;
}
}
return
snapStart
;
}
static
int32_t
syncNodeOnSnapshotPre
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
SSyncSnapshotReceiver
*
pReceiver
=
pSyncNode
->
pNewNodeReceiver
;
if
(
snapshotReceiverIsStart
(
pReceiver
))
{
// already start
if
(
pMsg
->
startTime
>
pReceiver
->
startTime
)
{
goto
_START_RECEIVER
;
}
else
if
(
pMsg
->
startTime
==
pReceiver
->
startTime
)
{
goto
_SEND_REPLY
;
}
else
{
// ignore
return
0
;
}
}
else
{
// start new
goto
_START_RECEIVER
;
}
_START_RECEIVER:
if
(
taosGetTimestampMs
()
-
pMsg
->
startTime
>
SNAPSHOT_MAX_CLOCK_SKEW_MS
)
{
syncNodeErrorLog
(
pSyncNode
,
"snapshot receiver time skew too much"
);
return
-
1
;
}
else
{
// waiting for clock match
while
(
taosGetTimestampMs
()
>
pMsg
->
startTime
)
{
taosMsleep
(
10
);
}
snapshotReceiverStart
(
pReceiver
,
pMsg
);
// set start-time same with sender
}
_SEND_REPLY:
// build msg
;
// make complier happy
SyncSnapshotRsp
*
pRspMsg
=
syncSnapshotRspBuild
(
pSyncNode
->
vgId
);
pRspMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pRspMsg
->
destId
=
pMsg
->
srcId
;
pRspMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pRspMsg
->
lastIndex
=
pMsg
->
lastIndex
;
pRspMsg
->
lastTerm
=
pMsg
->
lastTerm
;
pRspMsg
->
startTime
=
pReceiver
->
startTime
;
pRspMsg
->
ack
=
pMsg
->
seq
;
// receiver maybe already closed
pRspMsg
->
code
=
0
;
pRspMsg
->
snapBeginIndex
=
syncNodeGetSnapBeginIndex
(
pSyncNode
);
// send msg
SRpcMsg
rpcMsg
;
syncSnapshotRsp2RpcMsg
(
pRspMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pRspMsg
->
destId
),
pSyncNode
,
&
rpcMsg
);
syncSnapshotRspDestroy
(
pRspMsg
);
return
0
;
}
static
int32_t
syncNodeOnSnapshotBegin
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
// condition 1
SSyncSnapshotReceiver
*
pReceiver
=
pSyncNode
->
pNewNodeReceiver
;
if
(
snapshotReceiverIsStart
(
pReceiver
))
{
if
(
pMsg
->
startTime
>
pReceiver
->
startTime
)
{
snapshotReceiverStop
(
pReceiver
);
}
else
if
(
pMsg
->
startTime
==
pReceiver
->
startTime
)
{
return
0
;
}
else
{
// ignore
syncNodeEventLog
(
pSyncNode
,
"msg ignore"
);
return
0
;
}
}
_START_RECEIVER:
if
(
taosGetTimestampMs
()
-
pMsg
->
startTime
>
SNAPSHOT_MAX_CLOCK_SKEW_MS
)
{
syncNodeErrorLog
(
pSyncNode
,
"snapshot receiver time skew too much"
);
return
-
1
;
}
else
{
// waiting for clock match
while
(
taosGetTimestampMs
()
>
pMsg
->
startTime
)
{
taosMsleep
(
10
);
}
snapshotReceiverStart
(
pReceiver
,
pMsg
);
// build msg
SyncSnapshotRsp
*
pRspMsg
=
syncSnapshotRspBuild
(
pSyncNode
->
vgId
);
pRspMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pRspMsg
->
destId
=
pMsg
->
srcId
;
pRspMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pRspMsg
->
lastIndex
=
pMsg
->
lastIndex
;
pRspMsg
->
lastTerm
=
pMsg
->
lastTerm
;
pRspMsg
->
ack
=
pReceiver
->
ack
;
// receiver maybe already closed
pRspMsg
->
code
=
0
;
// send msg
SRpcMsg
rpcMsg
;
syncSnapshotRsp2RpcMsg
(
pRspMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pRspMsg
->
destId
),
pSyncNode
,
&
rpcMsg
);
syncSnapshotRspDestroy
(
pRspMsg
);
}
return
0
;
}
static
int32_t
syncNodeOnSnapshotTransfer
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
return
0
;
}
static
int32_t
syncNodeOnSnapshotEnd
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
return
0
;
}
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
// condition 4, got data, update ack
// condition 1, recv SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
// if receiver already start
// if sender.start-time > receiver.start-time, restart receiver(reply snapshot start)
// if sender.start-time = receiver.start-time, maybe duplicate msg
// if sender.start-time < receiver.start-time, ignore
// else
// waiting for clock match
// start receiver(reply snapshot start)
//
// condition 2, recv SYNC_SNAPSHOT_SEQ_BEGIN
// a. create writer with <begin, end>
//
// condition 3, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
//
// condition 4, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
//
// condition 5, got data, update ack
//
int32_t
syncNodeOnSnapshot
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
// get receiver
SSyncSnapshotReceiver
*
pReceiver
=
pSyncNode
->
pNewNodeReceiver
;
bool
needRsp
=
false
;
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
pSyncNode
,
&
(
pMsg
->
srcId
)))
{
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"not in my config"
);
return
0
;
}
if
(
pMsg
->
term
<
pSyncNode
->
pRaftStore
->
currentTerm
)
{
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"reject, small term"
);
return
0
;
}
if
(
pMsg
->
term
>
pSyncNode
->
pRaftStore
->
currentTerm
)
{
syncNodeStepDown
(
pSyncNode
,
pMsg
->
term
);
}
syncNodeResetElectTimer
(
pSyncNode
);
int32_t
code
=
0
;
SSyncSnapshotReceiver
*
pReceiver
=
pSyncNode
->
pNewNodeReceiver
;
// state, term, seq/ack
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
if
(
pMsg
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_
BEGIN
)
{
// condition 1
// begin, no data
snapshotReceiverStart
(
pReceiver
,
pMsg
);
needRsp
=
true
;
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_
PRE_SNAPSHOT
)
{
syncNodeOnSnapshotPre
(
pSyncNode
,
pMsg
);
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_BEGIN
)
{
syncNodeOnSnapshotBegin
(
pSyncNode
,
pMsg
)
;
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
// condition 2
...
...
@@ -764,7 +869,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
code
==
0
)
{
snapshotReceiverStop
(
pReceiver
);
}
needRsp
=
true
;
bool
needRsp
=
true
;
// maybe update lastconfig
if
(
pMsg
->
lastConfigIndex
>=
SYNC_INDEX_BEGIN
)
{
...
...
@@ -782,7 +887,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// condition 3
// force close
snapshotReceiverForceStop
(
pReceiver
);
needRsp
=
false
;
bool
needRsp
=
false
;
}
else
if
(
pMsg
->
seq
>
SYNC_SNAPSHOT_SEQ_BEGIN
&&
pMsg
->
seq
<
SYNC_SNAPSHOT_SEQ_END
)
{
// condition 4
...
...
@@ -790,7 +895,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
pMsg
->
seq
==
pReceiver
->
ack
+
1
)
{
snapshotReceiverGotData
(
pReceiver
,
pMsg
);
}
needRsp
=
true
;
bool
needRsp
=
true
;
}
else
{
// error log
...
...
@@ -805,26 +910,6 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
return
-
1
;
}
// send ack
if
(
needRsp
)
{
// build msg
SyncSnapshotRsp
*
pRspMsg
=
syncSnapshotRspBuild
(
pSyncNode
->
vgId
);
pRspMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pRspMsg
->
destId
=
pMsg
->
srcId
;
pRspMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pRspMsg
->
lastIndex
=
pMsg
->
lastIndex
;
pRspMsg
->
lastTerm
=
pMsg
->
lastTerm
;
pRspMsg
->
ack
=
pReceiver
->
ack
;
// receiver maybe already closed
pRspMsg
->
code
=
0
;
pRspMsg
->
privateTerm
=
pReceiver
->
privateTerm
;
// receiver maybe already closed
// send msg
SRpcMsg
rpcMsg
;
syncSnapshotRsp2RpcMsg
(
pRspMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pRspMsg
->
destId
),
pSyncNode
,
&
rpcMsg
);
syncSnapshotRspDestroy
(
pRspMsg
);
}
}
else
{
// error log
do
{
...
...
@@ -849,6 +934,52 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
return
0
;
}
int32_t
syncNodeOnSnapshotReplyPre
(
SSyncNode
*
pSyncNode
,
SyncSnapshotRsp
*
pMsg
)
{
// get sender
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pMsg
->
srcId
));
ASSERT
(
pSender
!=
NULL
);
SSnapshot
snapshot
;
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
// prepare <begin, end>
pSender
->
snapshotParam
.
start
=
pMsg
->
snapBeginIndex
;
pSender
->
snapshotParam
.
end
=
snapshot
.
lastApplyIndex
;
if
(
pMsg
->
snapBeginIndex
>
snapshot
.
lastApplyIndex
)
{
syncNodeErrorLog
(
pSyncNode
,
"snapshot last index too small"
);
return
-
1
;
}
// start reader
int32_t
code
=
pSyncNode
->
pFsm
->
FpSnapshotStartRead
(
pSyncNode
->
pFsm
,
&
(
pSender
->
snapshotParam
),
&
(
pSender
->
pReader
));
if
(
code
!=
0
)
{
syncNodeErrorLog
(
pSyncNode
,
"create snapshot reader error"
);
return
-
1
;
}
// build begin msg
SyncSnapshotSend
*
pSendMsg
=
syncSnapshotSendBuild
(
0
,
pSender
->
pSyncNode
->
vgId
);
pSendMsg
->
srcId
=
pSender
->
pSyncNode
->
myRaftId
;
pSendMsg
->
destId
=
(
pSender
->
pSyncNode
->
replicasId
)[
pSender
->
replicaIndex
];
pSendMsg
->
term
=
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pSendMsg
->
beginIndex
=
pSender
->
snapshotParam
.
start
;
pSendMsg
->
lastIndex
=
pSender
->
snapshot
.
lastApplyIndex
;
pSendMsg
->
lastTerm
=
pSender
->
snapshot
.
lastApplyTerm
;
pSendMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
pSendMsg
->
lastConfig
=
pSender
->
lastConfig
;
pSendMsg
->
startTime
=
pSender
->
startTime
;
pSendMsg
->
seq
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
// send msg
SRpcMsg
rpcMsg
;
syncSnapshotSend2RpcMsg
(
pSendMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
(
pSendMsg
->
destId
),
pSender
->
pSyncNode
,
&
rpcMsg
);
syncSnapshotSendDestroy
(
pSendMsg
);
return
0
;
}
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
...
...
@@ -857,8 +988,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
//
int32_t
syncNodeOnSnapshotReply
(
SSyncNode
*
pSyncNode
,
SyncSnapshotRsp
*
pMsg
)
{
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
pSyncNode
,
&
(
pMsg
->
srcId
))
&&
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
s
Error
(
"vgId:%d, recv sync-snapshot-rsp, maybe replica already dropped"
,
pSyncNode
->
vgId
);
if
(
!
syncNodeInRaftGroup
(
pSyncNode
,
&
(
pMsg
->
srcId
)))
{
s
yncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"maybe replica already dropped"
);
return
-
1
;
}
...
...
@@ -866,17 +997,26 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pMsg
->
srcId
));
ASSERT
(
pSender
!=
NULL
);
if
(
pMsg
->
startTime
!=
pSender
->
startTime
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"sender/receiver start time not match"
);
return
-
1
;
}
// state, term, seq/ack
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pMsg
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
// condition 1
// prepare <begin, end>, send begin msg
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
)
{
syncNodeOnSnapshotReplyPre
(
pSyncNode
,
pMsg
);
return
0
;
}
// receive ack is finish, close sender
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_END
)
{
snapshotSenderStop
(
pSender
,
true
);
return
0
;
}
// condition 2
// send next msg
if
(
pMsg
->
ack
==
pSender
->
seq
)
{
// update sender ack
...
...
@@ -922,3 +1062,63 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
return
0
;
}
int32_t
syncNodeOnPreSnapshot
(
SSyncNode
*
ths
,
SyncPreSnapshot
*
pMsg
)
{
syncLogRecvSyncPreSnapshot
(
ths
,
pMsg
,
""
);
SyncPreSnapshotReply
*
pMsgReply
=
syncPreSnapshotReplyBuild
(
ths
->
vgId
);
pMsgReply
->
srcId
=
ths
->
myRaftId
;
pMsgReply
->
destId
=
pMsg
->
srcId
;
pMsgReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
SSyncLogStoreData
*
pData
=
ths
->
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
if
(
syncNodeIsMnode
(
ths
))
{
pMsgReply
->
snapStart
=
SYNC_INDEX_BEGIN
;
}
else
{
bool
isEmpty
=
ths
->
pLogStore
->
syncLogIsEmpty
(
ths
->
pLogStore
);
int64_t
walCommitVer
=
walGetCommittedVer
(
pWal
);
if
(
!
isEmpty
&&
ths
->
commitIndex
!=
walCommitVer
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"commit not same, wal-commit:%"
PRId64
", commit:%"
PRId64
", ignore"
,
walCommitVer
,
ths
->
commitIndex
);
syncNodeErrorLog
(
ths
,
logBuf
);
goto
_IGNORE
;
}
pMsgReply
->
snapStart
=
ths
->
commitIndex
+
1
;
// make local log clean
int32_t
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
pMsgReply
->
snapStart
);
if
(
code
!=
0
)
{
syncNodeErrorLog
(
ths
,
"truncate wal error"
);
goto
_IGNORE
;
}
}
// can not write behind _RESPONSE
SRpcMsg
rpcMsg
;
_RESPONSE:
syncPreSnapshotReply2RpcMsg
(
pMsgReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pMsgReply
->
destId
,
ths
,
&
rpcMsg
);
syncPreSnapshotReplyDestroy
(
pMsgReply
);
return
0
;
_IGNORE:
syncPreSnapshotReplyDestroy
(
pMsgReply
);
return
0
;
}
int32_t
syncNodeOnPreSnapshotReply
(
SSyncNode
*
ths
,
SyncPreSnapshotReply
*
pMsg
)
{
syncLogRecvSyncPreSnapshotReply
(
ths
,
pMsg
,
""
);
// start snapshot
return
0
;
}
\ No newline at end of file
source/libs/sync/test/CMakeLists.txt
浏览文件 @
0d608172
...
...
@@ -60,6 +60,8 @@ add_executable(syncRaftCfgIndexTest "")
add_executable
(
syncHeartbeatTest
""
)
add_executable
(
syncHeartbeatReplyTest
""
)
add_executable
(
syncLocalCmdTest
""
)
add_executable
(
syncPreSnapshotTest
""
)
add_executable
(
syncPreSnapshotReplyTest
""
)
target_sources
(
syncTest
...
...
@@ -310,6 +312,14 @@ target_sources(syncLocalCmdTest
PRIVATE
"syncLocalCmdTest.cpp"
)
target_sources
(
syncPreSnapshotTest
PRIVATE
"syncPreSnapshotTest.cpp"
)
target_sources
(
syncPreSnapshotReplyTest
PRIVATE
"syncPreSnapshotReplyTest.cpp"
)
target_include_directories
(
syncTest
...
...
@@ -622,6 +632,16 @@ target_include_directories(syncLocalCmdTest
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncPreSnapshotTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncPreSnapshotReplyTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
syncTest
...
...
@@ -872,6 +892,14 @@ target_link_libraries(syncLocalCmdTest
sync
gtest_main
)
target_link_libraries
(
syncPreSnapshotTest
sync
gtest_main
)
target_link_libraries
(
syncPreSnapshotReplyTest
sync
gtest_main
)
enable_testing
()
...
...
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
浏览文件 @
0d608172
...
...
@@ -157,6 +157,7 @@ SSyncFSM* createFsm() {
SSyncFSM
*
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
memset
(
pFsm
,
0
,
sizeof
(
*
pFsm
));
#if 0
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
...
...
@@ -171,6 +172,7 @@ SSyncFSM* createFsm() {
pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
pFsm->FpReConfigCb = ReConfigCb;
#endif
return
pFsm
;
}
...
...
source/libs/sync/test/syncConfigChangeTest.cpp
浏览文件 @
0d608172
...
...
@@ -33,7 +33,7 @@ void init() {
void
cleanup
()
{
walCleanUp
();
}
void
CommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
CommitCb
(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
const
SFsmCbMeta
cbMeta
)
{
SyncIndex
beginIndex
=
SYNC_INDEX_INVALID
;
if
(
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
SSnapshot
snapshot
;
...
...
@@ -52,7 +52,7 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
}
}
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
PreCommitCb
(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
...
...
@@ -61,7 +61,7 @@ void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta)
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
void
RollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
RollBackCb
(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s flag:%"
PRIu64
"
\n
"
,
...
...
@@ -69,7 +69,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
int32_t
GetSnapshotCb
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
int32_t
GetSnapshotCb
(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
data
=
NULL
;
pSnapshot
->
lastApplyIndex
=
gSnapshotLastApplyIndex
;
pSnapshot
->
lastApplyTerm
=
100
;
...
...
@@ -87,6 +87,7 @@ SSyncFSM* createFsm() {
SSyncFSM
*
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
memset
(
pFsm
,
0
,
sizeof
(
*
pFsm
));
#if 0
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
...
...
@@ -95,6 +96,7 @@ SSyncFSM* createFsm() {
pFsm->FpRestoreFinishCb = RestoreFinishCb;
pFsm->FpReConfigCb = ReConfigCb;
#endif
return
pFsm
;
}
...
...
source/libs/sync/test/syncEnvTest.cpp
浏览文件 @
0d608172
...
...
@@ -26,12 +26,12 @@ int main() {
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
ret
=
syncEnvStartTimer
();
//
ret = syncEnvStartTimer();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
ret
=
syncEnvStopTimer
();
//
ret = syncEnvStopTimer();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
...
...
source/libs/sync/test/syncPreSnapshotReplyTest.cpp
0 → 100644
浏览文件 @
0d608172
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
SyncPreSnapshotReply
*
createMsg
()
{
SyncPreSnapshotReply
*
pMsg
=
syncPreSnapshotReplyBuild
(
789
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
9527
;
pMsg
->
snapStart
=
12306
;
return
pMsg
;
}
void
test1
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
syncPreSnapshotReplyLog2
((
char
*
)
"test1:"
,
pMsg
);
syncPreSnapshotReplyDestroy
(
pMsg
);
}
void
test2
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncPreSnapshotReplySerialize
(
pMsg
,
serialized
,
len
);
SyncPreSnapshotReply
*
pMsg2
=
syncPreSnapshotReplyBuild
(
789
);
syncPreSnapshotReplyDeserialize
(
serialized
,
len
,
pMsg2
);
syncPreSnapshotReplyLog2
((
char
*
)
"test2: syncPreSnapshotReplySerialize -> syncPreSnapshotReplyDeserialize "
,
pMsg2
);
taosMemoryFree
(
serialized
);
syncPreSnapshotReplyDestroy
(
pMsg
);
syncPreSnapshotReplyDestroy
(
pMsg2
);
}
void
test3
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncPreSnapshotReplySerialize2
(
pMsg
,
&
len
);
SyncPreSnapshotReply
*
pMsg2
=
syncPreSnapshotReplyDeserialize2
(
serialized
,
len
);
syncPreSnapshotReplyLog2
((
char
*
)
"test3: syncPreSnapshotReplySerialize2 -> syncPreSnapshotReplyDeserialize2 "
,
pMsg2
);
taosMemoryFree
(
serialized
);
syncPreSnapshotReplyDestroy
(
pMsg
);
syncPreSnapshotReplyDestroy
(
pMsg2
);
}
void
test4
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
syncPreSnapshotReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshotReply
*
pMsg2
=
(
SyncPreSnapshotReply
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncPreSnapshotReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
syncPreSnapshotReplyLog2
((
char
*
)
"test4: syncPreSnapshotReply2RpcMsg -> syncPreSnapshotReplyFromRpcMsg "
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncPreSnapshotReplyDestroy
(
pMsg
);
syncPreSnapshotReplyDestroy
(
pMsg2
);
}
void
test5
()
{
SyncPreSnapshotReply
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
syncPreSnapshotReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshotReply
*
pMsg2
=
syncPreSnapshotReplyFromRpcMsg2
(
&
rpcMsg
);
syncPreSnapshotReplyLog2
((
char
*
)
"test5: syncPreSnapshotReply2RpcMsg -> syncPreSnapshotReplyFromRpcMsg2 "
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncPreSnapshotReplyDestroy
(
pMsg
);
syncPreSnapshotReplyDestroy
(
pMsg2
);
}
int
main
()
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
gRaftDetailLog
=
true
;
logTest
();
test1
();
test2
();
test3
();
test4
();
test5
();
return
0
;
}
source/libs/sync/test/syncPreSnapshotTest.cpp
0 → 100644
浏览文件 @
0d608172
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
SyncPreSnapshot
*
createMsg
()
{
SyncPreSnapshot
*
pMsg
=
syncPreSnapshotBuild
(
789
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
9527
;
return
pMsg
;
}
void
test1
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
syncPreSnapshotLog2
((
char
*
)
"test1:"
,
pMsg
);
syncPreSnapshotDestroy
(
pMsg
);
}
void
test2
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncPreSnapshotSerialize
(
pMsg
,
serialized
,
len
);
SyncPreSnapshot
*
pMsg2
=
syncPreSnapshotBuild
(
789
);
syncPreSnapshotDeserialize
(
serialized
,
len
,
pMsg2
);
syncPreSnapshotLog2
((
char
*
)
"test2: syncPreSnapshotSerialize -> syncPreSnapshotDeserialize "
,
pMsg2
);
taosMemoryFree
(
serialized
);
syncPreSnapshotDestroy
(
pMsg
);
syncPreSnapshotDestroy
(
pMsg2
);
}
void
test3
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncPreSnapshotSerialize2
(
pMsg
,
&
len
);
SyncPreSnapshot
*
pMsg2
=
syncPreSnapshotDeserialize2
(
serialized
,
len
);
syncPreSnapshotLog2
((
char
*
)
"test3: syncPreSnapshotSerialize2 -> syncPreSnapshotDeserialize2 "
,
pMsg2
);
taosMemoryFree
(
serialized
);
syncPreSnapshotDestroy
(
pMsg
);
syncPreSnapshotDestroy
(
pMsg2
);
}
void
test4
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
syncPreSnapshot2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshot
*
pMsg2
=
(
SyncPreSnapshot
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncPreSnapshotFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
syncPreSnapshotLog2
((
char
*
)
"test4: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg "
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncPreSnapshotDestroy
(
pMsg
);
syncPreSnapshotDestroy
(
pMsg2
);
}
void
test5
()
{
SyncPreSnapshot
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
syncPreSnapshot2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPreSnapshot
*
pMsg2
=
syncPreSnapshotFromRpcMsg2
(
&
rpcMsg
);
syncPreSnapshotLog2
((
char
*
)
"test5: syncPreSnapshot2RpcMsg -> syncPreSnapshotFromRpcMsg2 "
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncPreSnapshotDestroy
(
pMsg
);
syncPreSnapshotDestroy
(
pMsg2
);
}
int
main
()
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
gRaftDetailLog
=
true
;
logTest
();
test1
();
test2
();
test3
();
test4
();
test5
();
return
0
;
}
source/libs/sync/test/syncRaftLogTest2.cpp
浏览文件 @
0d608172
...
...
@@ -25,7 +25,7 @@ const char* pWalPath = "./syncLogStoreTest_wal";
SyncIndex
gSnapshotLastApplyIndex
;
SyncIndex
gSnapshotLastApplyTerm
;
int32_t
GetSnapshotCb
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
int32_t
GetSnapshotCb
(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
data
=
NULL
;
pSnapshot
->
lastApplyIndex
=
gSnapshotLastApplyIndex
;
pSnapshot
->
lastApplyTerm
=
gSnapshotLastApplyTerm
;
...
...
source/libs/sync/test/syncRaftLogTest3.cpp
浏览文件 @
0d608172
...
...
@@ -27,7 +27,7 @@ const char* pWalPath = "./syncLogStoreTest_wal";
SyncIndex
gSnapshotLastApplyIndex
;
SyncIndex
gSnapshotLastApplyTerm
;
int32_t
GetSnapshotCb
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
int32_t
GetSnapshotCb
(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
data
=
NULL
;
pSnapshot
->
lastApplyIndex
=
gSnapshotLastApplyIndex
;
pSnapshot
->
lastApplyTerm
=
gSnapshotLastApplyTerm
;
...
...
source/libs/sync/test/syncReplicateTest.cpp
浏览文件 @
0d608172
...
...
@@ -74,10 +74,14 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
SSyncFSM
*
createFsm
()
{
SSyncFSM
*
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
#if 0
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
#endif
return
pFsm
;
}
...
...
source/libs/sync/test/syncSnapshotReceiverTest.cpp
浏览文件 @
0d608172
...
...
@@ -37,9 +37,12 @@ SSyncSnapshotReceiver* createReceiver() {
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
*
pSyncNode
));
pSyncNode
->
pRaftStore
=
(
SRaftStore
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pRaftStore
)));
pSyncNode
->
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pFsm
)));
#if 0
pSyncNode->pFsm->FpSnapshotStartWrite = SnapshotStartWrite;
pSyncNode->pFsm->FpSnapshotStopWrite = SnapshotStopWrite;
pSyncNode->pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
#endif
SRaftId
id
;
id
.
addr
=
syncUtilAddr2U64
(
"1.2.3.4"
,
99
);
...
...
@@ -50,7 +53,6 @@ SSyncSnapshotReceiver* createReceiver() {
pReceiver
->
ack
=
20
;
pReceiver
->
pWriter
=
(
void
*
)
0x11
;
pReceiver
->
term
=
66
;
pReceiver
->
privateTerm
=
99
;
return
pReceiver
;
}
...
...
source/libs/sync/test/syncSnapshotRspTest.cpp
浏览文件 @
0d608172
...
...
@@ -21,7 +21,7 @@ SyncSnapshotRsp *createMsg() {
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
11
;
pMsg
->
privateTerm
=
99
;
pMsg
->
startTime
=
99
;
pMsg
->
lastIndex
=
22
;
pMsg
->
lastTerm
=
33
;
pMsg
->
ack
=
44
;
...
...
source/libs/sync/test/syncSnapshotSendTest.cpp
浏览文件 @
0d608172
...
...
@@ -21,7 +21,6 @@ SyncSnapshotSend *createMsg() {
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
11
;
pMsg
->
privateTerm
=
99
;
pMsg
->
lastIndex
=
22
;
pMsg
->
lastTerm
=
33
;
...
...
source/libs/sync/test/syncSnapshotSenderTest.cpp
浏览文件 @
0d608172
...
...
@@ -37,10 +37,13 @@ SSyncSnapshotSender* createSender() {
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
*
pSyncNode
));
pSyncNode
->
pRaftStore
=
(
SRaftStore
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pRaftStore
)));
pSyncNode
->
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pFsm
)));
#if 0
pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead;
pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead;
pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead;
pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot;
#endif
SSyncSnapshotSender
*
pSender
=
snapshotSenderCreate
(
pSyncNode
,
2
);
pSender
->
start
=
true
;
...
...
@@ -55,7 +58,8 @@ SSyncSnapshotSender* createSender() {
pSender
->
snapshot
.
lastApplyTerm
=
88
;
pSender
->
sendingMS
=
77
;
pSender
->
term
=
66
;
pSender
->
privateTerm
=
99
;
//pSender->privateTerm = 99;
return
pSender
;
}
...
...
source/libs/sync/test/syncSnapshotTest.cpp
浏览文件 @
0d608172
...
...
@@ -33,7 +33,7 @@ SyncIndex snapshotLastApplyIndex = SYNC_INDEX_INVALID;
const
char
*
pDir
=
"./syncSnapshotTest"
;
const
char
*
pWalDir
=
"./syncSnapshotTest_wal"
;
void
CommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
CommitCb
(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
SyncIndex
beginIndex
=
SYNC_INDEX_INVALID
;
if
(
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
SSnapshot
snapshot
;
...
...
@@ -52,7 +52,7 @@ void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
}
}
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
PreCommitCb
(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==PreCommitCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
...
...
@@ -60,7 +60,7 @@ void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta)
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
void
RollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
void
RollBackCb
(
const
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==RollBackCb== pFsm:%p, index:%"
PRId64
", isWeak:%d, code:%d, state:%d %s
\n
"
,
pFsm
,
...
...
@@ -68,7 +68,7 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
int32_t
GetSnapshotCb
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
int32_t
GetSnapshotCb
(
const
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
pSnapshot
->
data
=
NULL
;
pSnapshot
->
lastApplyIndex
=
snapshotLastApplyIndex
;
pSnapshot
->
lastApplyTerm
=
100
;
...
...
@@ -78,10 +78,14 @@ int32_t GetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
void
initFsm
()
{
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
memset
(
pFsm
,
0
,
sizeof
(
*
pFsm
));
#if 0
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
pFsm->FpGetSnapshotInfo = GetSnapshotCb;
#endif
}
SSyncNode
*
syncNodeInit
()
{
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
0d608172
...
...
@@ -177,6 +177,7 @@ SSyncFSM* createFsm() {
SSyncFSM
*
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
memset
(
pFsm
,
0
,
sizeof
(
*
pFsm
));
#if 0
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
...
...
@@ -193,6 +194,7 @@ SSyncFSM* createFsm() {
pFsm->FpSnapshotDoWrite = SnapshotDoWrite;
pFsm->FpLeaderTransferCb = LeaderTransferCb;
#endif
return
pFsm
;
}
...
...
source/libs/sync/test/syncWriteTest.cpp
浏览文件 @
0d608172
...
...
@@ -57,9 +57,13 @@ void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
void
initFsm
()
{
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
#if 0
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
#endif
}
SSyncNode
*
syncNodeInit
()
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录