Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9988e85f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9988e85f
编写于
5月 31, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add test sender, receiver
上级
79229978
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
185 addition
and
22 deletion
+185
-22
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+8
-8
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+2
-2
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+28
-0
source/libs/sync/test/syncSnapshotReceiverTest.cpp
source/libs/sync/test/syncSnapshotReceiverTest.cpp
+65
-0
source/libs/sync/test/syncSnapshotRspTest.cpp
source/libs/sync/test/syncSnapshotRspTest.cpp
+6
-6
source/libs/sync/test/syncSnapshotSendTest.cpp
source/libs/sync/test/syncSnapshotSendTest.cpp
+6
-6
source/libs/sync/test/syncSnapshotSenderTest.cpp
source/libs/sync/test/syncSnapshotSenderTest.cpp
+70
-0
未找到文件。
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
9988e85f
...
...
@@ -37,8 +37,8 @@ typedef struct SSyncSnapshotSender {
bool
start
;
int32_t
seq
;
int32_t
ack
;
void
*
pReader
;
void
*
pCurrentBlock
;
void
*
pReader
;
void
*
pCurrentBlock
;
int32_t
blockLen
;
SSnapshot
snapshot
;
int64_t
sendingMS
;
...
...
@@ -52,15 +52,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
void
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
);
void
snapshotSenderStop
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotSend
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
typedef
struct
SSyncSnapshotReceiver
{
bool
start
;
int32_t
ack
;
void
*
pWriter
;
void
*
pCurrentBlock
;
void
*
pWriter
;
void
*
pCurrentBlock
;
int32_t
blockLen
;
SyncTerm
term
;
...
...
@@ -72,8 +72,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl
void
snapshotReceiverDestroy
(
SSyncSnapshotReceiver
*
pReceiver
);
void
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
);
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
int32_t
syncNodeOnSnapshotSendCb
(
SSyncNode
*
ths
,
SyncSnapshotSend
*
pMsg
);
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
ths
,
SyncSnapshotRsp
*
pMsg
);
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
9988e85f
...
...
@@ -175,9 +175,9 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
cJSON
*
pSnapshot
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pSender
->
snapshot
.
lastApplyIndex
);
cJSON_AddStringToObject
(
p
Ro
ot
,
"lastApplyIndex"
,
u64buf
);
cJSON_AddStringToObject
(
p
Snapsh
ot
,
"lastApplyIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pSender
->
snapshot
.
lastApplyTerm
);
cJSON_AddStringToObject
(
p
Ro
ot
,
"lastApplyTerm"
,
u64buf
);
cJSON_AddStringToObject
(
p
Snapsh
ot
,
"lastApplyTerm"
,
u64buf
);
cJSON_AddItemToObject
(
pRoot
,
"snapshot"
,
pSnapshot
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pSender
->
sendingMS
);
...
...
source/libs/sync/test/CMakeLists.txt
浏览文件 @
9988e85f
...
...
@@ -40,6 +40,8 @@ add_executable(syncApplyMsgTest "")
add_executable
(
syncConfigChangeTest
""
)
add_executable
(
syncSnapshotSendTest
""
)
add_executable
(
syncSnapshotRspTest
""
)
add_executable
(
syncSnapshotSenderTest
""
)
add_executable
(
syncSnapshotReceiverTest
""
)
target_sources
(
syncTest
...
...
@@ -210,6 +212,14 @@ target_sources(syncSnapshotRspTest
PRIVATE
"syncSnapshotRspTest.cpp"
)
target_sources
(
syncSnapshotSenderTest
PRIVATE
"syncSnapshotSenderTest.cpp"
)
target_sources
(
syncSnapshotReceiverTest
PRIVATE
"syncSnapshotReceiverTest.cpp"
)
target_include_directories
(
syncTest
...
...
@@ -422,6 +432,16 @@ target_include_directories(syncSnapshotRspTest
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncSnapshotSenderTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncSnapshotReceiverTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_link_libraries
(
syncTest
...
...
@@ -592,6 +612,14 @@ target_link_libraries(syncSnapshotRspTest
sync
gtest_main
)
target_link_libraries
(
syncSnapshotSenderTest
sync
gtest_main
)
target_link_libraries
(
syncSnapshotReceiverTest
sync
gtest_main
)
enable_testing
()
...
...
source/libs/sync/test/syncSnapshotReceiverTest.cpp
0 → 100644
浏览文件 @
9988e85f
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
void
CommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{}
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{}
void
RollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{}
void
RestoreFinishCb
(
struct
SSyncFSM
*
pFsm
)
{}
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCfg
,
SReConfigCbMeta
cbMeta
)
{}
int32_t
GetSnapshot
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
return
0
;
}
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
SnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
SnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
SnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
SnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
SnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
SSyncSnapshotReceiver
*
createReceiver
()
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
*
pSyncNode
));
pSyncNode
->
pRaftStore
=
(
SRaftStore
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pRaftStore
)));
pSyncNode
->
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pFsm
)));
pSyncNode
->
pFsm
->
FpSnapshotStartWrite
=
SnapshotStartWrite
;
pSyncNode
->
pFsm
->
FpSnapshotStopWrite
=
SnapshotStopWrite
;
pSyncNode
->
pFsm
->
FpSnapshotDoWrite
=
SnapshotDoWrite
;
SSyncSnapshotReceiver
*
pReceiver
=
snapshotReceiverCreate
(
pSyncNode
,
2
);
pReceiver
->
start
=
true
;
pReceiver
->
ack
=
20
;
pReceiver
->
pWriter
=
(
void
*
)
0x11
;
pReceiver
->
blockLen
=
20
;
pReceiver
->
pCurrentBlock
=
taosMemoryMalloc
(
pReceiver
->
blockLen
);
snprintf
((
char
*
)(
pReceiver
->
pCurrentBlock
),
pReceiver
->
blockLen
,
"%s"
,
"hello"
);
pReceiver
->
term
=
66
;
return
pReceiver
;
}
int
main
()
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
logTest
();
SSyncSnapshotReceiver
*
pReceiver
=
createReceiver
();
sTrace
(
"%s"
,
snapshotReceiver2Str
(
pReceiver
));
return
0
;
}
source/libs/sync/test/syncSnapshotRspTest.cpp
浏览文件 @
9988e85f
...
...
@@ -35,8 +35,8 @@ void test1() {
void
test2
()
{
SyncSnapshotRsp
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncSnapshotRspSerialize
(
pMsg
,
serialized
,
len
);
SyncSnapshotRsp
*
pMsg2
=
syncSnapshotRspBuild
(
1000
);
syncSnapshotRspDeserialize
(
serialized
,
len
,
pMsg2
);
...
...
@@ -49,8 +49,8 @@ void test2() {
void
test3
()
{
SyncSnapshotRsp
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncSnapshotRspSerialize2
(
pMsg
,
&
len
);
uint32_t
len
;
char
*
serialized
=
syncSnapshotRspSerialize2
(
pMsg
,
&
len
);
SyncSnapshotRsp
*
pMsg2
=
syncSnapshotRspDeserialize2
(
serialized
,
len
);
syncSnapshotRspLog2
((
char
*
)
"test3: syncSnapshotRspSerialize2 -> syncSnapshotRspDeserialize2 "
,
pMsg2
);
...
...
@@ -61,7 +61,7 @@ void test3() {
void
test4
()
{
SyncSnapshotRsp
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncSnapshotRsp2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncSnapshotRsp
*
pMsg2
=
(
SyncSnapshotRsp
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncSnapshotRspFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
...
...
@@ -74,7 +74,7 @@ void test4() {
void
test5
()
{
SyncSnapshotRsp
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncSnapshotRsp2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncSnapshotRsp
*
pMsg2
=
syncSnapshotRspFromRpcMsg2
(
&
rpcMsg
);
syncSnapshotRspLog2
((
char
*
)
"test5: syncSnapshotRsp2RpcMsg -> syncSnapshotRspFromRpcMsg2 "
,
pMsg2
);
...
...
source/libs/sync/test/syncSnapshotSendTest.cpp
浏览文件 @
9988e85f
...
...
@@ -36,8 +36,8 @@ void test1() {
void
test2
()
{
SyncSnapshotSend
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncSnapshotSendSerialize
(
pMsg
,
serialized
,
len
);
SyncSnapshotSend
*
pMsg2
=
syncSnapshotSendBuild
(
pMsg
->
dataLen
,
1000
);
syncSnapshotSendDeserialize
(
serialized
,
len
,
pMsg2
);
...
...
@@ -50,8 +50,8 @@ void test2() {
void
test3
()
{
SyncSnapshotSend
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncSnapshotSendSerialize2
(
pMsg
,
&
len
);
uint32_t
len
;
char
*
serialized
=
syncSnapshotSendSerialize2
(
pMsg
,
&
len
);
SyncSnapshotSend
*
pMsg2
=
syncSnapshotSendDeserialize2
(
serialized
,
len
);
syncSnapshotSendLog2
((
char
*
)
"test3: syncSnapshotSendSerialize2 -> syncSnapshotSendDeserialize2 "
,
pMsg2
);
...
...
@@ -62,7 +62,7 @@ void test3() {
void
test4
()
{
SyncSnapshotSend
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncSnapshotSend2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncSnapshotSend
*
pMsg2
=
(
SyncSnapshotSend
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncSnapshotSendFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
...
...
@@ -75,7 +75,7 @@ void test4() {
void
test5
()
{
SyncSnapshotSend
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncSnapshotSend2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncSnapshotSend
*
pMsg2
=
syncSnapshotSendFromRpcMsg2
(
&
rpcMsg
);
syncSnapshotSendLog2
((
char
*
)
"test5: syncSnapshotSend2RpcMsg -> syncSnapshotSendFromRpcMsg2 "
,
pMsg2
);
...
...
source/libs/sync/test/syncSnapshotSenderTest.cpp
0 → 100644
浏览文件 @
9988e85f
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncSnapshot.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
void
CommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{}
void
PreCommitCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{}
void
RollBackCb
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{}
void
RestoreFinishCb
(
struct
SSyncFSM
*
pFsm
)
{}
void
ReConfigCb
(
struct
SSyncFSM
*
pFsm
,
SSyncCfg
newCfg
,
SReConfigCbMeta
cbMeta
)
{}
int32_t
GetSnapshot
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
return
0
;
}
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
SnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
SnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
int32_t
SnapshotStartWrite
(
struct
SSyncFSM
*
pFsm
,
void
**
ppWriter
)
{
return
0
;
}
int32_t
SnapshotStopWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
bool
isApply
)
{
return
0
;
}
int32_t
SnapshotDoWrite
(
struct
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
return
0
;
}
SSyncSnapshotSender
*
createSender
()
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
*
pSyncNode
));
pSyncNode
->
pRaftStore
=
(
SRaftStore
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pRaftStore
)));
pSyncNode
->
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
*
(
pSyncNode
->
pFsm
)));
pSyncNode
->
pFsm
->
FpSnapshotStartRead
=
SnapshotStartRead
;
pSyncNode
->
pFsm
->
FpSnapshotStopRead
=
SnapshotStopRead
;
pSyncNode
->
pFsm
->
FpSnapshotDoRead
=
SnapshotDoRead
;
SSyncSnapshotSender
*
pSender
=
snapshotSenderCreate
(
pSyncNode
,
2
);
pSender
->
start
=
true
;
pSender
->
seq
=
10
;
pSender
->
ack
=
20
;
pSender
->
pReader
=
(
void
*
)
0x11
;
pSender
->
blockLen
=
20
;
pSender
->
pCurrentBlock
=
taosMemoryMalloc
(
pSender
->
blockLen
);
snprintf
((
char
*
)(
pSender
->
pCurrentBlock
),
pSender
->
blockLen
,
"%s"
,
"hello"
);
pSender
->
snapshot
.
lastApplyIndex
=
99
;
pSender
->
snapshot
.
lastApplyTerm
=
88
;
pSender
->
sendingMS
=
77
;
pSender
->
term
=
66
;
return
pSender
;
}
int
main
()
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
logTest
();
SSyncSnapshotSender
*
pSender
=
createSender
();
sTrace
(
"%s"
,
snapshotSender2Str
(
pSender
));
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录