Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
12c202aa
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
12c202aa
编写于
3月 01, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync encode test
上级
12050c9a
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
251 addition
and
98 deletion
+251
-98
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+31
-15
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+112
-54
source/libs/sync/test/syncEncodeTest.cpp
source/libs/sync/test/syncEncodeTest.cpp
+108
-29
未找到文件。
source/libs/sync/inc/syncMessage.h
浏览文件 @
12c202aa
...
@@ -58,6 +58,20 @@ typedef struct SyncPing {
...
@@ -58,6 +58,20 @@ typedef struct SyncPing {
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
void
syncPingDestroy
(
SyncPing
*
pMsg
);
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
);
typedef
struct
SyncPingReply
{
typedef
struct
SyncPingReply
{
uint32_t
bytes
;
uint32_t
bytes
;
uint32_t
msgType
;
uint32_t
msgType
;
...
@@ -67,6 +81,23 @@ typedef struct SyncPingReply {
...
@@ -67,6 +81,23 @@ typedef struct SyncPingReply {
char
data
[];
char
data
[];
}
SyncPingReply
;
}
SyncPingReply
;
#define SYNC_PING_REPLY_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
);
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
);
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
);
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
);
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
);
typedef
struct
SyncClientRequest
{
typedef
struct
SyncClientRequest
{
ESyncMessageType
msgType
;
ESyncMessageType
msgType
;
char
*
data
;
char
*
data
;
...
@@ -118,21 +149,6 @@ typedef struct SyncAppendEntriesReply {
...
@@ -118,21 +149,6 @@ typedef struct SyncAppendEntriesReply {
SyncIndex
matchIndex
;
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
;
}
SyncAppendEntriesReply
;
// ---- message build ----
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
void
syncPingDestroy
(
SyncPing
*
pSyncPing
);
void
syncPingSerialize
(
const
SyncPing
*
pSyncPing
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pSyncPing
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pSyncPing
,
SRpcMsg
*
pRpcMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pSyncPing
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pSyncPing
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
12c202aa
...
@@ -19,136 +19,194 @@
...
@@ -19,136 +19,194 @@
void
onMessage
(
SRaft
*
pRaft
,
void
*
pMsg
)
{}
void
onMessage
(
SRaft
*
pRaft
,
void
*
pMsg
)
{}
// ---- message
build
----
// ---- message
process SyncPing
----
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
)
{
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
SYNC_PING_FIX_LEN
+
dataLen
;
uint32_t
bytes
=
SYNC_PING_FIX_LEN
+
dataLen
;
SyncPing
*
p
SyncPin
g
=
malloc
(
bytes
);
SyncPing
*
p
Ms
g
=
malloc
(
bytes
);
memset
(
p
SyncPin
g
,
0
,
bytes
);
memset
(
p
Ms
g
,
0
,
bytes
);
p
SyncPin
g
->
bytes
=
bytes
;
p
Ms
g
->
bytes
=
bytes
;
p
SyncPin
g
->
msgType
=
SYNC_PING
;
p
Ms
g
->
msgType
=
SYNC_PING
;
p
SyncPin
g
->
dataLen
=
dataLen
;
p
Ms
g
->
dataLen
=
dataLen
;
}
}
void
syncPingDestroy
(
SyncPing
*
p
SyncPin
g
)
{
void
syncPingDestroy
(
SyncPing
*
p
Ms
g
)
{
if
(
p
SyncPin
g
!=
NULL
)
{
if
(
p
Ms
g
!=
NULL
)
{
free
(
p
SyncPin
g
);
free
(
p
Ms
g
);
}
}
}
}
void
syncPingSerialize
(
const
SyncPing
*
p
SyncPin
g
,
char
*
buf
,
uint32_t
bufLen
)
{
void
syncPingSerialize
(
const
SyncPing
*
p
Ms
g
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
p
SyncPin
g
->
bytes
<=
bufLen
);
assert
(
p
Ms
g
->
bytes
<=
bufLen
);
memcpy
(
buf
,
p
SyncPing
,
pSyncPin
g
->
bytes
);
memcpy
(
buf
,
p
Msg
,
pMs
g
->
bytes
);
}
}
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pSyncPing
)
{
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
)
{
/*
memcpy
(
pMsg
,
buf
,
len
);
uint32_t* pU32 = (uint32_t*)buf;
assert
(
len
==
pMsg
->
bytes
);
uint32_t bytes = *pU32;
assert
(
pMsg
->
bytes
==
SYNC_PING_FIX_LEN
+
pMsg
->
dataLen
);
pSyncPing = (SyncPing*)malloc(bytes);
*/
memcpy
(
pSyncPing
,
buf
,
len
);
assert
(
len
==
pSyncPing
->
bytes
);
assert
(
pSyncPing
->
bytes
==
SYNC_PING_FIX_LEN
+
pSyncPing
->
dataLen
);
}
}
void
syncPing2RpcMsg
(
const
SyncPing
*
p
SyncPin
g
,
SRpcMsg
*
pRpcMsg
)
{
void
syncPing2RpcMsg
(
const
SyncPing
*
p
Ms
g
,
SRpcMsg
*
pRpcMsg
)
{
pRpcMsg
->
msgType
=
p
SyncPin
g
->
msgType
;
pRpcMsg
->
msgType
=
p
Ms
g
->
msgType
;
pRpcMsg
->
contLen
=
p
SyncPin
g
->
bytes
;
pRpcMsg
->
contLen
=
p
Ms
g
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncPingSerialize
(
p
SyncPin
g
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
syncPingSerialize
(
p
Ms
g
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
}
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
p
SyncPin
g
)
{
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
p
Ms
g
)
{
syncPingDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
p
SyncPin
g
);
syncPingDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
p
Ms
g
);
}
}
cJSON
*
syncPing2Json
(
const
SyncPing
*
p
SyncPin
g
)
{
cJSON
*
syncPing2Json
(
const
SyncPing
*
p
Ms
g
)
{
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
p
SyncPin
g
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
p
Ms
g
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
p
SyncPin
g
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
p
Ms
g
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
p
SyncPin
g
->
srcId
.
addr
);
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
p
Ms
g
->
srcId
.
addr
);
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
p
SyncPin
g
->
srcId
.
vgId
);
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
p
Ms
g
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
p
SyncPin
g
->
destId
.
addr
);
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
p
Ms
g
->
destId
.
addr
);
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
p
SyncPin
g
->
destId
.
vgId
);
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
p
Ms
g
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pDestId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pDestId
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
p
SyncPin
g
->
dataLen
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
p
Ms
g
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
p
SyncPin
g
->
data
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
p
Ms
g
->
data
);
return
pRoot
;
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPing"
,
pRoot
);
return
pJson
;
}
// ---- message process SyncPingReply----
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
SYNC_PING_REPLY_FIX_LEN
+
dataLen
;
SyncPingReply
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_PING
;
pMsg
->
dataLen
=
dataLen
;
}
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
assert
(
pMsg
->
bytes
==
SYNC_PING_FIX_LEN
+
pMsg
->
dataLen
);
}
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncPingReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
)
{
syncPingReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pDestId
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
pMsg
->
data
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPingReply"
,
pRoot
);
return
pJson
;
}
}
#if 0
#if 0
void syncPingSerialize(const SyncPing* p
SyncPin
g, char** ppBuf, uint32_t* bufLen) {
void syncPingSerialize(const SyncPing* p
Ms
g, char** ppBuf, uint32_t* bufLen) {
*bufLen = sizeof(SyncPing) + p
SyncPin
g->dataLen;
*bufLen = sizeof(SyncPing) + p
Ms
g->dataLen;
*ppBuf = (char*)malloc(*bufLen);
*ppBuf = (char*)malloc(*bufLen);
void* pStart = *ppBuf;
void* pStart = *ppBuf;
uint32_t allBytes = *bufLen;
uint32_t allBytes = *bufLen;
int len = 0;
int len = 0;
len = taosEncodeFixedU32(&pStart, p
SyncPin
g->msgType);
len = taosEncodeFixedU32(&pStart, p
Ms
g->msgType);
allBytes -= len;
allBytes -= len;
assert(len > 0);
assert(len > 0);
pStart += len;
pStart += len;
len = taosEncodeFixedU64(&pStart, p
SyncPin
g->srcId.addr);
len = taosEncodeFixedU64(&pStart, p
Ms
g->srcId.addr);
allBytes -= len;
allBytes -= len;
assert(len > 0);
assert(len > 0);
pStart += len;
pStart += len;
len = taosEncodeFixedI32(&pStart, p
SyncPin
g->srcId.vgId);
len = taosEncodeFixedI32(&pStart, p
Ms
g->srcId.vgId);
allBytes -= len;
allBytes -= len;
assert(len > 0);
assert(len > 0);
pStart += len;
pStart += len;
len = taosEncodeFixedU64(&pStart, p
SyncPin
g->destId.addr);
len = taosEncodeFixedU64(&pStart, p
Ms
g->destId.addr);
allBytes -= len;
allBytes -= len;
assert(len > 0);
assert(len > 0);
pStart += len;
pStart += len;
len = taosEncodeFixedI32(&pStart, p
SyncPin
g->destId.vgId);
len = taosEncodeFixedI32(&pStart, p
Ms
g->destId.vgId);
allBytes -= len;
allBytes -= len;
assert(len > 0);
assert(len > 0);
pStart += len;
pStart += len;
len = taosEncodeFixedU32(&pStart, p
SyncPin
g->dataLen);
len = taosEncodeFixedU32(&pStart, p
Ms
g->dataLen);
allBytes -= len;
allBytes -= len;
assert(len > 0);
assert(len > 0);
pStart += len;
pStart += len;
memcpy(pStart, p
SyncPing->data, pSyncPin
g->dataLen);
memcpy(pStart, p
Msg->data, pMs
g->dataLen);
allBytes -= p
SyncPin
g->dataLen;
allBytes -= p
Ms
g->dataLen;
assert(allBytes == 0);
assert(allBytes == 0);
}
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* p
SyncPin
g) {
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* p
Ms
g) {
void* pStart = (void*)buf;
void* pStart = (void*)buf;
uint64_t u64;
uint64_t u64;
int32_t i32;
int32_t i32;
uint32_t u32;
uint32_t u32;
pStart = taosDecodeFixedU64(pStart, &u64);
pStart = taosDecodeFixedU64(pStart, &u64);
p
SyncPin
g->msgType = u64;
p
Ms
g->msgType = u64;
pStart = taosDecodeFixedU64(pStart, &u64);
pStart = taosDecodeFixedU64(pStart, &u64);
p
SyncPin
g->srcId.addr = u64;
p
Ms
g->srcId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pStart = taosDecodeFixedI32(pStart, &i32);
p
SyncPin
g->srcId.vgId = i32;
p
Ms
g->srcId.vgId = i32;
pStart = taosDecodeFixedU64(pStart, &u64);
pStart = taosDecodeFixedU64(pStart, &u64);
p
SyncPin
g->destId.addr = u64;
p
Ms
g->destId.addr = u64;
pStart = taosDecodeFixedI32(pStart, &i32);
pStart = taosDecodeFixedI32(pStart, &i32);
p
SyncPin
g->destId.vgId = i32;
p
Ms
g->destId.vgId = i32;
pStart = taosDecodeFixedU32(pStart, &u32);
pStart = taosDecodeFixedU32(pStart, &u32);
p
SyncPin
g->dataLen = u32;
p
Ms
g->dataLen = u32;
}
}
#endif
#endif
\ No newline at end of file
source/libs/sync/test/syncEncodeTest.cpp
浏览文件 @
12c202aa
...
@@ -16,59 +16,59 @@ void logTest() {
...
@@ -16,59 +16,59 @@ void logTest() {
#define PING_MSG_LEN 20
#define PING_MSG_LEN 20
void
test1
()
{
void
test1
()
{
sTrace
(
"test1: ----"
);
sTrace
(
"test1: ----
syncPingSerialize, syncPingDeserialize
"
);
char
msg
[
PING_MSG_LEN
];
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPing
*
p
SyncPin
g
=
syncPingBuild
(
PING_MSG_LEN
);
SyncPing
*
p
Ms
g
=
syncPingBuild
(
PING_MSG_LEN
);
p
SyncPin
g
->
srcId
.
addr
=
1
;
p
Ms
g
->
srcId
.
addr
=
1
;
p
SyncPin
g
->
srcId
.
vgId
=
2
;
p
Ms
g
->
srcId
.
vgId
=
2
;
p
SyncPin
g
->
destId
.
addr
=
3
;
p
Ms
g
->
destId
.
addr
=
3
;
p
SyncPin
g
->
destId
.
vgId
=
4
;
p
Ms
g
->
destId
.
vgId
=
4
;
memcpy
(
p
SyncPin
g
->
data
,
msg
,
PING_MSG_LEN
);
memcpy
(
p
Ms
g
->
data
,
msg
,
PING_MSG_LEN
);
{
{
cJSON
*
pJson
=
syncPing2Json
(
p
SyncPin
g
);
cJSON
*
pJson
=
syncPing2Json
(
p
Ms
g
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing:
\n
%s
\n\n
"
,
serialized
);
printf
(
"SyncPing:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
uint32_t
bufLen
=
p
SyncPin
g
->
bytes
;
uint32_t
bufLen
=
p
Ms
g
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncPingSerialize
(
p
SyncPin
g
,
buf
,
bufLen
);
syncPingSerialize
(
p
Ms
g
,
buf
,
bufLen
);
SyncPing
*
p
SyncPing2
=
(
SyncPing
*
)
malloc
(
pSyncPin
g
->
bytes
);
SyncPing
*
p
Msg2
=
(
SyncPing
*
)
malloc
(
pMs
g
->
bytes
);
syncPingDeserialize
(
buf
,
bufLen
,
p
SyncPin
g2
);
syncPingDeserialize
(
buf
,
bufLen
,
p
Ms
g2
);
{
{
cJSON
*
pJson
=
syncPing2Json
(
p
SyncPin
g2
);
cJSON
*
pJson
=
syncPing2Json
(
p
Ms
g2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"SyncPing2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
syncPingDestroy
(
p
SyncPin
g
);
syncPingDestroy
(
p
Ms
g
);
syncPingDestroy
(
p
SyncPin
g2
);
syncPingDestroy
(
p
Ms
g2
);
free
(
buf
);
free
(
buf
);
}
}
void
test2
()
{
void
test2
()
{
sTrace
(
"test2: ----"
);
sTrace
(
"test2: ----
syncPing2RpcMsg, syncPingFromRpcMsg
"
);
char
msg
[
PING_MSG_LEN
];
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPing
*
p
SyncPin
g
=
syncPingBuild
(
PING_MSG_LEN
);
SyncPing
*
p
Ms
g
=
syncPingBuild
(
PING_MSG_LEN
);
p
SyncPin
g
->
srcId
.
addr
=
100
;
p
Ms
g
->
srcId
.
addr
=
100
;
p
SyncPin
g
->
srcId
.
vgId
=
200
;
p
Ms
g
->
srcId
.
vgId
=
200
;
p
SyncPin
g
->
destId
.
addr
=
300
;
p
Ms
g
->
destId
.
addr
=
300
;
p
SyncPin
g
->
destId
.
vgId
=
400
;
p
Ms
g
->
destId
.
vgId
=
400
;
memcpy
(
p
SyncPin
g
->
data
,
msg
,
PING_MSG_LEN
);
memcpy
(
p
Ms
g
->
data
,
msg
,
PING_MSG_LEN
);
{
{
cJSON
*
pJson
=
syncPing2Json
(
p
SyncPin
g
);
cJSON
*
pJson
=
syncPing2Json
(
p
Ms
g
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing:
\n
%s
\n\n
"
,
serialized
);
printf
(
"SyncPing:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
...
@@ -76,23 +76,100 @@ void test2() {
...
@@ -76,23 +76,100 @@ void test2() {
}
}
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPing2RpcMsg
(
p
SyncPin
g
,
&
rpcMsg
);
syncPing2RpcMsg
(
p
Ms
g
,
&
rpcMsg
);
SyncPing
*
p
SyncPing2
=
(
SyncPing
*
)
malloc
(
pSyncPin
g
->
bytes
);
SyncPing
*
p
Msg2
=
(
SyncPing
*
)
malloc
(
pMs
g
->
bytes
);
syncPingFromRpcMsg
(
&
rpcMsg
,
p
SyncPin
g2
);
syncPingFromRpcMsg
(
&
rpcMsg
,
p
Ms
g2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
{
cJSON
*
pJson
=
syncPing2Json
(
p
SyncPin
g2
);
cJSON
*
pJson
=
syncPing2Json
(
p
Ms
g2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPing2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"SyncPing2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
syncPingDestroy
(
p
SyncPin
g
);
syncPingDestroy
(
p
Ms
g
);
syncPingDestroy
(
p
SyncPin
g2
);
syncPingDestroy
(
p
Ms
g2
);
}
}
void
test3
()
{
sTrace
(
"test3: ---- syncPingReplySerialize, syncPingReplyDeserialize"
);
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
19
;
pMsg
->
srcId
.
vgId
=
29
;
pMsg
->
destId
.
addr
=
39
;
pMsg
->
destId
.
vgId
=
49
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncPingReplySerialize
(
pMsg
,
buf
,
bufLen
);
SyncPingReply
*
pMsg2
=
(
SyncPingReply
*
)
malloc
(
pMsg
->
bytes
);
syncPingReplyDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg2
);
free
(
buf
);
}
void
test4
()
{
sTrace
(
"test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg"
);
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
66
;
pMsg
->
srcId
.
vgId
=
77
;
pMsg
->
destId
.
addr
=
88
;
pMsg
->
destId
.
vgId
=
99
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncPingReply
*
pMsg2
=
(
SyncPingReply
*
)
malloc
(
pMsg
->
bytes
);
syncPingReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg2
);
}
int
main
()
{
int
main
()
{
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
...
@@ -100,6 +177,8 @@ int main() {
...
@@ -100,6 +177,8 @@ int main() {
test1
();
test1
();
test2
();
test2
();
test3
();
test4
();
return
0
;
return
0
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录