Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
aeb94af5
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
aeb94af5
编写于
3月 03, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync encode/decode
上级
f263a623
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
439 addition
and
1 deletion
+439
-1
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+20
-0
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+176
-0
source/libs/sync/test/syncEncodeTest.cpp
source/libs/sync/test/syncEncodeTest.cpp
+243
-1
未找到文件。
source/libs/sync/inc/syncMessage.h
浏览文件 @
aeb94af5
...
...
@@ -158,6 +158,18 @@ typedef struct SyncAppendEntries {
char
data
[];
}
SyncAppendEntries
;
#define SYNC_APPEND_ENTRIES_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \
sizeof(SyncIndex) + sizeof(uint32_t))
SyncAppendEntries
*
syncAppendEntriesBuild
(
uint32_t
dataLen
);
void
syncAppendEntriesDestroy
(
SyncAppendEntries
*
pMsg
);
void
syncAppendEntriesSerialize
(
const
SyncAppendEntries
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncAppendEntriesDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntries
*
pMsg
);
void
syncAppendEntries2RpcMsg
(
const
SyncAppendEntries
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncAppendEntriesFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntries
*
pMsg
);
cJSON
*
syncAppendEntries2Json
(
const
SyncAppendEntries
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncAppendEntriesReply
{
uint32_t
bytes
;
...
...
@@ -169,6 +181,14 @@ typedef struct SyncAppendEntriesReply {
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
;
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
();
void
syncAppendEntriesReplyDestroy
(
SyncAppendEntriesReply
*
pMsg
);
void
syncAppendEntriesReplySerialize
(
const
SyncAppendEntriesReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncAppendEntriesReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntriesReply
*
pMsg
);
void
syncAppendEntriesReply2RpcMsg
(
const
SyncAppendEntriesReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncAppendEntriesReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntriesReply
*
pMsg
);
cJSON
*
syncAppendEntriesReply2Json
(
const
SyncAppendEntriesReply
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
aeb94af5
...
...
@@ -381,4 +381,180 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVoteReply"
,
pRoot
);
return
pJson
;
}
// ---- message process SyncAppendEntries----
SyncAppendEntries
*
syncAppendEntriesBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
SYNC_APPEND_ENTRIES_FIX_LEN
+
dataLen
;
SyncAppendEntries
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_APPEND_ENTRIES
;
pMsg
->
dataLen
=
dataLen
;
}
void
syncAppendEntriesDestroy
(
SyncAppendEntries
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncAppendEntriesSerialize
(
const
SyncAppendEntries
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncAppendEntriesDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntries
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
assert
(
pMsg
->
bytes
==
SYNC_APPEND_ENTRIES_FIX_LEN
+
pMsg
->
dataLen
);
}
void
syncAppendEntries2RpcMsg
(
const
SyncAppendEntries
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncAppendEntriesSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncAppendEntriesFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntries
*
pMsg
)
{
syncAppendEntriesDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncAppendEntries2Json
(
const
SyncAppendEntries
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
prevLogIndex
);
cJSON_AddStringToObject
(
pRoot
,
"pre_log_index"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
prevLogTerm
);
cJSON_AddStringToObject
(
pRoot
,
"pre_log_term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
commitIndex
);
cJSON_AddStringToObject
(
pRoot
,
"commit_index"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
pMsg
->
data
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntries"
,
pRoot
);
return
pJson
;
}
// ---- message process SyncAppendEntriesReply----
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
()
{
uint32_t
bytes
=
sizeof
(
SyncAppendEntriesReply
);
SyncAppendEntriesReply
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_APPEND_ENTRIES_REPLY
;
}
void
syncAppendEntriesReplyDestroy
(
SyncAppendEntriesReply
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncAppendEntriesReplySerialize
(
const
SyncAppendEntriesReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncAppendEntriesReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntriesReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
}
void
syncAppendEntriesReply2RpcMsg
(
const
SyncAppendEntriesReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncAppendEntriesReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncAppendEntriesReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntriesReply
*
pMsg
)
{
syncAppendEntriesReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncAppendEntriesReply2Json
(
const
SyncAppendEntriesReply
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
matchIndex
);
cJSON_AddStringToObject
(
pRoot
,
"match_index"
,
u64buf
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntriesReply"
,
pRoot
);
return
pJson
;
}
\ No newline at end of file
source/libs/sync/test/syncEncodeTest.cpp
浏览文件 @
aeb94af5
...
...
@@ -15,6 +15,7 @@ void logTest() {
}
#define PING_MSG_LEN 20
#define APPEND_ENTRIES_VALUE_LEN 32
void
test1
()
{
sTrace
(
"test1: ---- syncPingSerialize, syncPingDeserialize"
);
...
...
@@ -213,7 +214,45 @@ void test5() {
}
void
test6
()
{
sTrace
(
"test6: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"
);
sTrace
(
"test6: ---- syncRequestVote2RpcMsg, syncRequestVoteFromRpcMsg"
);
SyncRequestVote
*
pMsg
=
syncRequestVoteBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
currentTerm
=
20
;
pMsg
->
lastLogIndex
=
21
;
pMsg
->
lastLogTerm
=
22
;
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncRequestVote2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncRequestVote
*
pMsg2
=
(
SyncRequestVote
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteDestroy
(
pMsg
);
syncRequestVoteDestroy
(
pMsg2
);
}
void
test7
()
{
sTrace
(
"test7: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"
);
SyncRequestVoteReply
*
pMsg
=
SyncRequestVoteReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
...
...
@@ -251,6 +290,203 @@ void test6() {
free
(
buf
);
}
void
test8
()
{
sTrace
(
"test8: ---- syncRequestVoteReply2RpcMsg, syncRequestVoteReplyFromRpcMsg"
);
SyncRequestVoteReply
*
pMsg
=
SyncRequestVoteReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
20
;
pMsg
->
voteGranted
=
1
;
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncRequestVoteReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncRequestVoteReply
*
pMsg2
=
(
SyncRequestVoteReply
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteReplyDestroy
(
pMsg
);
syncRequestVoteReplyDestroy
(
pMsg2
);
}
void
test9
()
{
sTrace
(
"test9: ---- syncAppendEntriesSerialize, syncAppendEntriesDeserialize"
);
char
msg
[
APPEND_ENTRIES_VALUE_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test value"
);
SyncAppendEntries
*
pMsg
=
syncAppendEntriesBuild
(
APPEND_ENTRIES_VALUE_LEN
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
prevLogIndex
=
55
;
pMsg
->
prevLogTerm
=
66
;
pMsg
->
commitIndex
=
77
;
memcpy
(
pMsg
->
data
,
msg
,
APPEND_ENTRIES_VALUE_LEN
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncAppendEntriesSerialize
(
pMsg
,
buf
,
bufLen
);
SyncAppendEntries
*
pMsg2
=
(
SyncAppendEntries
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesDestroy
(
pMsg
);
syncAppendEntriesDestroy
(
pMsg2
);
free
(
buf
);
}
void
test10
()
{
sTrace
(
"test10: ---- syncAppendEntries2RpcMsg, syncAppendEntriesFromRpcMsg"
);
char
msg
[
APPEND_ENTRIES_VALUE_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test value"
);
SyncAppendEntries
*
pMsg
=
syncAppendEntriesBuild
(
APPEND_ENTRIES_VALUE_LEN
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
prevLogIndex
=
55
;
pMsg
->
prevLogTerm
=
66
;
pMsg
->
commitIndex
=
77
;
memcpy
(
pMsg
->
data
,
msg
,
APPEND_ENTRIES_VALUE_LEN
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncAppendEntries2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncAppendEntries
*
pMsg2
=
(
SyncAppendEntries
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesDestroy
(
pMsg
);
syncAppendEntriesDestroy
(
pMsg2
);
}
void
test11
()
{
sTrace
(
"test11: ---- syncAppendEntriesReplySerialize, syncAppendEntriesReplyDeserialize"
);
SyncAppendEntriesReply
*
pMsg
=
syncAppendEntriesReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
success
=
1
;
pMsg
->
matchIndex
=
23
;
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncAppendEntriesReplySerialize
(
pMsg
,
buf
,
bufLen
);
SyncAppendEntriesReply
*
pMsg2
=
(
SyncAppendEntriesReply
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesReplyDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesReplyDestroy
(
pMsg
);
syncAppendEntriesReplyDestroy
(
pMsg2
);
free
(
buf
);
}
void
test12
()
{
sTrace
(
"test12: ---- syncAppendEntriesReply2RpcMsg, syncAppendEntriesReplyFromRpcMsg"
);
SyncAppendEntriesReply
*
pMsg
=
syncAppendEntriesReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
success
=
1
;
pMsg
->
matchIndex
=
23
;
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncAppendEntriesReply
*
pMsg2
=
(
SyncAppendEntriesReply
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesReplyDestroy
(
pMsg
);
syncAppendEntriesReplyDestroy
(
pMsg2
);
}
int
main
()
{
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
...
...
@@ -262,6 +498,12 @@ int main() {
test4
();
test5
();
test6
();
test7
();
test8
();
test9
();
test10
();
test11
();
test12
();
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录