Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f263a623
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
f263a623
编写于
3月 03, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync encode/decode
上级
76c4fce8
变更
3
显示空白变更内容
内联
并排
Showing
3 changed file
with
361 addition
and
161 deletion
+361
-161
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+87
-75
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+168
-63
source/libs/sync/test/syncEncodeTest.cpp
source/libs/sync/test/syncEncodeTest.cpp
+106
-23
未找到文件。
source/libs/sync/inc/syncMessage.h
浏览文件 @
f263a623
...
@@ -28,30 +28,25 @@ extern "C" {
...
@@ -28,30 +28,25 @@ extern "C" {
#include "syncRaftEntry.h"
#include "syncRaftEntry.h"
#include "taosdef.h"
#include "taosdef.h"
// encode as uint
64
// encode as uint
32
typedef
enum
ESyncMessageType
{
typedef
enum
ESyncMessageType
{
SYNC_PING
=
101
,
SYNC_PING
=
101
,
SYNC_PING_REPLY
=
103
,
SYNC_PING_REPLY
=
103
,
SYNC_CLIENT_REQUEST
,
SYNC_CLIENT_REQUEST
=
105
,
SYNC_CLIENT_REQUEST_REPLY
,
SYNC_CLIENT_REQUEST_REPLY
=
107
,
SYNC_REQUEST_VOTE
,
SYNC_REQUEST_VOTE
=
109
,
SYNC_REQUEST_VOTE_REPLY
,
SYNC_REQUEST_VOTE_REPLY
=
111
,
SYNC_APPEND_ENTRIES
,
SYNC_APPEND_ENTRIES
=
113
,
SYNC_APPEND_ENTRIES_REPLY
,
SYNC_APPEND_ENTRIES_REPLY
=
115
,
}
ESyncMessageType
;
}
ESyncMessageType
;
/*
// ---------------------------------------------
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
*/
typedef
struct
SyncPing
{
typedef
struct
SyncPing
{
uint32_t
bytes
;
uint32_t
bytes
;
uint32_t
msgType
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
srcId
;
SRaftId
destId
;
SRaftId
destId
;
// private data
uint32_t
dataLen
;
uint32_t
dataLen
;
char
data
[];
char
data
[];
}
SyncPing
;
}
SyncPing
;
...
@@ -59,28 +54,22 @@ typedef struct SyncPing {
...
@@ -59,28 +54,22 @@ typedef struct SyncPing {
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
void
syncPingDestroy
(
SyncPing
*
pMsg
);
void
syncPingDestroy
(
SyncPing
*
pMsg
);
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
);
SyncPing
*
syncPingBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPing
*
syncPingBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPing
*
syncPingBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
SyncPing
*
syncPingBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
// ---------------------------------------------
typedef
struct
SyncPingReply
{
typedef
struct
SyncPingReply
{
uint32_t
bytes
;
uint32_t
bytes
;
uint32_t
msgType
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
srcId
;
SRaftId
destId
;
SRaftId
destId
;
// private data
uint32_t
dataLen
;
uint32_t
dataLen
;
char
data
[];
char
data
[];
}
SyncPingReply
;
}
SyncPingReply
;
...
@@ -89,70 +78,93 @@ typedef struct SyncPingReply {
...
@@ -89,70 +78,93 @@ typedef struct SyncPingReply {
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
);
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
);
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
);
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
);
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
);
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
);
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
);
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
);
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
);
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
);
SyncPingReply
*
syncPingReplyBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPingReply
*
syncPingReplyBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPingReply
*
syncPingReplyBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
SyncPingReply
*
syncPingReplyBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
// ---------------------------------------------
typedef
struct
SyncClientRequest
{
typedef
struct
SyncClientRequest
{
ESyncMessageType
msgType
;
uint32_t
bytes
;
char
*
data
;
uint32_t
msgType
;
uint32_t
dataLen
;
int64_t
seqNum
;
int64_t
seqNum
;
bool
isWeak
;
bool
isWeak
;
uint32_t
dataLen
;
char
data
[];
}
SyncClientRequest
;
}
SyncClientRequest
;
// ---------------------------------------------
typedef
struct
SyncClientRequestReply
{
typedef
struct
SyncClientRequestReply
{
ESyncMessageType
msgType
;
uint32_t
bytes
;
uint32_t
msgType
;
int32_t
errCode
;
int32_t
errCode
;
SSyncBuffer
*
pErrMsg
;
SRaftId
leaderHint
;
SSyncBuffer
*
pLeaderHint
;
}
SyncClientRequestReply
;
}
SyncClientRequestReply
;
// ---------------------------------------------
typedef
struct
SyncRequestVote
{
typedef
struct
SyncRequestVote
{
ESyncMessageType
msgType
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
currentTerm
;
SyncTerm
currentTerm
;
SyncNodeId
nodeId
;
SyncGroupId
vgId
;
SyncIndex
lastLogIndex
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
SyncTerm
lastLogTerm
;
}
SyncRequestVote
;
}
SyncRequestVote
;
SyncRequestVote
*
syncRequestVoteBuild
();
void
syncRequestVoteDestroy
(
SyncRequestVote
*
pMsg
);
void
syncRequestVoteSerialize
(
const
SyncRequestVote
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncRequestVoteDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVote
*
pMsg
);
void
syncRequestVote2RpcMsg
(
const
SyncRequestVote
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncRequestVoteFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVote
*
pMsg
);
cJSON
*
syncRequestVote2Json
(
const
SyncRequestVote
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncRequestVoteReply
{
typedef
struct
SyncRequestVoteReply
{
ESyncMessageType
msgType
;
uint32_t
bytes
;
SyncTerm
currentTerm
;
uint32_t
msgType
;
SyncNodeId
nodeId
;
SRaftId
srcId
;
SyncGroupId
vgId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
bool
voteGranted
;
bool
voteGranted
;
}
SyncRequestVoteReply
;
}
SyncRequestVoteReply
;
SyncRequestVoteReply
*
SyncRequestVoteReplyBuild
();
void
syncRequestVoteReplyDestroy
(
SyncRequestVoteReply
*
pMsg
);
void
syncRequestVoteReplySerialize
(
const
SyncRequestVoteReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncRequestVoteReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVoteReply
*
pMsg
);
void
syncRequestVoteReply2RpcMsg
(
const
SyncRequestVoteReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncRequestVoteReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVoteReply
*
pMsg
);
cJSON
*
syncRequestVoteReply2Json
(
const
SyncRequestVoteReply
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncAppendEntries
{
typedef
struct
SyncAppendEntries
{
ESyncMessageType
msgType
;
uint32_t
bytes
;
SyncTerm
currentTerm
;
uint32_t
msgType
;
SyncNodeId
nodeId
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncIndex
prevLogIndex
;
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
SyncTerm
prevLogTerm
;
int32_t
entryCount
;
SSyncRaftEntry
*
logEntries
;
SyncIndex
commitIndex
;
SyncIndex
commitIndex
;
uint32_t
dataLen
;
char
data
[];
}
SyncAppendEntries
;
}
SyncAppendEntries
;
// ---------------------------------------------
typedef
struct
SyncAppendEntriesReply
{
typedef
struct
SyncAppendEntriesReply
{
ESyncMessageType
msgType
;
uint32_t
bytes
;
SyncTerm
currentTerm
;
uint32_t
msgType
;
SyncNodeId
nodeId
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
bool
success
;
bool
success
;
SyncIndex
matchIndex
;
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
;
}
SyncAppendEntriesReply
;
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
f263a623
...
@@ -60,12 +60,15 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
...
@@ -60,12 +60,15 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
}
}
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
)
{
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
cJSON
*
pTmp
=
pSrcId
;
...
@@ -79,7 +82,8 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
...
@@ -79,7 +82,8 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
cJSON
*
pTmp
=
pDestId
;
...
@@ -154,12 +158,15 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
...
@@ -154,12 +158,15 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
}
}
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
cJSON
*
pTmp
=
pSrcId
;
...
@@ -173,7 +180,8 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
...
@@ -173,7 +180,8 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
cJSON
*
pTmp
=
pDestId
;
...
@@ -208,72 +216,169 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId)
...
@@ -208,72 +216,169 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId)
return
pMsg
;
return
pMsg
;
}
}
#if 0
// ---- message process SyncRequestVote----
void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) {
SyncRequestVote
*
syncRequestVoteBuild
()
{
*bufLen = sizeof(SyncPing) + pMsg->dataLen;
uint32_t
bytes
=
sizeof
(
SyncRequestVote
);
*ppBuf = (char*)malloc(*bufLen);
SyncRequestVote
*
pMsg
=
malloc
(
bytes
);
void* pStart = *ppBuf;
memset
(
pMsg
,
0
,
bytes
);
uint32_t allBytes = *bufLen;
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_REQUEST_VOTE
;
}
void
syncRequestVoteDestroy
(
SyncRequestVote
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncRequestVoteSerialize
(
const
SyncRequestVote
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncRequestVoteDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVote
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
}
void
syncRequestVote2RpcMsg
(
const
SyncRequestVote
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncRequestVoteSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
int len = 0;
void
syncRequestVoteFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVote
*
pMsg
)
{
len = taosEncodeFixedU32(&pStart, pMsg->msgType);
syncRequestVoteDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
allBytes -= len;
}
assert(len > 0);
pStart += len;
len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr);
cJSON
*
syncRequestVote2Json
(
const
SyncRequestVote
*
pMsg
)
{
allBytes -= len;
char
u64buf
[
128
];
assert(len > 0);
pStart += len;
len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId);
cJSON
*
pRoot
=
cJSON_CreateObject
();
allBytes -= len;
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
assert(len > 0);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
pStart += len;
len = taosEncodeFixedU64(&pStart, pMsg->destId.addr);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
allBytes -= len;
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
assert(len > 0);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
pStart += len;
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId);
cJSON
*
pDestId
=
cJSON_CreateObject
();
allBytes -= len;
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
assert(len > 0);
{
pStart += len;
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
len = taosEncodeFixedU32(&pStart, pMsg->dataLen);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
currentTerm
);
allBytes -= len;
cJSON_AddStringToObject
(
pRoot
,
"currentTerm"
,
u64buf
);
assert(len > 0);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogIndex
);
pStart += len;
cJSON_AddStringToObject
(
pRoot
,
"lastLogIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogTerm
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogTerm"
,
u64buf
);
memcpy(pStart, pMsg->data, pMsg->dataLen
);
cJSON
*
pJson
=
cJSON_CreateObject
(
);
allBytes -= pMsg->dataLen
;
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVote"
,
pRoot
)
;
assert(allBytes == 0)
;
return
pJson
;
}
}
// ---- message process SyncRequestVoteReply----
SyncRequestVoteReply
*
SyncRequestVoteReplyBuild
()
{
uint32_t
bytes
=
sizeof
(
SyncRequestVoteReply
);
SyncRequestVoteReply
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_REQUEST_VOTE_REPLY
;
}
void sync
PingDeserialize(const char* buf, uint32_t len, SyncPing
* pMsg) {
void
sync
RequestVoteReplyDestroy
(
SyncRequestVoteReply
*
pMsg
)
{
void* pStart = (void*)buf;
if
(
pMsg
!=
NULL
)
{
uint64_t u64
;
free
(
pMsg
)
;
int32_t i32;
}
uint32_t u32;
}
pStart = taosDecodeFixedU64(pStart, &u64);
void
syncRequestVoteReplySerialize
(
const
SyncRequestVoteReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
pMsg->msgType = u64;
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncRequestVoteReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVoteReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
}
pStart = taosDecodeFixedU64(pStart, &u64);
void
syncRequestVoteReply2RpcMsg
(
const
SyncRequestVoteReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
pMsg->srcId.addr = u64;
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncRequestVoteReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
pStart = taosDecodeFixedI32(pStart, &i32);
void
syncRequestVoteReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVoteReply
*
pMsg
)
{
pMsg->srcId.vgId = i32;
syncRequestVoteReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
pStart = taosDecodeFixedU64(pStart, &u64);
cJSON
*
syncRequestVoteReply2Json
(
const
SyncRequestVoteReply
*
pMsg
)
{
pMsg->destId.addr = u64
;
char
u64buf
[
128
]
;
pStart = taosDecodeFixedI32(pStart, &i32);
cJSON
*
pRoot
=
cJSON_CreateObject
();
pMsg->destId.vgId = i32;
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
pStart = taosDecodeFixedU32(pStart, &u32);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
pMsg->dataLen = u32;
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_granted"
,
pMsg
->
voteGranted
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVoteReply"
,
pRoot
);
return
pJson
;
}
}
\ No newline at end of file
#endif
\ No newline at end of file
source/libs/sync/test/syncEncodeTest.cpp
浏览文件 @
f263a623
...
@@ -3,6 +3,7 @@
...
@@ -3,6 +3,7 @@
#include "syncIO.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncMessage.h"
#include "syncUtil.h"
void
logTest
()
{
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sTrace
(
"--- sync log test: trace"
);
...
@@ -21,16 +22,16 @@ void test1() {
...
@@ -21,16 +22,16 @@ void test1() {
char
msg
[
PING_MSG_LEN
];
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
1
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
)
;
pMsg
->
srcId
.
vgId
=
2
;
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
3
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
)
;
pMsg
->
destId
.
vgId
=
4
;
pMsg
->
destId
.
vgId
=
100
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -45,7 +46,7 @@ void test1() {
...
@@ -45,7 +46,7 @@ void test1() {
{
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -61,16 +62,16 @@ void test2() {
...
@@ -61,16 +62,16 @@ void test2() {
char
msg
[
PING_MSG_LEN
];
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
100
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
3333
)
;
pMsg
->
srcId
.
vgId
=
200
;
pMsg
->
srcId
.
vgId
=
200
;
pMsg
->
destId
.
addr
=
300
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
4444
)
;
pMsg
->
destId
.
vgId
=
4
00
;
pMsg
->
destId
.
vgId
=
2
00
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -84,7 +85,7 @@ void test2() {
...
@@ -84,7 +85,7 @@ void test2() {
{
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -99,16 +100,16 @@ void test3() {
...
@@ -99,16 +100,16 @@ void test3() {
char
msg
[
PING_MSG_LEN
];
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
19
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5555
)
;
pMsg
->
srcId
.
vgId
=
29
;
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
39
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
6666
)
;
pMsg
->
destId
.
vgId
=
49
;
pMsg
->
destId
.
vgId
=
100
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -123,7 +124,7 @@ void test3() {
...
@@ -123,7 +124,7 @@ void test3() {
{
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -139,16 +140,16 @@ void test4() {
...
@@ -139,16 +140,16 @@ void test4() {
char
msg
[
PING_MSG_LEN
];
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
66
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
7777
)
;
pMsg
->
srcId
.
vgId
=
77
;
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
88
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
8888
)
;
pMsg
->
destId
.
vgId
=
99
;
pMsg
->
destId
.
vgId
=
100
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -162,7 +163,7 @@ void test4() {
...
@@ -162,7 +163,7 @@ void test4() {
{
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
}
}
...
@@ -170,6 +171,86 @@ void test4() {
...
@@ -170,6 +171,86 @@ void test4() {
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg2
);
syncPingReplyDestroy
(
pMsg2
);
}
}
void
test5
()
{
sTrace
(
"test5: ---- syncRequestVoteSerialize, syncRequestVoteDeserialize"
);
SyncRequestVote
*
pMsg
=
syncRequestVoteBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
currentTerm
=
20
;
pMsg
->
lastLogIndex
=
21
;
pMsg
->
lastLogTerm
=
22
;
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncRequestVoteSerialize
(
pMsg
,
buf
,
bufLen
);
SyncRequestVote
*
pMsg2
=
(
SyncRequestVote
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteDestroy
(
pMsg
);
syncRequestVoteDestroy
(
pMsg2
);
free
(
buf
);
}
void
test6
()
{
sTrace
(
"test6: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"
);
SyncRequestVoteReply
*
pMsg
=
SyncRequestVoteReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
20
;
pMsg
->
voteGranted
=
1
;
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncRequestVoteReplySerialize
(
pMsg
,
buf
,
bufLen
);
SyncRequestVoteReply
*
pMsg2
=
(
SyncRequestVoteReply
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteReplyDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteReplyDestroy
(
pMsg
);
syncRequestVoteReplyDestroy
(
pMsg2
);
free
(
buf
);
}
int
main
()
{
int
main
()
{
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
...
@@ -179,6 +260,8 @@ int main() {
...
@@ -179,6 +260,8 @@ int main() {
test2
();
test2
();
test3
();
test3
();
test4
();
test4
();
test5
();
test6
();
return
0
;
return
0
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录