Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b5b1417a
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b5b1417a
编写于
3月 16, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sync append entries (sync-io)
上级
a69330b6
变更
9
显示空白变更内容
内联
并排
Showing
9 changed file
with
578 addition
and
515 deletion
+578
-515
source/libs/sync/inc/syncRaftStore.h
source/libs/sync/inc/syncRaftStore.h
+7
-5
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+18
-16
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+120
-119
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+295
-279
source/libs/sync/src/syncRaftEntry.c
source/libs/sync/src/syncRaftEntry.c
+24
-22
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+20
-18
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+43
-10
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+50
-46
source/libs/sync/test/syncInitTest.cpp
source/libs/sync/test/syncInitTest.cpp
+1
-0
未找到文件。
source/libs/sync/inc/syncRaftStore.h
浏览文件 @
b5b1417a
...
@@ -48,6 +48,8 @@ void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
...
@@ -48,6 +48,8 @@ void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId);
void
raftStoreClearVote
(
SRaftStore
*
pRaftStore
);
void
raftStoreClearVote
(
SRaftStore
*
pRaftStore
);
void
raftStoreNextTerm
(
SRaftStore
*
pRaftStore
);
void
raftStoreNextTerm
(
SRaftStore
*
pRaftStore
);
void
raftStoreSetTerm
(
SRaftStore
*
pRaftStore
,
SyncTerm
term
);
void
raftStoreSetTerm
(
SRaftStore
*
pRaftStore
,
SyncTerm
term
);
cJSON
*
raftStore2Json
(
SRaftStore
*
pRaftStore
);
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
);
// for debug -------------------
// for debug -------------------
void
raftStorePrint
(
SRaftStore
*
pObj
);
void
raftStorePrint
(
SRaftStore
*
pObj
);
...
...
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
b5b1417a
...
@@ -70,6 +70,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
...
@@ -70,6 +70,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pSyncIndexMgr
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pSyncIndexMgr
->
replicaNum
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pSyncIndexMgr
->
replicaNum
);
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
...
@@ -86,6 +87,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
...
@@ -86,6 +87,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
cJSON_AddItemToObject
(
pRoot
,
"index"
,
pIndex
);
cJSON_AddItemToObject
(
pRoot
,
"index"
,
pIndex
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pSyncIndexMgr
->
pSyncNode
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pSyncIndexMgr
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"pSyncIndexMgr"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"pSyncIndexMgr"
,
pRoot
);
...
@@ -94,7 +96,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
...
@@ -94,7 +96,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
return
serialized
;
}
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
b5b1417a
...
@@ -355,6 +355,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
...
@@ -355,6 +355,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pSyncNode
!=
NULL
)
{
// init by SSyncInfo
// init by SSyncInfo
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pSyncNode
->
vgId
);
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pSyncNode
->
vgId
);
cJSON_AddStringToObject
(
pRoot
,
"path"
,
pSyncNode
->
path
);
cJSON_AddStringToObject
(
pRoot
,
"path"
,
pSyncNode
->
path
);
...
@@ -406,9 +407,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
...
@@ -406,9 +407,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// tla+ server vars
// tla+ server vars
cJSON_AddNumberToObject
(
pRoot
,
"state"
,
pSyncNode
->
state
);
cJSON_AddNumberToObject
(
pRoot
,
"state"
,
pSyncNode
->
state
);
cJSON_AddStringToObject
(
pRoot
,
"state_str"
,
syncUtilState2String
(
pSyncNode
->
state
));
cJSON_AddStringToObject
(
pRoot
,
"state_str"
,
syncUtilState2String
(
pSyncNode
->
state
));
char
tmpBuf
[
RAFT_STORE_BLOCK_SIZE
];
cJSON
*
pRaftStore
=
raftStore2Json
(
pSyncNode
->
pRaftStore
);
raftStoreSerialize
(
pSyncNode
->
pRaftStore
,
tmpBuf
,
sizeof
(
tmpBuf
));
cJSON_AddItemToObject
(
pRoot
,
"pRaftStore"
,
pRaftStore
);
cJSON_AddStringToObject
(
pRoot
,
"pRaftStore"
,
tmpBuf
);
// tla+ candidate vars
// tla+ candidate vars
cJSON_AddItemToObject
(
pRoot
,
"pVotesGranted"
,
voteGranted2Json
(
pSyncNode
->
pVotesGranted
));
cJSON_AddItemToObject
(
pRoot
,
"pVotesGranted"
,
voteGranted2Json
(
pSyncNode
->
pVotesGranted
));
...
@@ -477,6 +477,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
...
@@ -477,6 +477,7 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
cJSON_AddStringToObject
(
pRoot
,
"FpOnAppendEntriesReply"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"FpOnAppendEntriesReply"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pSyncNode
->
FpOnTimeout
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pSyncNode
->
FpOnTimeout
);
cJSON_AddStringToObject
(
pRoot
,
"FpOnTimeout"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"FpOnTimeout"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SSyncNode"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SSyncNode"
,
pRoot
);
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
b5b1417a
...
@@ -213,8 +213,9 @@ SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -213,8 +213,9 @@ SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncTimeout2Json
(
const
SyncTimeout
*
pMsg
)
{
cJSON
*
syncTimeout2Json
(
const
SyncTimeout
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"timeoutType"
,
pMsg
->
timeoutType
);
cJSON_AddNumberToObject
(
pRoot
,
"timeoutType"
,
pMsg
->
timeoutType
);
...
@@ -223,6 +224,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
...
@@ -223,6 +224,7 @@ cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
cJSON_AddNumberToObject
(
pRoot
,
"timerMS"
,
pMsg
->
timerMS
);
cJSON_AddNumberToObject
(
pRoot
,
"timerMS"
,
pMsg
->
timerMS
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pMsg
->
data
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pMsg
->
data
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncTimeout"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncTimeout"
,
pRoot
);
...
@@ -343,8 +345,9 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -343,8 +345,9 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
)
{
cJSON
*
syncPing2Json
(
const
SyncPing
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
...
@@ -386,6 +389,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
...
@@ -386,6 +389,7 @@ cJSON* syncPing2Json(const SyncPing* pMsg) {
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
free
(
s
);
free
(
s
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPing"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncPing"
,
pRoot
);
...
@@ -506,8 +510,9 @@ SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -506,8 +510,9 @@ SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
cJSON
*
syncPingReply2Json
(
const
SyncPingReply
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
...
@@ -549,6 +554,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
...
@@ -549,6 +554,7 @@ cJSON* syncPingReply2Json(const SyncPingReply* pMsg) {
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
free
(
s
);
free
(
s
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncPingReply"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncPingReply"
,
pRoot
);
...
@@ -665,8 +671,9 @@ SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -665,8 +671,9 @@ SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncClientRequest2Json
(
const
SyncClientRequest
*
pMsg
)
{
cJSON
*
syncClientRequest2Json
(
const
SyncClientRequest
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"originalRpcType"
,
pMsg
->
originalRpcType
);
cJSON_AddNumberToObject
(
pRoot
,
"originalRpcType"
,
pMsg
->
originalRpcType
);
...
@@ -682,6 +689,7 @@ cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
...
@@ -682,6 +689,7 @@ cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) {
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
free
(
s
);
free
(
s
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncClientRequest"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncClientRequest"
,
pRoot
);
...
@@ -786,8 +794,9 @@ SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -786,8 +794,9 @@ SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncRequestVote2Json
(
const
SyncRequestVote
*
pMsg
)
{
cJSON
*
syncRequestVote2Json
(
const
SyncRequestVote
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
...
@@ -826,6 +835,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
...
@@ -826,6 +835,7 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
cJSON_AddStringToObject
(
pRoot
,
"lastLogIndex"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogTerm
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogTerm
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogTerm"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogTerm"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVote"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVote"
,
pRoot
);
...
@@ -930,8 +940,9 @@ SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -930,8 +940,9 @@ SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncRequestVoteReply2Json
(
const
SyncRequestVoteReply
*
pMsg
)
{
cJSON
*
syncRequestVoteReply2Json
(
const
SyncRequestVoteReply
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
...
@@ -967,6 +978,7 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
...
@@ -967,6 +978,7 @@ cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg) {
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_granted"
,
pMsg
->
voteGranted
);
cJSON_AddNumberToObject
(
pRoot
,
"vote_granted"
,
pMsg
->
voteGranted
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVoteReply"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncRequestVoteReply"
,
pRoot
);
...
@@ -1073,8 +1085,9 @@ SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) {
...
@@ -1073,8 +1085,9 @@ SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) {
cJSON
*
syncAppendEntries2Json
(
const
SyncAppendEntries
*
pMsg
)
{
cJSON
*
syncAppendEntries2Json
(
const
SyncAppendEntries
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
...
@@ -1128,6 +1141,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
...
@@ -1128,6 +1141,7 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
free
(
s
);
free
(
s
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntries"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntries"
,
pRoot
);
...
@@ -1232,8 +1246,9 @@ SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg
...
@@ -1232,8 +1246,9 @@ SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg
cJSON
*
syncAppendEntriesReply2Json
(
const
SyncAppendEntriesReply
*
pMsg
)
{
cJSON
*
syncAppendEntriesReply2Json
(
const
SyncAppendEntriesReply
*
pMsg
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
...
@@ -1272,6 +1287,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
...
@@ -1272,6 +1287,7 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
matchIndex
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
matchIndex
);
cJSON_AddStringToObject
(
pRoot
,
"matchIndex"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"matchIndex"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntriesReply"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntriesReply"
,
pRoot
);
...
...
source/libs/sync/src/syncRaftEntry.c
浏览文件 @
b5b1417a
...
@@ -69,8 +69,9 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
...
@@ -69,8 +69,9 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) {
cJSON
*
syncEntry2Json
(
const
SSyncRaftEntry
*
pEntry
)
{
cJSON
*
syncEntry2Json
(
const
SSyncRaftEntry
*
pEntry
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pEntry
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pEntry
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pEntry
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pEntry
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pEntry
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"originalRpcType"
,
pEntry
->
originalRpcType
);
cJSON_AddNumberToObject
(
pRoot
,
"originalRpcType"
,
pEntry
->
originalRpcType
);
...
@@ -91,6 +92,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
...
@@ -91,6 +92,7 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) {
s
=
syncUtilprintBin2
((
char
*
)(
pEntry
->
data
),
pEntry
->
dataLen
);
s
=
syncUtilprintBin2
((
char
*
)(
pEntry
->
data
),
pEntry
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
free
(
s
);
free
(
s
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SSyncRaftEntry"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SSyncRaftEntry"
,
pRoot
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
b5b1417a
...
@@ -123,9 +123,10 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
...
@@ -123,9 +123,10 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
cJSON
*
logStore2Json
(
SSyncLogStore
*
pLogStore
)
{
cJSON
*
logStore2Json
(
SSyncLogStore
*
pLogStore
)
{
char
u64buf
[
128
];
char
u64buf
[
128
];
SSyncLogStoreData
*
pData
=
(
SSyncLogStoreData
*
)
pLogStore
->
data
;
SSyncLogStoreData
*
pData
=
(
SSyncLogStoreData
*
)
pLogStore
->
data
;
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pData
!=
NULL
&&
pData
->
pWal
!=
NULL
)
{
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pSyncNode
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pWal
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pWal
);
...
@@ -143,6 +144,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
...
@@ -143,6 +144,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddItemToArray
(
pEntries
,
syncEntry2Json
(
pEntry
));
cJSON_AddItemToArray
(
pEntries
,
syncEntry2Json
(
pEntry
));
syncEntryDestory
(
pEntry
);
syncEntryDestory
(
pEntry
);
}
}
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SSyncLogStore"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SSyncLogStore"
,
pRoot
);
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
b5b1417a
...
@@ -164,30 +164,63 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
...
@@ -164,30 +164,63 @@ void raftStoreSetTerm(SRaftStore *pRaftStore, SyncTerm term) {
raftStorePersist
(
pRaftStore
);
raftStorePersist
(
pRaftStore
);
}
}
cJSON
*
raftStore2Json
(
SRaftStore
*
pRaftStore
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pRaftStore
!=
NULL
)
{
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pRaftStore
->
currentTerm
);
cJSON_AddStringToObject
(
pRoot
,
"currentTerm"
,
u64buf
);
cJSON
*
pVoteFor
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pRaftStore
->
voteFor
.
addr
);
cJSON_AddStringToObject
(
pVoteFor
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pRaftStore
->
voteFor
.
addr
;
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pVoteFor
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pVoteFor
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pVoteFor
,
"vgId"
,
pRaftStore
->
voteFor
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"voteFor"
,
pVoteFor
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SRaftStore"
,
pRoot
);
return
pJson
;
}
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
)
{
cJSON
*
pJson
=
raftStore2Json
(
pRaftStore
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
// for debug -------------------
// for debug -------------------
void
raftStorePrint
(
SRaftStore
*
pObj
)
{
void
raftStorePrint
(
SRaftStore
*
pObj
)
{
char
serialized
[
RAFT_STORE_BLOCK_SIZE
];
char
*
serialized
=
raftStore2Str
(
pObj
);
raftStoreSerialize
(
pObj
,
serialized
,
sizeof
(
serialized
));
printf
(
"raftStorePrint | len:%lu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
printf
(
"raftStorePrint | len:%lu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
fflush
(
NULL
);
free
(
serialized
);
}
}
void
raftStorePrint2
(
char
*
s
,
SRaftStore
*
pObj
)
{
void
raftStorePrint2
(
char
*
s
,
SRaftStore
*
pObj
)
{
char
serialized
[
RAFT_STORE_BLOCK_SIZE
];
char
*
serialized
=
raftStore2Str
(
pObj
);
raftStoreSerialize
(
pObj
,
serialized
,
sizeof
(
serialized
));
printf
(
"raftStorePrint2 | len:%lu | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
printf
(
"raftStorePrint2 | len:%lu | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
fflush
(
NULL
);
free
(
serialized
);
}
}
void
raftStoreLog
(
SRaftStore
*
pObj
)
{
void
raftStoreLog
(
SRaftStore
*
pObj
)
{
char
serialized
[
RAFT_STORE_BLOCK_SIZE
];
char
*
serialized
=
raftStore2Str
(
pObj
);
raftStoreSerialize
(
pObj
,
serialized
,
sizeof
(
serialized
));
sTrace
(
"raftStoreLog | len:%lu | %s"
,
strlen
(
serialized
),
serialized
);
sTrace
(
"raftStoreLog | len:%lu | %s"
,
strlen
(
serialized
),
serialized
);
f
flush
(
NULL
);
f
ree
(
serialized
);
}
}
void
raftStoreLog2
(
char
*
s
,
SRaftStore
*
pObj
)
{
void
raftStoreLog2
(
char
*
s
,
SRaftStore
*
pObj
)
{
char
serialized
[
RAFT_STORE_BLOCK_SIZE
];
char
*
serialized
=
raftStore2Str
(
pObj
);
raftStoreSerialize
(
pObj
,
serialized
,
sizeof
(
serialized
));
sTrace
(
"raftStoreLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
sTrace
(
"raftStoreLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
f
flush
(
NULL
);
f
ree
(
serialized
);
}
}
source/libs/sync/src/syncVoteMgr.c
浏览文件 @
b5b1417a
...
@@ -82,6 +82,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
...
@@ -82,6 +82,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pVotesGranted
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesGranted
->
replicaNum
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesGranted
->
replicaNum
);
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
...
@@ -106,6 +107,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
...
@@ -106,6 +107,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
bool
majority
=
voteGrantedMajority
(
pVotesGranted
);
bool
majority
=
voteGrantedMajority
(
pVotesGranted
);
cJSON_AddNumberToObject
(
pRoot
,
"majority"
,
majority
);
cJSON_AddNumberToObject
(
pRoot
,
"majority"
,
majority
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SVotesGranted"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SVotesGranted"
,
pRoot
);
...
@@ -114,7 +116,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
...
@@ -114,7 +116,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char
*
voteGranted2Str
(
SVotesGranted
*
pVotesGranted
)
{
char
*
voteGranted2Str
(
SVotesGranted
*
pVotesGranted
)
{
cJSON
*
pJson
=
voteGranted2Json
(
pVotesGranted
);
cJSON
*
pJson
=
voteGranted2Json
(
pVotesGranted
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
return
serialized
;
}
}
...
@@ -203,6 +205,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
...
@@ -203,6 +205,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char
u64buf
[
128
];
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pVotesRespond
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesRespond
->
replicaNum
);
cJSON_AddNumberToObject
(
pRoot
,
"replicaNum"
,
pVotesRespond
->
replicaNum
);
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON
*
pReplicas
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
cJSON_AddItemToObject
(
pRoot
,
"replicas"
,
pReplicas
);
...
@@ -226,6 +229,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
...
@@ -226,6 +229,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pVotesRespond
->
pSyncNode
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pVotesRespond
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SVotesRespond"
,
pRoot
);
cJSON_AddItemToObject
(
pJson
,
"SVotesRespond"
,
pRoot
);
...
@@ -234,7 +238,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
...
@@ -234,7 +238,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char
*
votesRespond2Str
(
SVotesRespond
*
pVotesRespond
)
{
char
*
votesRespond2Str
(
SVotesRespond
*
pVotesRespond
)
{
cJSON
*
pJson
=
votesRespond2Json
(
pVotesRespond
);
cJSON
*
pJson
=
votesRespond2Json
(
pVotesRespond
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
return
serialized
;
}
}
...
...
source/libs/sync/test/syncInitTest.cpp
浏览文件 @
b5b1417a
...
@@ -47,6 +47,7 @@ SSyncNode* syncNodeInit() {
...
@@ -47,6 +47,7 @@ SSyncNode* syncNodeInit() {
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncClientRequest
=
pSyncNode
->
FpOnClientRequest
;
gSyncIO
->
FpOnSyncRequestVote
=
pSyncNode
->
FpOnRequestVote
;
gSyncIO
->
FpOnSyncRequestVote
=
pSyncNode
->
FpOnRequestVote
;
gSyncIO
->
FpOnSyncRequestVoteReply
=
pSyncNode
->
FpOnRequestVoteReply
;
gSyncIO
->
FpOnSyncRequestVoteReply
=
pSyncNode
->
FpOnRequestVoteReply
;
gSyncIO
->
FpOnSyncAppendEntries
=
pSyncNode
->
FpOnAppendEntries
;
gSyncIO
->
FpOnSyncAppendEntries
=
pSyncNode
->
FpOnAppendEntries
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录