Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eec03f2f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
eec03f2f
编写于
3月 03, 2022
作者:
L
Li Minghao
提交者:
GitHub
3月 03, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10525 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
上级
ff8c9dd9
aeb94af5
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
815 addition
and
186 deletion
+815
-186
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+107
-75
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+4
-10
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+0
-10
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+337
-56
source/libs/sync/test/syncEncodeTest.cpp
source/libs/sync/test/syncEncodeTest.cpp
+348
-23
source/libs/sync/test/syncPingTest.cpp
source/libs/sync/test/syncPingTest.cpp
+19
-12
未找到文件。
source/libs/sync/inc/syncMessage.h
浏览文件 @
eec03f2f
...
...
@@ -28,30 +28,25 @@ extern "C" {
#include "syncRaftEntry.h"
#include "taosdef.h"
// encode as uint
64
// encode as uint
32
typedef
enum
ESyncMessageType
{
SYNC_PING
=
101
,
SYNC_PING_REPLY
=
103
,
SYNC_CLIENT_REQUEST
,
SYNC_CLIENT_REQUEST_REPLY
,
SYNC_REQUEST_VOTE
,
SYNC_REQUEST_VOTE_REPLY
,
SYNC_APPEND_ENTRIES
,
SYNC_APPEND_ENTRIES_REPLY
,
SYNC_CLIENT_REQUEST
=
105
,
SYNC_CLIENT_REQUEST_REPLY
=
107
,
SYNC_REQUEST_VOTE
=
109
,
SYNC_REQUEST_VOTE_REPLY
=
111
,
SYNC_APPEND_ENTRIES
=
113
,
SYNC_APPEND_ENTRIES_REPLY
=
115
,
}
ESyncMessageType
;
/*
typedef struct SRaftId {
SyncNodeId addr; // typedef uint64_t SyncNodeId;
SyncGroupId vgId; // typedef int32_t SyncGroupId;
} SRaftId;
*/
// ---------------------------------------------
typedef
struct
SyncPing
{
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
uint32_t
dataLen
;
char
data
[];
}
SyncPing
;
...
...
@@ -59,28 +54,22 @@ typedef struct SyncPing {
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing
*
syncPingBuild
(
uint32_t
dataLen
);
void
syncPingDestroy
(
SyncPing
*
pMsg
);
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
);
void
syncPingDestroy
(
SyncPing
*
pMsg
);
void
syncPingSerialize
(
const
SyncPing
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPing
*
pMsg
);
void
syncPing2RpcMsg
(
const
SyncPing
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPing
*
pMsg
);
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
);
SyncPing
*
syncPingBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPing
*
syncPingBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
// ---------------------------------------------
typedef
struct
SyncPingReply
{
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
uint32_t
dataLen
;
char
data
[];
}
SyncPingReply
;
...
...
@@ -89,74 +78,117 @@ typedef struct SyncPingReply {
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPingReply
*
syncPingReplyBuild
(
uint32_t
dataLen
);
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
);
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
);
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
);
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
);
void
syncPingReplyDestroy
(
SyncPingReply
*
pMsg
);
void
syncPingReplySerialize
(
const
SyncPingReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
);
void
syncPingReply2RpcMsg
(
const
SyncPingReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncPingReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncPingReply
*
pMsg
);
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
);
SyncPingReply
*
syncPingReplyBuild2
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
,
const
char
*
str
);
SyncPingReply
*
syncPingReplyBuild3
(
const
SRaftId
*
srcId
,
const
SRaftId
*
destId
);
// ---------------------------------------------
typedef
struct
SyncClientRequest
{
ESyncMessageType
msgType
;
char
*
data
;
uint32_t
dataLen
;
int64_t
seqNum
;
bool
isWeak
;
uint32_t
bytes
;
uint32_t
msgType
;
int64_t
seqNum
;
bool
isWeak
;
uint32_t
dataLen
;
char
data
[];
}
SyncClientRequest
;
// ---------------------------------------------
typedef
struct
SyncClientRequestReply
{
ESyncMessageType
msgType
;
int32_t
errCod
e
;
SSyncBuffer
*
pErrMsg
;
S
SyncBuffer
*
pL
eaderHint
;
uint32_t
bytes
;
uint32_t
msgTyp
e
;
int32_t
errCode
;
S
RaftId
l
eaderHint
;
}
SyncClientRequestReply
;
// ---------------------------------------------
typedef
struct
SyncRequestVote
{
ESyncMessageType
msgType
;
SyncTerm
currentTerm
;
SyncNodeId
nodeId
;
SyncGroupId
vgId
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
currentTerm
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
}
SyncRequestVote
;
SyncRequestVote
*
syncRequestVoteBuild
();
void
syncRequestVoteDestroy
(
SyncRequestVote
*
pMsg
);
void
syncRequestVoteSerialize
(
const
SyncRequestVote
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncRequestVoteDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVote
*
pMsg
);
void
syncRequestVote2RpcMsg
(
const
SyncRequestVote
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncRequestVoteFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVote
*
pMsg
);
cJSON
*
syncRequestVote2Json
(
const
SyncRequestVote
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncRequestVoteReply
{
ESyncMessageType
msgType
;
SyncTerm
currentTerm
;
SyncNodeId
nodeId
;
SyncGroupId
vgId
;
bool
voteGranted
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
bool
voteGranted
;
}
SyncRequestVoteReply
;
SyncRequestVoteReply
*
SyncRequestVoteReplyBuild
();
void
syncRequestVoteReplyDestroy
(
SyncRequestVoteReply
*
pMsg
);
void
syncRequestVoteReplySerialize
(
const
SyncRequestVoteReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncRequestVoteReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVoteReply
*
pMsg
);
void
syncRequestVoteReply2RpcMsg
(
const
SyncRequestVoteReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncRequestVoteReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVoteReply
*
pMsg
);
cJSON
*
syncRequestVoteReply2Json
(
const
SyncRequestVoteReply
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncAppendEntries
{
ESyncMessageType
msgType
;
SyncTerm
currentTerm
;
SyncNodeId
nodeId
;
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
int32_t
entryCount
;
SSyncRaftEntry
*
logEntries
;
SyncIndex
commitIndex
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
SyncIndex
commitIndex
;
uint32_t
dataLen
;
char
data
[];
}
SyncAppendEntries
;
#define SYNC_APPEND_ENTRIES_FIX_LEN \
(sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(SyncIndex) + sizeof(SyncTerm) + \
sizeof(SyncIndex) + sizeof(uint32_t))
SyncAppendEntries
*
syncAppendEntriesBuild
(
uint32_t
dataLen
);
void
syncAppendEntriesDestroy
(
SyncAppendEntries
*
pMsg
);
void
syncAppendEntriesSerialize
(
const
SyncAppendEntries
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncAppendEntriesDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntries
*
pMsg
);
void
syncAppendEntries2RpcMsg
(
const
SyncAppendEntries
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncAppendEntriesFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntries
*
pMsg
);
cJSON
*
syncAppendEntries2Json
(
const
SyncAppendEntries
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncAppendEntriesReply
{
ESyncMessageType
msgType
;
SyncTerm
currentTerm
;
SyncNodeId
nodeId
;
bool
success
;
SyncIndex
matchIndex
;
uint32_t
bytes
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
bool
success
;
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
;
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
();
void
syncAppendEntriesReplyDestroy
(
SyncAppendEntriesReply
*
pMsg
);
void
syncAppendEntriesReplySerialize
(
const
SyncAppendEntriesReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncAppendEntriesReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntriesReply
*
pMsg
);
void
syncAppendEntriesReply2RpcMsg
(
const
SyncAppendEntriesReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncAppendEntriesReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntriesReply
*
pMsg
);
cJSON
*
syncAppendEntriesReply2Json
(
const
SyncAppendEntriesReply
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
eec03f2f
...
...
@@ -211,23 +211,17 @@ static void *syncIOConsumerFunc(void *param) {
if
(
pRpcMsg
->
msgType
==
SYNC_PING
)
{
if
(
io
->
FpOnSyncPing
!=
NULL
)
{
SyncPing
*
pSyncMsg
;
SRpcMsg
tmpRpcMsg
;
memcpy
(
&
tmpRpcMsg
,
pRpcMsg
,
sizeof
(
SRpcMsg
));
pSyncMsg
=
syncPingBuild
(
tmpRpcMsg
.
contLen
);
pSyncMsg
=
syncPingBuild
(
pRpcMsg
->
contLen
);
syncPingFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io
->
FpOnSyncPing
(
io
->
pSyncNode
,
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyBuild
(
pRpcMsg
->
contLen
);
syncPingReplyFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
if
(
io
->
FpOnSyncPingReply
!=
NULL
)
{
SyncPingReply
*
pSyncMsg
;
pSyncMsg
=
syncPingReplyBuild
(
pRpcMsg
->
contLen
);
syncPingReplyFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
io
->
FpOnSyncPingReply
(
io
->
pSyncNode
,
pSyncMsg
);
}
}
else
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
eec03f2f
...
...
@@ -171,16 +171,6 @@ static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, Syn
SRpcMsg
rpcMsg
;
syncPing2RpcMsg
(
pMsg
,
&
rpcMsg
);
/*
SRpcMsg rpcMsg;
rpcMsg.contLen = 64;
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "xxxxxxxxxxxxxx");
rpcMsg.handle = NULL;
rpcMsg.msgType = 1;
*/
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
&
rpcMsg
);
{
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
eec03f2f
...
...
@@ -60,12 +60,15 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
}
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
...
...
@@ -79,7 +82,8 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
...
...
@@ -154,12 +158,15 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) {
}
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pSrcId
,
"addr"
,
pMsg
->
srcId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
...
...
@@ -173,7 +180,8 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
...
...
@@ -208,72 +216,345 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId)
return
pMsg
;
}
#if 0
void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) {
*bufLen = sizeof(SyncPing) + pMsg->dataLen;
*ppBuf = (char*)malloc(*bufLen);
void* pStart = *ppBuf;
uint32_t allBytes = *bufLen;
// ---- message process SyncRequestVote----
SyncRequestVote
*
syncRequestVoteBuild
()
{
uint32_t
bytes
=
sizeof
(
SyncRequestVote
);
SyncRequestVote
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_REQUEST_VOTE
;
}
void
syncRequestVoteDestroy
(
SyncRequestVote
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncRequestVoteSerialize
(
const
SyncRequestVote
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncRequestVoteDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVote
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
}
int len = 0;
len = taosEncodeFixedU32(&pStart, pMsg->msgType);
allBytes -= len;
assert(len > 0);
pStart += len;
void
syncRequestVote2RpcMsg
(
const
SyncRequestVote
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncRequestVoteSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncRequestVoteFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVote
*
pMsg
)
{
syncRequestVoteDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
cJSON
*
syncRequestVote2Json
(
const
SyncRequestVote
*
pMsg
)
{
char
u64buf
[
128
];
len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
len = taosEncodeFixedU64(&pStart, pMsg->destId.addr);
allBytes -= len;
assert(len > 0);
pStart += len;
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId);
allBytes -= len;
assert(len > 0);
pStart += len;
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
len = taosEncodeFixedU32(&pStart, pMsg->dataLen);
allBytes -= len;
assert(len > 0);
pStart += len;
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
currentTerm
);
cJSON_AddStringToObject
(
pRoot
,
"currentTerm"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogIndex
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogTerm
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogTerm"
,
u64buf
);
memcpy(pStart, pMsg->data, pMsg->dataLen
);
allBytes -= pMsg->dataLen
;
assert(allBytes == 0)
;
cJSON
*
pJson
=
cJSON_CreateObject
(
);
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVote"
,
pRoot
)
;
return
pJson
;
}
// ---- message process SyncRequestVoteReply----
SyncRequestVoteReply
*
SyncRequestVoteReplyBuild
()
{
uint32_t
bytes
=
sizeof
(
SyncRequestVoteReply
);
SyncRequestVoteReply
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_REQUEST_VOTE_REPLY
;
}
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
void* pStart = (void*)buf;
uint64_t u64;
int32_t i32;
uint32_t u32;
void
syncRequestVoteReplyDestroy
(
SyncRequestVoteReply
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncRequestVoteReplySerialize
(
const
SyncRequestVoteReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncRequestVoteReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncRequestVoteReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
}
void
syncRequestVoteReply2RpcMsg
(
const
SyncRequestVoteReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncRequestVoteReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncRequestVoteReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncRequestVoteReply
*
pMsg
)
{
syncRequestVoteReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncRequestVoteReply2Json
(
const
SyncRequestVoteReply
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pDestId
,
"addr"
,
pMsg
->
destId
.
addr
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_granted"
,
pMsg
->
voteGranted
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVoteReply"
,
pRoot
);
return
pJson
;
}
// ---- message process SyncAppendEntries----
SyncAppendEntries
*
syncAppendEntriesBuild
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
SYNC_APPEND_ENTRIES_FIX_LEN
+
dataLen
;
SyncAppendEntries
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_APPEND_ENTRIES
;
pMsg
->
dataLen
=
dataLen
;
}
void
syncAppendEntriesDestroy
(
SyncAppendEntries
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncAppendEntriesSerialize
(
const
SyncAppendEntries
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncAppendEntriesDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntries
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
assert
(
pMsg
->
bytes
==
SYNC_APPEND_ENTRIES_FIX_LEN
+
pMsg
->
dataLen
);
}
void
syncAppendEntries2RpcMsg
(
const
SyncAppendEntries
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncAppendEntriesSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncAppendEntriesFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntries
*
pMsg
)
{
syncAppendEntriesDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncAppendEntries2Json
(
const
SyncAppendEntries
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
pStart = taosDecodeFixedU64(pStart, &u64);
pMsg->msgType = u64;
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
pStart = taosDecodeFixedU64(pStart, &u64
);
pMsg->srcId.addr = u64
;
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
prevLogIndex
);
cJSON_AddStringToObject
(
pRoot
,
"pre_log_index"
,
u64buf
)
;
pStart = taosDecodeFixedI32(pStart, &i32
);
pMsg->srcId.vgId = i32
;
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
prevLogTerm
);
cJSON_AddStringToObject
(
pRoot
,
"pre_log_term"
,
u64buf
)
;
pStart = taosDecodeFixedU64(pStart, &u64
);
pMsg->destId.addr = u64
;
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
commitIndex
);
cJSON_AddStringToObject
(
pRoot
,
"commit_index"
,
u64buf
)
;
pStart = taosDecodeFixedI32(pStart, &i32
);
pMsg->destId.vgId = i32
;
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
pMsg
->
data
)
;
pStart = taosDecodeFixedU32(pStart, &u32);
pMsg->dataLen = u32;
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntries"
,
pRoot
);
return
pJson
;
}
#endif
\ No newline at end of file
// ---- message process SyncAppendEntriesReply----
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
()
{
uint32_t
bytes
=
sizeof
(
SyncAppendEntriesReply
);
SyncAppendEntriesReply
*
pMsg
=
malloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
SYNC_APPEND_ENTRIES_REPLY
;
}
void
syncAppendEntriesReplyDestroy
(
SyncAppendEntriesReply
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
free
(
pMsg
);
}
}
void
syncAppendEntriesReplySerialize
(
const
SyncAppendEntriesReply
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
assert
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncAppendEntriesReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntriesReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
assert
(
len
==
pMsg
->
bytes
);
}
void
syncAppendEntriesReply2RpcMsg
(
const
SyncAppendEntriesReply
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncAppendEntriesReplySerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncAppendEntriesReplyFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntriesReply
*
pMsg
)
{
syncAppendEntriesReplyDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
cJSON
*
syncAppendEntriesReply2Json
(
const
SyncAppendEntriesReply
*
pMsg
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
matchIndex
);
cJSON_AddStringToObject
(
pRoot
,
"match_index"
,
u64buf
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntriesReply"
,
pRoot
);
return
pJson
;
}
\ No newline at end of file
source/libs/sync/test/syncEncodeTest.cpp
浏览文件 @
eec03f2f
...
...
@@ -3,6 +3,7 @@
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
...
...
@@ -14,6 +15,7 @@ void logTest() {
}
#define PING_MSG_LEN 20
#define APPEND_ENTRIES_VALUE_LEN 32
void
test1
()
{
sTrace
(
"test1: ---- syncPingSerialize, syncPingDeserialize"
);
...
...
@@ -21,16 +23,16 @@ void test1() {
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
1
;
pMsg
->
srcId
.
vgId
=
2
;
pMsg
->
destId
.
addr
=
3
;
pMsg
->
destId
.
vgId
=
4
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
)
;
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
)
;
pMsg
->
destId
.
vgId
=
100
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -45,7 +47,7 @@ void test1() {
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -61,16 +63,16 @@ void test2() {
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPing
*
pMsg
=
syncPingBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
100
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
3333
)
;
pMsg
->
srcId
.
vgId
=
200
;
pMsg
->
destId
.
addr
=
300
;
pMsg
->
destId
.
vgId
=
4
00
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
4444
)
;
pMsg
->
destId
.
vgId
=
2
00
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -84,7 +86,7 @@ void test2() {
{
cJSON
*
pJson
=
syncPing2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPing2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -99,16 +101,16 @@ void test3() {
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test ping"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
19
;
pMsg
->
srcId
.
vgId
=
29
;
pMsg
->
destId
.
addr
=
39
;
pMsg
->
destId
.
vgId
=
49
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5555
)
;
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
6666
)
;
pMsg
->
destId
.
vgId
=
100
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -123,7 +125,7 @@ void test3() {
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -139,16 +141,16 @@ void test4() {
char
msg
[
PING_MSG_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"hello raft"
);
SyncPingReply
*
pMsg
=
syncPingReplyBuild
(
PING_MSG_LEN
);
pMsg
->
srcId
.
addr
=
66
;
pMsg
->
srcId
.
vgId
=
77
;
pMsg
->
destId
.
addr
=
88
;
pMsg
->
destId
.
vgId
=
99
;
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
7777
)
;
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
8888
)
;
pMsg
->
destId
.
vgId
=
100
;
memcpy
(
pMsg
->
data
,
msg
,
PING_MSG_LEN
);
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -162,7 +164,7 @@ void test4() {
{
cJSON
*
pJson
=
syncPingReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
SyncPingReply2:
\n
%s
\n\n
"
,
serialized
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
...
...
@@ -170,6 +172,321 @@ void test4() {
syncPingReplyDestroy
(
pMsg
);
syncPingReplyDestroy
(
pMsg2
);
}
void
test5
()
{
sTrace
(
"test5: ---- syncRequestVoteSerialize, syncRequestVoteDeserialize"
);
SyncRequestVote
*
pMsg
=
syncRequestVoteBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
currentTerm
=
20
;
pMsg
->
lastLogIndex
=
21
;
pMsg
->
lastLogTerm
=
22
;
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncRequestVoteSerialize
(
pMsg
,
buf
,
bufLen
);
SyncRequestVote
*
pMsg2
=
(
SyncRequestVote
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteDestroy
(
pMsg
);
syncRequestVoteDestroy
(
pMsg2
);
free
(
buf
);
}
void
test6
()
{
sTrace
(
"test6: ---- syncRequestVote2RpcMsg, syncRequestVoteFromRpcMsg"
);
SyncRequestVote
*
pMsg
=
syncRequestVoteBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
currentTerm
=
20
;
pMsg
->
lastLogIndex
=
21
;
pMsg
->
lastLogTerm
=
22
;
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncRequestVote2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncRequestVote
*
pMsg2
=
(
SyncRequestVote
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncRequestVote2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteDestroy
(
pMsg
);
syncRequestVoteDestroy
(
pMsg2
);
}
void
test7
()
{
sTrace
(
"test7: ---- syncRequestVoteReplySerialize, syncRequestVoteReplyDeserialize"
);
SyncRequestVoteReply
*
pMsg
=
SyncRequestVoteReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
20
;
pMsg
->
voteGranted
=
1
;
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncRequestVoteReplySerialize
(
pMsg
,
buf
,
bufLen
);
SyncRequestVoteReply
*
pMsg2
=
(
SyncRequestVoteReply
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteReplyDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteReplyDestroy
(
pMsg
);
syncRequestVoteReplyDestroy
(
pMsg2
);
free
(
buf
);
}
void
test8
()
{
sTrace
(
"test8: ---- syncRequestVoteReply2RpcMsg, syncRequestVoteReplyFromRpcMsg"
);
SyncRequestVoteReply
*
pMsg
=
SyncRequestVoteReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"8.8.8.8"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
term
=
20
;
pMsg
->
voteGranted
=
1
;
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncRequestVoteReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncRequestVoteReply
*
pMsg2
=
(
SyncRequestVoteReply
*
)
malloc
(
pMsg
->
bytes
);
syncRequestVoteReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncRequestVoteReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncRequestVoteReplyDestroy
(
pMsg
);
syncRequestVoteReplyDestroy
(
pMsg2
);
}
void
test9
()
{
sTrace
(
"test9: ---- syncAppendEntriesSerialize, syncAppendEntriesDeserialize"
);
char
msg
[
APPEND_ENTRIES_VALUE_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test value"
);
SyncAppendEntries
*
pMsg
=
syncAppendEntriesBuild
(
APPEND_ENTRIES_VALUE_LEN
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
prevLogIndex
=
55
;
pMsg
->
prevLogTerm
=
66
;
pMsg
->
commitIndex
=
77
;
memcpy
(
pMsg
->
data
,
msg
,
APPEND_ENTRIES_VALUE_LEN
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncAppendEntriesSerialize
(
pMsg
,
buf
,
bufLen
);
SyncAppendEntries
*
pMsg2
=
(
SyncAppendEntries
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesDestroy
(
pMsg
);
syncAppendEntriesDestroy
(
pMsg2
);
free
(
buf
);
}
void
test10
()
{
sTrace
(
"test10: ---- syncAppendEntries2RpcMsg, syncAppendEntriesFromRpcMsg"
);
char
msg
[
APPEND_ENTRIES_VALUE_LEN
];
snprintf
(
msg
,
sizeof
(
msg
),
"%s"
,
"test value"
);
SyncAppendEntries
*
pMsg
=
syncAppendEntriesBuild
(
APPEND_ENTRIES_VALUE_LEN
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
prevLogIndex
=
55
;
pMsg
->
prevLogTerm
=
66
;
pMsg
->
commitIndex
=
77
;
memcpy
(
pMsg
->
data
,
msg
,
APPEND_ENTRIES_VALUE_LEN
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncAppendEntries2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncAppendEntries
*
pMsg2
=
(
SyncAppendEntries
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncAppendEntries2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesDestroy
(
pMsg
);
syncAppendEntriesDestroy
(
pMsg2
);
}
void
test11
()
{
sTrace
(
"test11: ---- syncAppendEntriesReplySerialize, syncAppendEntriesReplyDeserialize"
);
SyncAppendEntriesReply
*
pMsg
=
syncAppendEntriesReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
success
=
1
;
pMsg
->
matchIndex
=
23
;
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
uint32_t
bufLen
=
pMsg
->
bytes
;
char
*
buf
=
(
char
*
)
malloc
(
bufLen
);
syncAppendEntriesReplySerialize
(
pMsg
,
buf
,
bufLen
);
SyncAppendEntriesReply
*
pMsg2
=
(
SyncAppendEntriesReply
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesReplyDeserialize
(
buf
,
bufLen
,
pMsg2
);
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesReplyDestroy
(
pMsg
);
syncAppendEntriesReplyDestroy
(
pMsg2
);
free
(
buf
);
}
void
test12
()
{
sTrace
(
"test12: ---- syncAppendEntriesReply2RpcMsg, syncAppendEntriesReplyFromRpcMsg"
);
SyncAppendEntriesReply
*
pMsg
=
syncAppendEntriesReplyBuild
();
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1111
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
2222
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
success
=
1
;
pMsg
->
matchIndex
=
23
;
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncAppendEntriesReply
*
pMsg2
=
(
SyncAppendEntriesReply
*
)
malloc
(
pMsg
->
bytes
);
syncAppendEntriesReplyFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
rpcFreeCont
(
rpcMsg
.
pCont
);
{
cJSON
*
pJson
=
syncAppendEntriesReply2Json
(
pMsg2
);
char
*
serialized
=
cJSON_Print
(
pJson
);
printf
(
"
\n
%s
\n\n
"
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncAppendEntriesReplyDestroy
(
pMsg
);
syncAppendEntriesReplyDestroy
(
pMsg2
);
}
int
main
()
{
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
...
...
@@ -179,6 +496,14 @@ int main() {
test2
();
test3
();
test4
();
test5
();
test6
();
test7
();
test8
();
test9
();
test10
();
test11
();
test12
();
return
0
;
}
source/libs/sync/test/syncPingTest.cpp
浏览文件 @
eec03f2f
...
...
@@ -13,7 +13,9 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
SSyncNode
*
doSync
()
{
uint16_t
ports
[
3
]
=
{
7010
,
7110
,
7210
};
SSyncNode
*
doSync
(
int
myIndex
)
{
SSyncFSM
*
pFsm
;
SSyncInfo
syncInfo
;
...
...
@@ -24,18 +26,18 @@ SSyncNode* doSync() {
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s"
,
"./test_sync_ping"
);
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
0
;
pCfg
->
replicaNum
=
2
;
pCfg
->
myIndex
=
myIndex
;
pCfg
->
replicaNum
=
3
;
pCfg
->
nodeInfo
[
0
].
nodePort
=
7010
;
pCfg
->
nodeInfo
[
0
].
nodePort
=
ports
[
0
]
;
snprintf
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg
->
nodeInfo
[
1
].
nodePort
=
7110
;
pCfg
->
nodeInfo
[
1
].
nodePort
=
ports
[
1
]
;
snprintf
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg
->
nodeInfo
[
2
].
nodePort
=
7210
;
pCfg
->
nodeInfo
[
2
].
nodePort
=
ports
[
2
]
;
snprintf
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
...
...
@@ -53,20 +55,25 @@ void timerPingAll(void* param, void* tmrId) {
syncNodePingAll
(
pSyncNode
);
}
int
main
()
{
int
main
(
int
argc
,
char
**
argv
)
{
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int32_t
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
int
myIndex
=
0
;
if
(
argc
>=
2
)
{
myIndex
=
atoi
(
argv
[
1
]);
}
int32_t
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
ports
[
myIndex
]);
assert
(
ret
==
0
);
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
SSyncNode
*
pSyncNode
=
doSync
();
SSyncNode
*
pSyncNode
=
doSync
(
myIndex
);
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
...
...
@@ -74,9 +81,9 @@ int main() {
assert
(
ret
==
0
);
/*
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
taosMsleep(10000);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
*/
while
(
1
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录