Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5e4a49f4
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5e4a49f4
编写于
3月 15, 2022
作者:
L
Li Minghao
提交者:
GitHub
3月 15, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10752 from taosdata/feature/3.0_mhli
Feature/3.0 mhli
上级
7aeb0a2a
3b279462
变更
33
展开全部
隐藏空白更改
内联
并排
Showing
33 changed file
with
1177 addition
and
584 deletion
+1177
-584
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+16
-4
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+13
-5
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+32
-12
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+4
-1
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+3
-0
source/libs/sync/inc/syncRaftStore.h
source/libs/sync/inc/syncRaftStore.h
+6
-0
source/libs/sync/inc/syncUtil.h
source/libs/sync/inc/syncUtil.h
+3
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+123
-1
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+44
-1
source/libs/sync/src/syncElection.c
source/libs/sync/src/syncElection.c
+14
-1
source/libs/sync/src/syncEnv.c
source/libs/sync/src/syncEnv.c
+55
-25
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+183
-146
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+301
-261
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+32
-5
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+1
-1
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+29
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+36
-0
source/libs/sync/src/syncRequestVote.c
source/libs/sync/src/syncRequestVote.c
+35
-1
source/libs/sync/src/syncRequestVoteReply.c
source/libs/sync/src/syncRequestVoteReply.c
+34
-1
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+1
-9
source/libs/sync/src/syncUtil.c
source/libs/sync/src/syncUtil.c
+12
-0
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+10
-10
source/libs/sync/test/syncEnqTest.cpp
source/libs/sync/test/syncEnqTest.cpp
+45
-38
source/libs/sync/test/syncEnvTest.cpp
source/libs/sync/test/syncEnvTest.cpp
+12
-14
source/libs/sync/test/syncIOClientTest.cpp
source/libs/sync/test/syncIOClientTest.cpp
+12
-12
source/libs/sync/test/syncIOSendMsgTest.cpp
source/libs/sync/test/syncIOSendMsgTest.cpp
+81
-20
source/libs/sync/test/syncIOServerTest.cpp
source/libs/sync/test/syncIOServerTest.cpp
+0
-0
source/libs/sync/test/syncIOTickPingTest.cpp
source/libs/sync/test/syncIOTickPingTest.cpp
+9
-2
source/libs/sync/test/syncIOTickQTest.cpp
source/libs/sync/test/syncIOTickQTest.cpp
+11
-4
source/libs/sync/test/syncLogStoreTest.cpp
source/libs/sync/test/syncLogStoreTest.cpp
+10
-1
source/libs/sync/test/syncRequestVoteTest.cpp
source/libs/sync/test/syncRequestVoteTest.cpp
+1
-1
source/libs/sync/test/syncRpcMsgTest.cpp
source/libs/sync/test/syncRpcMsgTest.cpp
+8
-8
source/libs/sync/test/syncUtilTest.cpp
source/libs/sync/test/syncUtilTest.cpp
+1
-0
未找到文件。
source/libs/sync/inc/syncEnv.h
浏览文件 @
5e4a49f4
...
...
@@ -29,6 +29,7 @@ extern "C" {
#include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 150
#define ELECT_TIMER_MS_MAX 300
...
...
@@ -38,17 +39,28 @@ extern "C" {
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
typedef
struct
SSyncEnv
{
tmr_h
pEnvTickTimer
;
// tick timer
tmr_h
pEnvTickTimer
;
int32_t
envTickTimerMS
;
uint64_t
envTickTimerLogicClock
;
// if use queue, should pass logic clock into queue item
uint64_t
envTickTimerLogicClockUser
;
TAOS_TMR_CALLBACK
FpEnvTickTimer
;
// Timer Fp
uint64_t
envTickTimerCounter
;
// timer manager
tmr_h
pTimerManager
;
char
name
[
128
];
// other resources shared by SyncNodes
// ...
}
SSyncEnv
;
extern
SSyncEnv
*
gSyncEnv
;
int32_t
syncEnvStart
();
int32_t
syncEnvStop
();
tmr_h
syncEnvStartTimer
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
);
void
syncEnvStopTimer
(
tmr_h
*
pTimer
);
int32_t
syncEnvStartTimer
(
);
int32_t
syncEnvStopTimer
(
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncIO.h
浏览文件 @
5e4a49f4
...
...
@@ -29,6 +29,9 @@ extern "C" {
#include "tqueue.h"
#include "trpc.h"
#define TICK_Q_TIMER_MS 1000
#define TICK_Ping_TIMER_MS 1000
typedef
struct
SSyncIO
{
STaosQueue
*
pMsgQ
;
STaosQset
*
pQset
;
...
...
@@ -38,9 +41,11 @@ typedef struct SSyncIO {
void
*
clientRpc
;
SEpSet
myAddr
;
void
*
ioTimerTickQ
;
void
*
ioTimerTickPing
;
void
*
ioTimerManager
;
tmr_h
qTimer
;
int32_t
qTimerMS
;
tmr_h
pingTimer
;
int32_t
pingTimerMS
;
tmr_h
timerMgr
;
void
*
pSyncNode
;
int32_t
(
*
FpOnSyncPing
)(
SSyncNode
*
pSyncNode
,
SyncPing
*
pMsg
);
...
...
@@ -59,11 +64,14 @@ extern SSyncIO *gSyncIO;
int32_t
syncIOStart
(
char
*
host
,
uint16_t
port
);
int32_t
syncIOStop
();
int32_t
syncIOTickQ
();
int32_t
syncIOTickPing
();
int32_t
syncIOSendMsg
(
void
*
clientRpc
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
int32_t
syncIOEqMsg
(
void
*
queue
,
SRpcMsg
*
pMsg
);
int32_t
syncIOQTimerStart
();
int32_t
syncIOQTimerStop
();
int32_t
syncIOPingTimerStart
();
int32_t
syncIOPingTimerStop
();
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
5e4a49f4
...
...
@@ -67,9 +67,6 @@ extern "C" {
} \
}
struct
SRaft
;
typedef
struct
SRaft
SRaft
;
struct
SyncTimeout
;
typedef
struct
SyncTimeout
SyncTimeout
;
...
...
@@ -117,8 +114,10 @@ typedef struct SSyncNode {
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
char
raftStorePath
[
TSDB_FILENAME_LEN
*
2
];
SWal
*
pWal
;
void
*
rpcClient
;
// sync io
SWal
*
pWal
;
void
*
rpcClient
;
int32_t
(
*
FpSendMsg
)(
void
*
rpcClient
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
void
*
queue
;
int32_t
(
*
FpEqMsg
)(
void
*
queue
,
SRpcMsg
*
pMsg
);
...
...
@@ -164,7 +163,7 @@ typedef struct SSyncNode {
int32_t
pingTimerMS
;
uint64_t
pingTimerLogicClock
;
uint64_t
pingTimerLogicClockUser
;
TAOS_TMR_CALLBACK
FpPingTimer
;
// Timer Fp
TAOS_TMR_CALLBACK
FpPingTimer
CB
;
// Timer Fp
uint64_t
pingTimerCounter
;
// elect timer
...
...
@@ -172,7 +171,7 @@ typedef struct SSyncNode {
int32_t
electTimerMS
;
uint64_t
electTimerLogicClock
;
uint64_t
electTimerLogicClockUser
;
TAOS_TMR_CALLBACK
FpElectTimer
;
// Timer Fp
TAOS_TMR_CALLBACK
FpElectTimer
CB
;
// Timer Fp
uint64_t
electTimerCounter
;
// heartbeat timer
...
...
@@ -180,7 +179,7 @@ typedef struct SSyncNode {
int32_t
heartbeatTimerMS
;
uint64_t
heartbeatTimerLogicClock
;
uint64_t
heartbeatTimerLogicClockUser
;
TAOS_TMR_CALLBACK
FpHeartbeatTimer
;
// Timer Fp
TAOS_TMR_CALLBACK
FpHeartbeatTimer
CB
;
// Timer Fp
uint64_t
heartbeatTimerCounter
;
// callback
...
...
@@ -194,26 +193,47 @@ typedef struct SSyncNode {
}
SSyncNode
;
// open/close --------------
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pSyncInfo
);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeSendMsgById
(
const
SRaftId
*
destRaftId
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
int32_t
syncNodeSendMsgByInfo
(
const
SNodeInfo
*
nodeInfo
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
// ping --------------
int32_t
syncNodePing
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
SyncPing
*
pMsg
);
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingSelf
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
// timer control --------------
int32_t
syncNodeStartPingTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStopPingTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStartElectTimer
(
SSyncNode
*
pSyncNode
,
int32_t
ms
);
int32_t
syncNodeStopElectTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeRestartElectTimer
(
SSyncNode
*
pSyncNode
,
int32_t
ms
);
int32_t
syncNodeResetElectTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStartHeartbeatTimer
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeStopHeartbeatTimer
(
SSyncNode
*
pSyncNode
);
// utils --------------
int32_t
syncNodeSendMsgById
(
const
SRaftId
*
destRaftId
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
int32_t
syncNodeSendMsgByInfo
(
const
SNodeInfo
*
nodeInfo
,
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
);
cJSON
*
syncNode2Json
(
const
SSyncNode
*
pSyncNode
);
char
*
syncNode2Str
(
const
SSyncNode
*
pSyncNode
);
// raft state change --------------
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
);
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
);
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
);
void
syncNodeCandidate2Leader
(
SSyncNode
*
pSyncNode
);
void
syncNodeFollower2Candidate
(
SSyncNode
*
pSyncNode
);
void
syncNodeLeader2Follower
(
SSyncNode
*
pSyncNode
);
void
syncNodeCandidate2Follower
(
SSyncNode
*
pSyncNode
);
// raft vote --------------
void
syncNodeVoteForTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
,
SRaftId
*
pRaftId
);
void
syncNodeVoteForSelf
(
SSyncNode
*
pSyncNode
);
void
syncNodeMaybeAdvanceCommitIndex
(
SSyncNode
*
pSyncNode
);
// for debug --------------
void
syncNodePrint
(
SSyncNode
*
pObj
);
void
syncNodePrint2
(
char
*
s
,
SSyncNode
*
pObj
);
...
...
source/libs/sync/inc/syncMessage.h
浏览文件 @
5e4a49f4
...
...
@@ -39,6 +39,7 @@ typedef enum ESyncMessageType {
SYNC_REQUEST_VOTE_REPLY
=
111
,
SYNC_APPEND_ENTRIES
=
113
,
SYNC_APPEND_ENTRIES_REPLY
=
115
,
SYNC_RESPONSE
=
119
,
}
ESyncMessageType
;
...
...
@@ -195,7 +196,7 @@ typedef struct SyncRequestVote {
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
currentT
erm
;
SyncTerm
t
erm
;
SyncIndex
lastLogIndex
;
SyncTerm
lastLogTerm
;
}
SyncRequestVote
;
...
...
@@ -254,6 +255,7 @@ typedef struct SyncAppendEntries {
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
SyncIndex
commitIndex
;
...
...
@@ -286,6 +288,7 @@ typedef struct SyncAppendEntriesReply {
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
bool
success
;
SyncIndex
matchIndex
;
}
SyncAppendEntriesReply
;
...
...
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
5e4a49f4
...
...
@@ -27,6 +27,9 @@ extern "C" {
#include "syncRaftEntry.h"
#include "taosdef.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef
struct
SSyncLogStoreData
{
SSyncNode
*
pSyncNode
;
SWal
*
pWal
;
...
...
source/libs/sync/inc/syncRaftStore.h
浏览文件 @
5e4a49f4
...
...
@@ -43,6 +43,12 @@ int32_t raftStorePersist(SRaftStore *pRaftStore);
int32_t
raftStoreSerialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
);
int32_t
raftStoreDeserialize
(
SRaftStore
*
pRaftStore
,
char
*
buf
,
size_t
len
);
bool
raftStoreHasVoted
(
SRaftStore
*
pRaftStore
);
void
raftStoreVote
(
SRaftStore
*
pRaftStore
,
SRaftId
*
pRaftId
);
void
raftStoreClearVote
(
SRaftStore
*
pRaftStore
);
void
raftStoreNextTerm
(
SRaftStore
*
pRaftStore
);
void
raftStoreSetTerm
(
SRaftStore
*
pRaftStore
,
SyncTerm
term
);
// for debug -------------------
void
raftStorePrint
(
SRaftStore
*
pObj
);
void
raftStorePrint2
(
char
*
s
,
SRaftStore
*
pObj
);
...
...
source/libs/sync/inc/syncUtil.h
浏览文件 @
5e4a49f4
...
...
@@ -34,6 +34,7 @@ void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet);
void
syncUtilraftId2EpSet
(
const
SRaftId
*
raftId
,
SEpSet
*
pEpSet
);
void
syncUtilnodeInfo2raftId
(
const
SNodeInfo
*
pNodeInfo
,
SyncGroupId
vgId
,
SRaftId
*
raftId
);
bool
syncUtilSameId
(
const
SRaftId
*
pId1
,
const
SRaftId
*
pId2
);
bool
syncUtilEmptyId
(
const
SRaftId
*
pId
);
// ---- SSyncBuffer ----
void
syncUtilbufBuild
(
SSyncBuffer
*
syncBuf
,
size_t
len
);
...
...
@@ -52,6 +53,8 @@ const char* syncUtilState2String(ESyncState state);
bool
syncUtilCanPrint
(
char
c
);
char
*
syncUtilprintBin
(
char
*
ptr
,
uint32_t
len
);
char
*
syncUtilprintBin2
(
char
*
ptr
,
uint32_t
len
);
SyncIndex
syncUtilMinIndex
(
SyncIndex
a
,
SyncIndex
b
);
SyncIndex
syncUtilMaxIndex
(
SyncIndex
a
,
SyncIndex
b
);
#ifdef __cplusplus
}
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
5e4a49f4
...
...
@@ -14,6 +14,11 @@
*/
#include "syncAppendEntries.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
...
...
@@ -80,4 +85,121 @@
// /\ UNCHANGED <<serverVars, commitIndex, messages>>
// /\ UNCHANGED <<candidateVars, leaderVars>>
//
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{}
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
0
;
syncAppendEntriesLog2
(
"==syncNodeOnAppendEntriesCb=="
,
pMsg
);
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
syncNodeUpdateTerm
(
ths
,
pMsg
->
term
);
}
assert
(
pMsg
->
term
<=
ths
->
pRaftStore
->
currentTerm
);
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
)
{
ths
->
leaderCache
=
pMsg
->
srcId
;
syncNodeResetElectTimer
(
ths
);
}
assert
(
pMsg
->
dataLen
>=
0
);
SyncTerm
localPreLogTerm
=
0
;
if
(
pMsg
->
prevLogTerm
>=
SYNC_INDEX_BEGIN
&&
pMsg
->
prevLogTerm
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
SSyncRaftEntry
*
pEntry
=
logStoreGetEntry
(
ths
->
pLogStore
,
pMsg
->
prevLogTerm
);
assert
(
pEntry
!=
NULL
);
localPreLogTerm
=
pEntry
->
term
;
syncEntryDestory
(
pEntry
);
}
bool
logOK
=
(
pMsg
->
prevLogIndex
==
SYNC_INDEX_INVALID
)
||
((
pMsg
->
prevLogIndex
>=
SYNC_INDEX_BEGIN
)
&&
(
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
&&
(
pMsg
->
prevLogIndex
==
localPreLogTerm
));
// reject
if
((
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
||
((
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
)
&&
(
ths
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
&&
!
logOK
))
{
SyncAppendEntriesReply
*
pReply
=
syncAppendEntriesReplyBuild
();
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pReply
);
return
ret
;
}
// return to follower state
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
&&
ths
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
syncNodeBecomeFollower
(
ths
);
}
// accept request
if
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
&&
ths
->
state
==
TAOS_SYNC_STATE_FOLLOWER
&&
logOK
)
{
bool
matchSuccess
=
false
;
if
(
pMsg
->
prevLogIndex
==
SYNC_INDEX_INVALID
&&
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
)
==
SYNC_INDEX_INVALID
)
{
matchSuccess
=
true
;
}
if
(
pMsg
->
prevLogIndex
>=
SYNC_INDEX_BEGIN
&&
pMsg
->
prevLogIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
SSyncRaftEntry
*
pEntry
=
logStoreGetEntry
(
ths
->
pLogStore
,
pMsg
->
prevLogTerm
);
assert
(
pEntry
!=
NULL
);
if
(
pMsg
->
prevLogTerm
==
pEntry
->
term
)
{
matchSuccess
=
true
;
}
syncEntryDestory
(
pEntry
);
}
if
(
matchSuccess
)
{
// delete conflict entries
if
(
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
)
>
pMsg
->
prevLogIndex
)
{
SyncIndex
fromIndex
=
pMsg
->
prevLogIndex
+
1
;
ths
->
pLogStore
->
truncate
(
ths
->
pLogStore
,
fromIndex
);
}
// append one entry
if
(
pMsg
->
dataLen
>
0
)
{
SSyncRaftEntry
*
pEntry
=
syncEntryDeserialize
(
pMsg
->
data
,
pMsg
->
dataLen
);
ths
->
pLogStore
->
appendEntry
(
ths
->
pLogStore
,
pEntry
);
syncEntryDestory
(
pEntry
);
}
SyncAppendEntriesReply
*
pReply
=
syncAppendEntriesReplyBuild
();
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
success
=
true
;
pReply
->
matchIndex
=
pMsg
->
prevLogIndex
+
1
;
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pReply
);
}
else
{
SyncAppendEntriesReply
*
pReply
=
syncAppendEntriesReplyBuild
();
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pReply
);
}
if
(
pMsg
->
commitIndex
>
ths
->
commitIndex
)
{
if
(
pMsg
->
commitIndex
<=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
// commit
ths
->
commitIndex
=
pMsg
->
commitIndex
;
ths
->
pLogStore
->
updateCommitIndex
(
ths
->
pLogStore
,
ths
->
commitIndex
);
}
}
}
return
ret
;
}
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
5e4a49f4
...
...
@@ -14,6 +14,12 @@
*/
#include "syncAppendEntriesReply.h"
#include "syncIndexMgr.h"
#include "syncInt.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
...
...
@@ -28,4 +34,41 @@
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{}
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
syncAppendEntriesReplyLog2
(
"==syncNodeOnAppendEntriesReplyCb=="
,
pMsg
);
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
sTrace
(
"DropStaleResponse, receive term:%lu, current term:%lu"
,
pMsg
->
term
,
ths
->
pRaftStore
->
currentTerm
);
return
ret
;
}
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
assert
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
if
(
pMsg
->
success
)
{
// nextIndex = reply.matchIndex + 1
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
+
1
);
// matchIndex = reply.matchIndex
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
matchIndex
);
// maybe commit
syncNodeMaybeAdvanceCommitIndex
(
ths
);
}
else
{
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
if
(
nextIndex
>
SYNC_INDEX_BEGIN
)
{
--
nextIndex
;
}
else
{
nextIndex
=
SYNC_INDEX_BEGIN
;
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
}
return
ret
;
}
source/libs/sync/src/syncElection.c
浏览文件 @
5e4a49f4
...
...
@@ -16,6 +16,7 @@
#include "syncElection.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// RequestVote(i, j) ==
...
...
@@ -37,7 +38,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
SyncRequestVote
*
pMsg
=
syncRequestVoteBuild
();
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
pSyncNode
->
peersId
[
i
];
pMsg
->
currentT
erm
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
t
erm
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
lastLogIndex
=
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
);
pMsg
->
lastLogTerm
=
pSyncNode
->
pLogStore
->
getLastTerm
(
pSyncNode
->
pLogStore
);
...
...
@@ -49,10 +50,22 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) {
}
int32_t
syncNodeElect
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
syncNodeFollower2Candidate
(
pSyncNode
);
}
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_CANDIDATE
);
// start election
raftStoreNextTerm
(
pSyncNode
->
pRaftStore
);
raftStoreClearVote
(
pSyncNode
->
pRaftStore
);
voteGrantedReset
(
pSyncNode
->
pVotesGranted
,
pSyncNode
->
pRaftStore
->
currentTerm
);
votesRespondReset
(
pSyncNode
->
pVotesRespond
,
pSyncNode
->
pRaftStore
->
currentTerm
);
syncNodeVoteForSelf
(
pSyncNode
);
int32_t
ret
=
syncNodeRequestVotePeers
(
pSyncNode
);
assert
(
ret
==
0
);
syncNodeResetElectTimer
(
pSyncNode
);
return
ret
;
}
...
...
source/libs/sync/src/syncEnv.c
浏览文件 @
5e4a49f4
...
...
@@ -19,19 +19,18 @@
SSyncEnv
*
gSyncEnv
=
NULL
;
// local function -----------------
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
);
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
static
tmr_h
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
);
static
void
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
,
tmr_h
*
pTimer
);
static
SSyncEnv
*
doSyncEnvStart
(
);
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
);
static
int32_t
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
);
static
int32_t
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
);
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
);
// --------------------------------
int32_t
syncEnvStart
()
{
int32_t
ret
;
int32_t
ret
=
0
;
taosSeedRand
(
taosGetTimestampSec
());
gSyncEnv
=
(
SSyncEnv
*
)
malloc
(
sizeof
(
SSyncEnv
)
);
gSyncEnv
=
doSyncEnvStart
(
gSyncEnv
);
assert
(
gSyncEnv
!=
NULL
);
ret
=
doSyncEnvStart
(
gSyncEnv
);
return
ret
;
}
...
...
@@ -40,31 +39,52 @@ int32_t syncEnvStop() {
return
ret
;
}
tmr_h
syncEnvStartTimer
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
)
{
return
doSyncEnvStartTimer
(
gSyncEnv
,
fp
,
mseconds
,
param
);
int32_t
syncEnvStartTimer
()
{
int32_t
ret
=
doSyncEnvStartTimer
(
gSyncEnv
);
return
ret
;
}
void
syncEnvStopTimer
(
tmr_h
*
pTimer
)
{
doSyncEnvStopTimer
(
gSyncEnv
,
pTimer
);
}
int32_t
syncEnvStopTimer
()
{
int32_t
ret
=
doSyncEnvStopTimer
(
gSyncEnv
);
return
ret
;
}
// local function -----------------
static
void
syncEnvTick
(
void
*
param
,
void
*
tmrId
)
{
SSyncEnv
*
pSyncEnv
=
(
SSyncEnv
*
)
param
;
sTrace
(
"syncEnvTick ... name:%s "
,
pSyncEnv
->
name
);
pSyncEnv
->
pEnvTickTimer
=
taosTmrStart
(
syncEnvTick
,
1000
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
if
(
atomic_load_64
(
&
pSyncEnv
->
envTickTimerLogicClockUser
)
<=
atomic_load_64
(
&
pSyncEnv
->
envTickTimerLogicClock
))
{
++
(
pSyncEnv
->
envTickTimerCounter
);
sTrace
(
"syncEnvTick do ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
"envTickTimerMS:%d, tmrId:%p"
,
pSyncEnv
->
envTickTimerLogicClockUser
,
pSyncEnv
->
envTickTimerLogicClock
,
pSyncEnv
->
envTickTimerCounter
,
pSyncEnv
->
envTickTimerMS
,
tmrId
);
// do something, tick ...
taosTmrReset
(
syncEnvTick
,
pSyncEnv
->
envTickTimerMS
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
,
&
pSyncEnv
->
pEnvTickTimer
);
}
else
{
sTrace
(
"syncEnvTick pass ... envTickTimerLogicClockUser:%lu, envTickTimerLogicClock:%lu, envTickTimerCounter:%lu, "
"envTickTimerMS:%d, tmrId:%p"
,
pSyncEnv
->
envTickTimerLogicClockUser
,
pSyncEnv
->
envTickTimerLogicClock
,
pSyncEnv
->
envTickTimerCounter
,
pSyncEnv
->
envTickTimerMS
,
tmrId
);
}
}
static
int32_t
doSyncEnvStart
(
SSyncEnv
*
pSyncEnv
)
{
snprintf
(
pSyncEnv
->
name
,
sizeof
(
pSyncEnv
->
name
),
"SyncEnv_%p"
,
pSyncEnv
);
static
SSyncEnv
*
doSyncEnvStart
()
{
SSyncEnv
*
pSyncEnv
=
(
SSyncEnv
*
)
malloc
(
sizeof
(
SSyncEnv
));
assert
(
pSyncEnv
!=
NULL
);
memset
(
pSyncEnv
,
0
,
sizeof
(
pSyncEnv
));
pSyncEnv
->
envTickTimerCounter
=
0
;
pSyncEnv
->
envTickTimerMS
=
ENV_TICK_TIMER_MS
;
pSyncEnv
->
FpEnvTickTimer
=
syncEnvTick
;
atomic_store_64
(
&
pSyncEnv
->
envTickTimerLogicClock
,
0
);
atomic_store_64
(
&
pSyncEnv
->
envTickTimerLogicClockUser
,
0
);
// start tmr thread
pSyncEnv
->
pTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC-ENV"
);
// pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager);
sTrace
(
"SyncEnv start ok, name:%s"
,
pSyncEnv
->
name
);
return
0
;
return
pSyncEnv
;
}
static
int32_t
doSyncEnvStop
(
SSyncEnv
*
pSyncEnv
)
{
...
...
@@ -72,8 +92,18 @@ static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
return
0
;
}
static
tmr_h
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
)
{
return
taosTmrStart
(
fp
,
mseconds
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
);
static
int32_t
doSyncEnvStartTimer
(
SSyncEnv
*
pSyncEnv
)
{
int32_t
ret
=
0
;
taosTmrReset
(
pSyncEnv
->
FpEnvTickTimer
,
pSyncEnv
->
envTickTimerMS
,
pSyncEnv
,
pSyncEnv
->
pTimerManager
,
&
pSyncEnv
->
pEnvTickTimer
);
atomic_store_64
(
&
pSyncEnv
->
envTickTimerLogicClock
,
pSyncEnv
->
envTickTimerLogicClockUser
);
return
ret
;
}
static
void
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
,
tmr_h
*
pTimer
)
{}
static
int32_t
doSyncEnvStopTimer
(
SSyncEnv
*
pSyncEnv
)
{
int32_t
ret
=
0
;
atomic_add_fetch_64
(
&
pSyncEnv
->
envTickTimerLogicClockUser
,
1
);
taosTmrStop
(
pSyncEnv
->
pEnvTickTimer
);
pSyncEnv
->
pEnvTickTimer
=
NULL
;
return
ret
;
}
source/libs/sync/src/syncIO.c
浏览文件 @
5e4a49f4
...
...
@@ -16,6 +16,7 @@
#include "syncIO.h"
#include <tdatablock.h>
#include "syncMessage.h"
#include "syncUtil.h"
#include "tglobal.h"
#include "ttimer.h"
#include "tutil.h"
...
...
@@ -23,33 +24,36 @@
SSyncIO
*
gSyncIO
=
NULL
;
// local function ------------
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
);
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
);
static
SSyncIO
*
syncIOCreate
(
char
*
host
,
uint16_t
port
);
static
int32_t
syncIODestroy
(
SSyncIO
*
io
);
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
);
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
int
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
syncIOTickQInternal
(
SSyncIO
*
io
);
static
void
syncIOTickQFunc
(
void
*
param
,
void
*
tmrId
);
static
int32_t
syncIOTickPingInternal
(
SSyncIO
*
io
);
static
void
syncIOTickPingFunc
(
void
*
param
,
void
*
tmrId
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
int32_t
syncIOStartQ
(
SSyncIO
*
io
);
static
int32_t
syncIOStopQ
(
SSyncIO
*
io
);
static
int32_t
syncIOStartPing
(
SSyncIO
*
io
);
static
int32_t
syncIOStopPing
(
SSyncIO
*
io
);
static
void
syncIOTickQ
(
void
*
param
,
void
*
tmrId
);
static
void
syncIOTickPing
(
void
*
param
,
void
*
tmrId
);
// ----------------------------
// public function ------------
int32_t
syncIOStart
(
char
*
host
,
uint16_t
port
)
{
int32_t
ret
=
0
;
gSyncIO
=
syncIOCreate
(
host
,
port
);
assert
(
gSyncIO
!=
NULL
);
taosSeedRand
(
taosGetTimestampSec
());
int32_t
ret
=
syncIOStartInternal
(
gSyncIO
);
ret
=
syncIOStartInternal
(
gSyncIO
);
assert
(
ret
==
0
);
sTrace
(
"syncIOStart ok, gSyncIO:%p
gSyncIO->clientRpc:%p"
,
gSyncIO
,
gSyncIO
->
clientRpc
);
return
0
;
sTrace
(
"syncIOStart ok, gSyncIO:%p
"
,
gSyncIO
);
return
ret
;
}
int32_t
syncIOStop
()
{
...
...
@@ -61,37 +65,25 @@ int32_t syncIOStop() {
return
ret
;
}
int32_t
syncIOTickQ
()
{
int32_t
ret
=
syncIOTickQInternal
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOSendMsg
(
void
*
clientRpc
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
assert
(
pEpSet
->
inUse
==
0
);
assert
(
pEpSet
->
numOfEps
==
1
);
int32_t
syncIOTickPing
()
{
int32_t
ret
=
syncIOTickPingInternal
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
int32_t
ret
=
0
;
char
logBuf
[
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==syncIOSendMsg== %s:%d"
,
pEpSet
->
eps
[
0
].
fqdn
,
pEpSet
->
eps
[
0
].
port
);
syncRpcMsgPrint2
(
logBuf
,
pMsg
);
int32_t
syncIOSendMsg
(
void
*
clientRpc
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
sTrace
(
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d"
,
clientRpc
,
pEpSet
->
numOfEps
,
pEpSet
->
inUse
,
pEpSet
->
eps
[
0
].
fqdn
,
pEpSet
->
eps
[
0
].
port
,
pMsg
->
ahandle
,
pMsg
->
handle
,
pMsg
->
msgType
,
pMsg
->
contLen
);
{
cJSON
*
pJson
=
syncRpcMsg2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
sTrace
(
"process syncMessage send: pMsg:%s "
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
pMsg
->
handle
=
NULL
;
rpcSendRequest
(
clientRpc
,
pEpSet
,
pMsg
,
NULL
);
return
0
;
return
ret
;
}
int32_t
syncIOEqMsg
(
void
*
queue
,
SRpcMsg
*
pMsg
)
{
int32_t
ret
=
0
;
char
logBuf
[
128
];
syncRpcMsgPrint2
((
char
*
)
"==syncIOEqMsg=="
,
pMsg
);
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
...
...
@@ -99,11 +91,75 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
STaosQueue
*
pMsgQ
=
queue
;
taosWriteQitem
(
pMsgQ
,
pTemp
);
return
0
;
return
ret
;
}
int32_t
syncIOQTimerStart
()
{
int32_t
ret
=
syncIOStartQ
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOQTimerStop
()
{
int32_t
ret
=
syncIOStopQ
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOPingTimerStart
()
{
int32_t
ret
=
syncIOStartPing
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
int32_t
syncIOPingTimerStop
()
{
int32_t
ret
=
syncIOStopPing
(
gSyncIO
);
assert
(
ret
==
0
);
return
ret
;
}
// local function ------------
static
SSyncIO
*
syncIOCreate
(
char
*
host
,
uint16_t
port
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
myAddr
.
inUse
=
0
;
io
->
myAddr
.
numOfEps
=
0
;
addEpIntoEpSet
(
&
io
->
myAddr
,
host
,
port
);
io
->
qTimerMS
=
TICK_Q_TIMER_MS
;
io
->
pingTimerMS
=
TICK_Ping_TIMER_MS
;
return
io
;
}
static
int32_t
syncIODestroy
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
int8_t
start
=
atomic_load_8
(
&
io
->
isStart
);
assert
(
start
==
0
);
if
(
io
->
serverRpc
!=
NULL
)
{
rpcClose
(
io
->
serverRpc
);
io
->
serverRpc
=
NULL
;
}
if
(
io
->
clientRpc
!=
NULL
)
{
rpcClose
(
io
->
clientRpc
);
io
->
clientRpc
=
NULL
;
}
taosCloseQueue
(
io
->
pMsgQ
);
taosCloseQset
(
io
->
pQset
);
return
ret
;
}
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
taosBlockSIGPIPE
();
rpcInit
();
...
...
@@ -163,58 +219,24 @@ static int32_t syncIOStartInternal(SSyncIO *io) {
}
// start tmr thread
io
->
ioTimerManager
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC
"
);
io
->
timerMgr
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC-IO
"
);
return
0
;
atomic_store_8
(
&
io
->
isStart
,
1
);
return
ret
;
}
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
atomic_store_8
(
&
io
->
isStart
,
0
);
pthread_join
(
io
->
consumerTid
,
NULL
);
return
0
;
}
static
SSyncIO
*
syncIOCreate
(
char
*
host
,
uint16_t
port
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
malloc
(
sizeof
(
SSyncIO
));
memset
(
io
,
0
,
sizeof
(
*
io
));
io
->
pMsgQ
=
taosOpenQueue
();
io
->
pQset
=
taosOpenQset
();
taosAddIntoQset
(
io
->
pQset
,
io
->
pMsgQ
,
NULL
);
io
->
myAddr
.
inUse
=
0
;
addEpIntoEpSet
(
&
io
->
myAddr
,
host
,
port
);
return
io
;
}
static
int32_t
syncIODestroy
(
SSyncIO
*
io
)
{
int8_t
start
=
atomic_load_8
(
&
io
->
isStart
);
assert
(
start
==
0
);
if
(
io
->
serverRpc
!=
NULL
)
{
free
(
io
->
serverRpc
);
io
->
serverRpc
=
NULL
;
}
if
(
io
->
clientRpc
!=
NULL
)
{
free
(
io
->
clientRpc
);
io
->
clientRpc
=
NULL
;
}
taosCloseQueue
(
io
->
pMsgQ
);
taosCloseQset
(
io
->
pQset
);
return
0
;
taosTmrCleanUp
(
io
->
timerMgr
);
return
ret
;
}
static
void
*
syncIOConsumerFunc
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
int
type
;
qall
=
taosAllocateQall
();
while
(
1
)
{
...
...
@@ -226,77 +248,67 @@ static void *syncIOConsumerFunc(void *param) {
for
(
int
i
=
0
;
i
<
numOfMsgs
;
++
i
)
{
taosGetQitem
(
qall
,
(
void
**
)
&
pRpcMsg
);
syncRpcMsgLog2
((
char
*
)
"==syncIOConsumerFunc=="
,
pRpcMsg
);
char
*
s
=
syncRpcMsg2Str
(
pRpcMsg
);
sTrace
(
"syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s"
,
pRpcMsg
->
msgType
,
pRpcMsg
->
contLen
,
s
);
free
(
s
);
// use switch case instead of if else
if
(
pRpcMsg
->
msgType
==
SYNC_PING
)
{
if
(
io
->
FpOnSyncPing
!=
NULL
)
{
SyncPing
*
pSyncMsg
;
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncPing
(
io
->
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
/*
pSyncMsg = syncPingBuild(pRpcMsg->contLen);
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
syncPingDestroy(pSyncMsg);
*/
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_PING_REPLY
)
{
if
(
io
->
FpOnSyncPingReply
!=
NULL
)
{
SyncPingReply
*
pSyncMsg
;
pSyncMsg
=
syncPingReplyBuild
(
pRpcMsg
->
contLen
);
syncPingReplyFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pRpcMsg
);
io
->
FpOnSyncPingReply
(
io
->
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_REQUEST_VOTE
)
{
if
(
io
->
FpOnSyncRequestVote
!=
NULL
)
{
SyncRequestVote
*
pSyncMsg
;
pSyncMsg
=
syncRequestVoteBuild
(
pRpcMsg
->
contLen
);
syncRequestVoteFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pRpcMsg
);
io
->
FpOnSyncRequestVote
(
io
->
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_REQUEST_VOTE_REPLY
)
{
if
(
io
->
FpOnSyncRequestVoteReply
!=
NULL
)
{
SyncRequestVoteReply
*
pSyncMsg
;
pSyncMsg
=
syncRequestVoteReplyBuild
();
syncRequestVoteReplyFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pRpcMsg
);
io
->
FpOnSyncRequestVoteReply
(
io
->
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_APPEND_ENTRIES
)
{
if
(
io
->
FpOnSyncAppendEntries
!=
NULL
)
{
SyncAppendEntries
*
pSyncMsg
;
pSyncMsg
=
syncAppendEntriesBuild
(
pRpcMsg
->
contLen
);
syncAppendEntriesFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pRpcMsg
);
io
->
FpOnSyncAppendEntries
(
io
->
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_APPEND_ENTRIES_REPLY
)
{
if
(
io
->
FpOnSyncAppendEntriesReply
!=
NULL
)
{
SyncAppendEntriesReply
*
pSyncMsg
;
pSyncMsg
=
syncAppendEntriesReplyBuild
();
syncAppendEntriesReplyFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pRpcMsg
);
io
->
FpOnSyncAppendEntriesReply
(
io
->
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_TIMEOUT
)
{
if
(
io
->
FpOnSyncTimeout
!=
NULL
)
{
SyncTimeout
*
pSyncMsg
;
pSyncMsg
=
syncTimeoutBuild
();
syncTimeoutFromRpcMsg
(
pRpcMsg
,
pSyncMsg
);
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pRpcMsg
);
io
->
FpOnSyncTimeout
(
io
->
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
}
else
{
;
sTrace
(
"unknown msgType:%d, no operator"
,
pRpcMsg
->
msgType
)
;
}
}
...
...
@@ -306,15 +318,16 @@ static void *syncIOConsumerFunc(void *param) {
rpcFreeCont
(
pRpcMsg
->
pCont
);
if
(
pRpcMsg
->
handle
!=
NULL
)
{
int
msgSize
=
128
;
int
msgSize
=
32
;
memset
(
&
rpcMsg
,
0
,
sizeof
(
rpcMsg
));
rpcMsg
.
msgType
=
SYNC_RESPONSE
;
rpcMsg
.
pCont
=
rpcMallocCont
(
msgSize
);
rpcMsg
.
contLen
=
msgSize
;
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"give a reply"
);
rpcMsg
.
handle
=
pRpcMsg
->
handle
;
rpcMsg
.
code
=
0
;
s
Trace
(
"syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d"
,
pRpcMsg
->
msgType
,
rpcMsg
.
contLen
);
s
yncRpcMsgPrint2
((
char
*
)
"syncIOConsumerFunc rpcSendResponse --> "
,
&
rpcMsg
);
rpcSendResponse
(
&
rpcMsg
);
}
...
...
@@ -326,71 +339,95 @@ static void *syncIOConsumerFunc(void *param) {
return
NULL
;
}
static
int
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int
ret
=
0
;
return
ret
;
}
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sTrace
(
"<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
(
char
*
)
pMsg
->
pCont
);
syncRpcMsgPrint2
((
char
*
)
"==syncIOProcessRequest=="
,
pMsg
);
SSyncIO
*
io
=
pParent
;
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
pMsg
,
sizeof
(
SRpcMsg
));
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
}
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
sTrace
(
"syncIOProcessReply: type:%d, contLen:%d msg:%s"
,
pMsg
->
msgType
,
pMsg
->
contLen
,
(
char
*
)
pMsg
->
pCont
);
if
(
pMsg
->
msgType
==
SYNC_RESPONSE
)
{
sTrace
(
"==syncIOProcessReply=="
);
}
else
{
syncRpcMsgPrint2
((
char
*
)
"==syncIOProcessReply=="
,
pMsg
);
}
rpcFreeCont
(
pMsg
->
pCont
);
}
static
int32_t
syncIOTickQInternal
(
SSyncIO
*
io
)
{
io
->
ioTimerTickQ
=
taosTmrStart
(
syncIOTickQFunc
,
1000
,
io
,
io
->
ioTimerManager
);
return
0
;
static
int32_t
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
// app shall retrieve the auth info based on meterID from DB or a data file
// demo code here only for simple demo
int32_t
ret
=
0
;
return
ret
;
}
static
int32_t
syncIOStartQ
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
taosTmrReset
(
syncIOTickQ
,
io
->
qTimerMS
,
io
,
io
->
timerMgr
,
&
io
->
qTimer
);
return
ret
;
}
static
void
syncIOTickQFunc
(
void
*
param
,
void
*
tmrId
)
{
static
int32_t
syncIOStopQ
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
taosTmrStop
(
io
->
qTimer
);
io
->
qTimer
=
NULL
;
return
ret
;
}
static
int32_t
syncIOStartPing
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
taosTmrReset
(
syncIOTickPing
,
io
->
pingTimerMS
,
io
,
io
->
timerMgr
,
&
io
->
pingTimer
);
return
ret
;
}
static
int32_t
syncIOStopPing
(
SSyncIO
*
io
)
{
int32_t
ret
=
0
;
taosTmrStop
(
io
->
pingTimer
);
io
->
pingTimer
=
NULL
;
return
ret
;
}
static
void
syncIOTickQ
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sTrace
(
"<-- syncIOTickQFunc -->"
);
SR
pcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
)
;
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOTickQ"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
55
;
SR
aftId
srcId
,
destId
;
srcId
.
addr
=
syncUtilAddr2U64
(
io
->
myAddr
.
eps
[
0
].
fqdn
,
io
->
myAddr
.
eps
[
0
].
port
)
;
srcId
.
vgId
=
-
1
;
destId
.
addr
=
syncUtilAddr2U64
(
io
->
myAddr
.
eps
[
0
].
fqdn
,
io
->
myAddr
.
eps
[
0
].
port
);
destId
.
vgId
=
-
1
;
SyncPingReply
*
pMsg
=
syncPingReplyBuild2
(
&
srcId
,
&
destId
,
"syncIOTickQ"
)
;
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
SRpcMsg
*
pTemp
;
pTemp
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
));
memcpy
(
pTemp
,
&
rpcMsg
,
sizeof
(
SRpcMsg
));
syncRpcMsgPrint2
((
char
*
)
"==syncIOTickQ=="
,
&
rpcMsg
);
taosWriteQitem
(
io
->
pMsgQ
,
pTemp
);
taosTmrReset
(
syncIOTickQFunc
,
1000
,
io
,
io
->
ioTimerManager
,
&
io
->
ioTimerTickQ
);
}
syncPingReplyDestroy
(
pMsg
);
static
int32_t
syncIOTickPingInternal
(
SSyncIO
*
io
)
{
io
->
ioTimerTickPing
=
taosTmrStart
(
syncIOTickPingFunc
,
1000
,
io
,
io
->
ioTimerManager
);
return
0
;
taosTmrReset
(
syncIOTickQ
,
io
->
qTimerMS
,
io
,
io
->
timerMgr
,
&
io
->
qTimer
);
}
static
void
syncIOTickPing
Func
(
void
*
param
,
void
*
tmrId
)
{
static
void
syncIOTickPing
(
void
*
param
,
void
*
tmrId
)
{
SSyncIO
*
io
=
(
SSyncIO
*
)
param
;
sTrace
(
"<-- syncIOTickPingFunc -->"
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
(
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOTickPing"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
77
;
SRaftId
srcId
,
destId
;
srcId
.
addr
=
syncUtilAddr2U64
(
io
->
myAddr
.
eps
[
0
].
fqdn
,
io
->
myAddr
.
eps
[
0
].
port
);
srcId
.
vgId
=
-
1
;
destId
.
addr
=
syncUtilAddr2U64
(
io
->
myAddr
.
eps
[
0
].
fqdn
,
io
->
myAddr
.
eps
[
0
].
port
);
destId
.
vgId
=
-
1
;
SyncPing
*
pMsg
=
syncPingBuild2
(
&
srcId
,
&
destId
,
"syncIOTickPing"
);
// SyncPing *pMsg = syncPingBuild3(&srcId, &destId);
SRpcMsg
rpcMsg
;
syncPing2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"==syncIOTickPing=="
,
&
rpcMsg
);
rpcSendRequest
(
io
->
clientRpc
,
&
io
->
myAddr
,
&
rpcMsg
,
NULL
);
taosTmrReset
(
syncIOTickPingFunc
,
1000
,
io
,
io
->
ioTimerManager
,
&
io
->
ioTimerTickPing
);
syncPingDestroy
(
pMsg
);
taosTmrReset
(
syncIOTickPing
,
io
->
pingTimerMS
,
io
,
io
->
timerMgr
,
&
io
->
pingTimer
);
}
\ No newline at end of file
source/libs/sync/src/syncMain.c
浏览文件 @
5e4a49f4
此差异已折叠。
点击以展开。
source/libs/sync/src/syncMessage.c
浏览文件 @
5e4a49f4
...
...
@@ -65,10 +65,32 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
pRoot
=
syncAppendEntriesReply2Json
(
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
SYNC_RESPONSE
)
{
pRoot
=
cJSON_CreateObject
();
char
*
s
;
s
=
syncUtilprintBin
((
char
*
)(
pRpcMsg
->
pCont
),
pRpcMsg
->
contLen
);
cJSON_AddStringToObject
(
pRoot
,
"pCont"
,
s
);
free
(
s
);
s
=
syncUtilprintBin2
((
char
*
)(
pRpcMsg
->
pCont
),
pRpcMsg
->
contLen
);
cJSON_AddStringToObject
(
pRoot
,
"pCont2"
,
s
);
free
(
s
);
}
else
{
pRoot
=
syncRpcUnknownMsg2Json
();
char
*
s
;
s
=
syncUtilprintBin
((
char
*
)(
pRpcMsg
->
pCont
),
pRpcMsg
->
contLen
);
cJSON_AddStringToObject
(
pRoot
,
"pCont"
,
s
);
free
(
s
);
s
=
syncUtilprintBin2
((
char
*
)(
pRpcMsg
->
pCont
),
pRpcMsg
->
contLen
);
cJSON_AddStringToObject
(
pRoot
,
"pCont2"
,
s
);
free
(
s
);
}
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pRpcMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"contLen"
,
pRpcMsg
->
contLen
);
cJSON_AddNumberToObject
(
pRoot
,
"code"
,
pRpcMsg
->
code
);
// cJSON_AddNumberToObject(pRoot, "persist", pRpcMsg->persist);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"RpcMsg"
,
pRoot
);
return
pJson
;
...
...
@@ -77,7 +99,7 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON
*
syncRpcUnknownMsg2Json
()
{
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
SYNC_UNKNOWN
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
"known message"
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
"
un
known message"
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncUnknown"
,
pRoot
);
...
...
@@ -798,8 +820,8 @@ cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg) {
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
currentT
erm
);
cJSON_AddStringToObject
(
pRoot
,
"
currentT
erm"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
t
erm
);
cJSON_AddStringToObject
(
pRoot
,
"
t
erm"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogIndex
);
cJSON_AddStringToObject
(
pRoot
,
"lastLogIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
lastLogTerm
);
...
...
@@ -1086,6 +1108,9 @@ cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg) {
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
prevLogIndex
);
cJSON_AddStringToObject
(
pRoot
,
"pre_log_index"
,
u64buf
);
...
...
@@ -1242,9 +1267,11 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
matchIndex
);
cJSON_AddStringToObject
(
pRoot
,
"match
_i
ndex"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"match
I
ndex"
,
u64buf
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntriesReply"
,
pRoot
);
...
...
@@ -1283,4 +1310,4 @@ void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
char
*
serialized
=
syncAppendEntriesReply2Str
(
pMsg
);
sTrace
(
"syncAppendEntriesReplyLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
free
(
serialized
);
}
\ No newline at end of file
}
source/libs/sync/src/syncRaftLog.c
浏览文件 @
5e4a49f4
...
...
@@ -130,7 +130,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pWal
);
cJSON_AddStringToObject
(
pRoot
,
"pWal"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%l
u
"
,
logStoreLastIndex
(
pLogStore
));
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%l
d
"
,
logStoreLastIndex
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"LastIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
logStoreLastTerm
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"LastTerm"
,
u64buf
);
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
5e4a49f4
...
...
@@ -15,6 +15,8 @@
#include "syncRaftStore.h"
#include "cJSON.h"
#include "syncEnv.h"
#include "syncUtil.h"
// private function
static
int32_t
raftStoreInit
(
SRaftStore
*
pRaftStore
);
...
...
@@ -135,6 +137,33 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
return
0
;
}
bool
raftStoreHasVoted
(
SRaftStore
*
pRaftStore
)
{
bool
b
=
syncUtilEmptyId
(
&
(
pRaftStore
->
voteFor
));
return
b
;
}
void
raftStoreVote
(
SRaftStore
*
pRaftStore
,
SRaftId
*
pRaftId
)
{
assert
(
!
raftStoreHasVoted
(
pRaftStore
));
assert
(
!
syncUtilEmptyId
(
pRaftId
));
pRaftStore
->
voteFor
=
*
pRaftId
;
raftStorePersist
(
pRaftStore
);
}
void
raftStoreClearVote
(
SRaftStore
*
pRaftStore
)
{
pRaftStore
->
voteFor
=
EMPTY_RAFT_ID
;
raftStorePersist
(
pRaftStore
);
}
void
raftStoreNextTerm
(
SRaftStore
*
pRaftStore
)
{
++
(
pRaftStore
->
currentTerm
);
raftStorePersist
(
pRaftStore
);
}
void
raftStoreSetTerm
(
SRaftStore
*
pRaftStore
,
SyncTerm
term
)
{
pRaftStore
->
currentTerm
=
term
;
raftStorePersist
(
pRaftStore
);
}
// for debug -------------------
void
raftStorePrint
(
SRaftStore
*
pObj
)
{
char
serialized
[
RAFT_STORE_BLOCK_SIZE
];
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
5e4a49f4
...
...
@@ -14,7 +14,11 @@
*/
#include "syncReplication.h"
#include "syncIndexMgr.h"
#include "syncMessage.h"
#include "syncRaftEntry.h"
#include "syncRaftLog.h"
#include "syncUtil.h"
// TLA+ Spec
// AppendEntries(i, j) ==
...
...
@@ -42,7 +46,39 @@
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
)
{
assert
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
SyncIndex
preLogIndex
=
nextIndex
-
1
;
SyncTerm
preLogTerm
=
0
;
if
(
preLogIndex
>=
SYNC_INDEX_BEGIN
)
{
SSyncRaftEntry
*
pPreEntry
=
pSyncNode
->
pLogStore
->
getEntry
(
pSyncNode
->
pLogStore
,
preLogIndex
);
preLogTerm
=
pPreEntry
->
term
;
}
SyncIndex
lastIndex
=
syncUtilMinIndex
(
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
),
nextIndex
);
assert
(
nextIndex
==
lastIndex
);
SSyncRaftEntry
*
pEntry
=
logStoreGetEntry
(
pSyncNode
->
pLogStore
,
nextIndex
);
assert
(
pEntry
!=
NULL
);
SyncAppendEntries
*
pMsg
=
syncAppendEntriesBuild
(
pEntry
->
bytes
);
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
prevLogIndex
=
preLogIndex
;
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pMsg
->
dataLen
=
pEntry
->
bytes
;
// add pEntry into msg
syncNodeAppendEntries
(
pSyncNode
,
pDestId
,
pMsg
);
}
return
ret
;
}
...
...
source/libs/sync/src/syncRequestVote.c
浏览文件 @
5e4a49f4
...
...
@@ -14,6 +14,10 @@
*/
#include "syncRequestVote.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleRequestVoteRequest(i, j, m) ==
...
...
@@ -37,4 +41,34 @@
// m)
// /\ UNCHANGED <<state, currentTerm, candidateVars, leaderVars, logVars>>
//
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{}
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
)
{
int32_t
ret
=
0
;
syncRequestVoteLog2
(
"==syncNodeOnRequestVoteCb=="
,
pMsg
);
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
syncNodeUpdateTerm
(
ths
,
pMsg
->
term
);
}
assert
(
pMsg
->
term
<=
ths
->
pRaftStore
->
currentTerm
);
bool
logOK
=
(
pMsg
->
lastLogTerm
>
ths
->
pLogStore
->
getLastTerm
(
ths
->
pLogStore
))
||
((
pMsg
->
lastLogTerm
==
ths
->
pLogStore
->
getLastTerm
(
ths
->
pLogStore
))
&&
(
pMsg
->
lastLogIndex
>=
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
)));
bool
grant
=
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
)
&&
logOK
&&
((
!
raftStoreHasVoted
(
ths
->
pRaftStore
))
||
(
syncUtilSameId
(
&
(
ths
->
pRaftStore
->
voteFor
),
&
(
pMsg
->
srcId
))));
if
(
grant
)
{
raftStoreVote
(
ths
->
pRaftStore
,
&
(
pMsg
->
srcId
));
}
SyncRequestVoteReply
*
pReply
=
syncRequestVoteReplyBuild
();
pReply
->
srcId
=
ths
->
myRaftId
;
pReply
->
destId
=
pMsg
->
srcId
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
voteGranted
=
grant
;
SRpcMsg
rpcMsg
;
syncRequestVoteReply2RpcMsg
(
pReply
,
&
rpcMsg
);
syncNodeSendMsgById
(
&
pReply
->
destId
,
ths
,
&
rpcMsg
);
syncRequestVoteReplyDestroy
(
pReply
);
return
ret
;
}
source/libs/sync/src/syncRequestVoteReply.c
浏览文件 @
5e4a49f4
...
...
@@ -14,6 +14,10 @@
*/
#include "syncRequestVoteReply.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
#include "syncVoteMgr.h"
// TLA+ Spec
// HandleRequestVoteResponse(i, j, m) ==
...
...
@@ -32,4 +36,33 @@
// /\ Discard(m)
// /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars>>
//
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{}
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
)
{
int32_t
ret
=
0
;
syncRequestVoteReplyLog2
(
"==syncNodeOnRequestVoteReplyCb=="
,
pMsg
);
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
sTrace
(
"DropStaleResponse, receive term:%lu, current term:%lu"
,
pMsg
->
term
,
ths
->
pRaftStore
->
currentTerm
);
return
ret
;
}
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
assert
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
if
(
ths
->
state
==
TAOS_SYNC_STATE_CANDIDATE
)
{
votesRespondAdd
(
ths
->
pVotesRespond
,
pMsg
);
if
(
pMsg
->
voteGranted
)
{
voteGrantedVote
(
ths
->
pVotesGranted
,
pMsg
);
if
(
voteGrantedMajority
(
ths
->
pVotesGranted
))
{
if
(
ths
->
pVotesGranted
->
toLeader
)
{
syncNodeCandidate2Leader
(
ths
);
ths
->
pVotesGranted
->
toLeader
=
true
;
}
}
}
}
return
ret
;
}
source/libs/sync/src/syncTimeout.c
浏览文件 @
5e4a49f4
...
...
@@ -19,15 +19,7 @@
int32_t
syncNodeOnTimeoutCb
(
SSyncNode
*
ths
,
SyncTimeout
*
pMsg
)
{
int32_t
ret
=
0
;
sTrace
(
"<-- syncNodeOnTimeoutCb -->"
);
{
cJSON
*
pJson
=
syncTimeout2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
sTrace
(
"process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s "
,
serialized
);
free
(
serialized
);
cJSON_Delete
(
pJson
);
}
syncTimeoutLog2
(
"==syncNodeOnTimeoutCb=="
,
pMsg
);
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_PING
)
{
if
(
atomic_load_64
(
&
ths
->
pingTimerLogicClockUser
)
<=
pMsg
->
logicClock
)
{
...
...
source/libs/sync/src/syncUtil.c
浏览文件 @
5e4a49f4
...
...
@@ -74,6 +74,8 @@ bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
return
ret
;
}
bool
syncUtilEmptyId
(
const
SRaftId
*
pId
)
{
return
(
pId
->
addr
==
0
&&
pId
->
vgId
==
0
);
}
// ---- SSyncBuffer -----
void
syncUtilbufBuild
(
SSyncBuffer
*
syncBuf
,
size_t
len
)
{
syncBuf
->
len
=
len
;
...
...
@@ -184,4 +186,14 @@ char* syncUtilprintBin2(char* ptr, uint32_t len) {
p
+=
n
;
}
return
s
;
}
SyncIndex
syncUtilMinIndex
(
SyncIndex
a
,
SyncIndex
b
)
{
SyncIndex
r
=
a
<
b
?
a
:
b
;
return
r
;
}
SyncIndex
syncUtilMaxIndex
(
SyncIndex
a
,
SyncIndex
b
)
{
SyncIndex
r
=
a
>
b
?
a
:
b
;
return
r
;
}
\ No newline at end of file
source/libs/sync/test/CMakeLists.txt
浏览文件 @
5e4a49f4
...
...
@@ -4,8 +4,8 @@ add_executable(syncPingTimerTest "")
add_executable
(
syncIOTickQTest
""
)
add_executable
(
syncIOTickPingTest
""
)
add_executable
(
syncIOSendMsgTest
""
)
add_executable
(
syncIO
SendMsg
ClientTest
""
)
add_executable
(
syncIOSe
ndMsgSe
rverTest
""
)
add_executable
(
syncIOClientTest
""
)
add_executable
(
syncIOServerTest
""
)
add_executable
(
syncRaftStoreTest
""
)
add_executable
(
syncEnqTest
""
)
add_executable
(
syncIndexTest
""
)
...
...
@@ -51,13 +51,13 @@ target_sources(syncIOSendMsgTest
PRIVATE
"syncIOSendMsgTest.cpp"
)
target_sources
(
syncIO
SendMsg
ClientTest
target_sources
(
syncIOClientTest
PRIVATE
"syncIO
SendMsg
ClientTest.cpp"
"syncIOClientTest.cpp"
)
target_sources
(
syncIOSe
ndMsgSe
rverTest
target_sources
(
syncIOServerTest
PRIVATE
"syncIOSe
ndMsgSe
rverTest.cpp"
"syncIOServerTest.cpp"
)
target_sources
(
syncRaftStoreTest
PRIVATE
...
...
@@ -167,12 +167,12 @@ target_include_directories(syncIOSendMsgTest
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIO
SendMsg
ClientTest
target_include_directories
(
syncIOClientTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncIOSe
ndMsgSe
rverTest
target_include_directories
(
syncIOServerTest
PUBLIC
"
${
CMAKE_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
...
...
@@ -298,11 +298,11 @@ target_link_libraries(syncIOSendMsgTest
sync
gtest_main
)
target_link_libraries
(
syncIO
SendMsg
ClientTest
target_link_libraries
(
syncIOClientTest
sync
gtest_main
)
target_link_libraries
(
syncIOSe
ndMsgSe
rverTest
target_link_libraries
(
syncIOServerTest
sync
gtest_main
)
...
...
source/libs/sync/test/syncEnqTest.cpp
浏览文件 @
5e4a49f4
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
...
...
@@ -14,64 +15,69 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
uint16_t
ports
[
3
]
=
{
7010
,
7110
,
7210
};
uint16_t
ports
[]
=
{
7010
,
7110
,
7210
,
7310
,
7410
};
int32_t
replicaNum
=
5
;
int32_t
myIndex
=
0
;
SSyncNode
*
doSync
(
int
myIndex
)
{
SSyncFSM
*
pFsm
;
SRaftId
ids
[
TSDB_MAX_REPLICA
];
SSyncInfo
syncInfo
;
SSyncFSM
*
pFsm
;
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
1
;
SSyncNode
*
syncNodeInit
()
{
syncInfo
.
vgId
=
1
234
;
syncInfo
.
rpcClient
=
gSyncIO
->
clientRpc
;
syncInfo
.
FpSendMsg
=
syncIOSendMsg
;
syncInfo
.
queue
=
gSyncIO
->
pMsgQ
;
syncInfo
.
FpEqMsg
=
syncIOEqMsg
;
syncInfo
.
pFsm
=
pFsm
;
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s"
,
"./
test_sync_ping
"
);
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s"
,
"./"
);
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
myIndex
;
pCfg
->
replicaNum
=
3
;
pCfg
->
replicaNum
=
replicaNum
;
pCfg
->
nodeInfo
[
0
].
nodePort
=
ports
[
0
];
snprintf
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
0
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg
->
nodeInfo
[
1
].
nodePort
=
ports
[
1
];
snprintf
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
1
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg
->
nodeInfo
[
2
].
nodePort
=
ports
[
2
];
snprintf
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
2
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
pCfg
->
nodeInfo
[
i
].
nodePort
=
ports
[
i
];
snprintf
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
&
syncInfo
);
assert
(
pSyncNode
!=
NULL
);
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncRequestVote
=
pSyncNode
->
FpOnRequestVote
;
gSyncIO
->
FpOnSyncRequestVoteReply
=
pSyncNode
->
FpOnRequestVoteReply
;
gSyncIO
->
FpOnSyncAppendEntries
=
pSyncNode
->
FpOnAppendEntries
;
gSyncIO
->
FpOnSyncAppendEntriesReply
=
pSyncNode
->
FpOnAppendEntriesReply
;
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncTimeout
=
pSyncNode
->
FpOnTimeout
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
return
pSyncNode
;
}
void
timerPingAll
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
syncNodePingAll
(
pSyncNode
);
SSyncNode
*
syncInitTest
()
{
return
syncNodeInit
();
}
void
initRaftId
(
SSyncNode
*
pSyncNode
)
{
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
ids
[
i
]
=
pSyncNode
->
replicasId
[
i
];
char
*
s
=
syncUtilRaftId2Str
(
&
ids
[
i
]);
printf
(
"raftId[%d] : %s
\n
"
,
i
,
s
);
free
(
s
);
}
}
int
main
(
int
argc
,
char
**
argv
)
{
// taosInitLog((char
*)"syncPing
Test.log", 100000, 10);
// taosInitLog((char
*)"sync
Test.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
int
myIndex
=
0
;
myIndex
=
0
;
if
(
argc
>=
2
)
{
myIndex
=
atoi
(
argv
[
1
]);
if
(
myIndex
>
2
||
myIndex
<
0
)
{
fprintf
(
stderr
,
"myIndex:%d error. should be 0 - 2"
,
myIndex
);
return
1
;
}
}
int32_t
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
ports
[
myIndex
]);
...
...
@@ -80,21 +86,22 @@ int main(int argc, char** argv) {
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
SSyncNode
*
pSyncNode
=
doSync
(
myIndex
);
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
SSyncNode
*
pSyncNode
=
syncInitTest
();
assert
(
pSyncNode
!=
NULL
);
syncNodePrint2
((
char
*
)
"syncInitTest"
,
pSyncNode
);
initRaftId
(
pSyncNode
);
//--------------------------------------------------------------
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyBuild
3
(
&
pSyncNode
->
myRaftId
,
&
pSyncNode
->
myRaftId
);
SyncPingReply
*
pSyncMsg
=
syncPingReplyBuild
2
(
&
pSyncNode
->
myRaftId
,
&
pSyncNode
->
myRaftId
,
"syncEnqTest"
);
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
pSyncNode
->
FpEqMsg
(
pSyncNode
->
queue
,
&
rpcMsg
);
taosMsleep
(
1000
);
}
while
(
1
)
{
taosMsleep
(
1000
);
}
return
0
;
}
source/libs/sync/test/syncEnvTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -14,15 +14,6 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
void
*
pTimer
=
NULL
;
void
*
pTimerMgr
=
NULL
;
int
g
=
300
;
static
void
timerFp
(
void
*
param
,
void
*
tmrId
)
{
printf
(
"param:%p, tmrId:%p, pTimer:%p, pTimerMgr:%p
\n
"
,
param
,
tmrId
,
pTimer
,
pTimerMgr
);
taosTmrReset
(
timerFp
,
1000
,
param
,
pTimerMgr
,
&
pTimer
);
}
int
main
()
{
// taosInitLog((char*)"syncEnvTest.log", 100000, 10);
tsAsyncLog
=
0
;
...
...
@@ -34,13 +25,20 @@ int main() {
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
// timer
pTimerMgr
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC-ENV-TEST"
);
taosTmrStart
(
timerFp
,
1000
,
&
g
,
pTimerMgr
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
ret
=
syncEnvStartTimer
();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
while
(
1
)
{
taosMsleep
(
1000
);
ret
=
syncEnvStopTimer
();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
}
ret
=
syncEnvStop
();
assert
(
ret
==
0
);
return
0
;
}
source/libs/sync/test/syncIO
SendMsg
ClientTest.cpp
→
source/libs/sync/test/syncIOClientTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -2,7 +2,8 @@
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncMessage.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
...
...
@@ -22,7 +23,7 @@ int main() {
int32_t
ret
;
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
...
...
@@ -31,18 +32,17 @@ int main() {
epSet
.
numOfEps
=
0
;
addEpIntoEpSet
(
&
epSet
,
"127.0.0.1"
,
7030
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
((
char
*
)
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOSendMsgTest"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
77
;
SRaftId
srcId
,
destId
;
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
srcId
.
vgId
=
100
;
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
destId
.
vgId
=
100
;
syncIOSendMsg
(
gSyncIO
->
clientRpc
,
&
epSet
,
&
rpcMsg
);
taosSsleep
(
1
)
;
}
SyncPingReply
*
pSyncMsg
=
syncPingReplyBuild2
(
&
srcId
,
&
destId
,
"syncIOClientTest"
);
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
while
(
1
)
{
syncIOSendMsg
(
gSyncIO
->
clientRpc
,
&
epSet
,
&
rpcMsg
);
taosSsleep
(
1
);
}
...
...
source/libs/sync/test/syncIOSendMsgTest.cpp
浏览文件 @
5e4a49f4
#include <gtest/gtest.h>
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
...
...
@@ -13,37 +15,96 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
int
main
()
{
uint16_t
ports
[]
=
{
7010
,
7110
,
7210
,
7310
,
7410
};
int32_t
replicaNum
=
5
;
int32_t
myIndex
=
0
;
SRaftId
ids
[
TSDB_MAX_REPLICA
];
SSyncInfo
syncInfo
;
SSyncFSM
*
pFsm
;
SSyncNode
*
syncNodeInit
()
{
syncInfo
.
vgId
=
1234
;
syncInfo
.
rpcClient
=
gSyncIO
->
clientRpc
;
syncInfo
.
FpSendMsg
=
syncIOSendMsg
;
syncInfo
.
queue
=
gSyncIO
->
pMsgQ
;
syncInfo
.
FpEqMsg
=
syncIOEqMsg
;
syncInfo
.
pFsm
=
pFsm
;
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s"
,
"./"
);
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
myIndex
=
myIndex
;
pCfg
->
replicaNum
=
replicaNum
;
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
pCfg
->
nodeInfo
[
i
].
nodePort
=
ports
[
i
];
snprintf
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
,
sizeof
(
pCfg
->
nodeInfo
[
i
].
nodeFqdn
),
"%s"
,
"127.0.0.1"
);
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
}
SSyncNode
*
pSyncNode
=
syncNodeOpen
(
&
syncInfo
);
assert
(
pSyncNode
!=
NULL
);
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncRequestVote
=
pSyncNode
->
FpOnRequestVote
;
gSyncIO
->
FpOnSyncRequestVoteReply
=
pSyncNode
->
FpOnRequestVoteReply
;
gSyncIO
->
FpOnSyncAppendEntries
=
pSyncNode
->
FpOnAppendEntries
;
gSyncIO
->
FpOnSyncAppendEntriesReply
=
pSyncNode
->
FpOnAppendEntriesReply
;
gSyncIO
->
FpOnSyncPing
=
pSyncNode
->
FpOnPing
;
gSyncIO
->
FpOnSyncPingReply
=
pSyncNode
->
FpOnPingReply
;
gSyncIO
->
FpOnSyncTimeout
=
pSyncNode
->
FpOnTimeout
;
gSyncIO
->
pSyncNode
=
pSyncNode
;
return
pSyncNode
;
}
SSyncNode
*
syncInitTest
()
{
return
syncNodeInit
();
}
void
initRaftId
(
SSyncNode
*
pSyncNode
)
{
for
(
int
i
=
0
;
i
<
replicaNum
;
++
i
)
{
ids
[
i
]
=
pSyncNode
->
replicasId
[
i
];
char
*
s
=
syncUtilRaftId2Str
(
&
ids
[
i
]);
printf
(
"raftId[%d] : %s
\n
"
,
i
,
s
);
free
(
s
);
}
}
int
main
(
int
argc
,
char
**
argv
)
{
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
myIndex
=
0
;
if
(
argc
>=
2
)
{
myIndex
=
atoi
(
argv
[
1
]);
}
int32_t
ret
;
int32_t
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
ports
[
myIndex
]);
assert
(
ret
==
0
);
ret
=
sync
IOStart
((
char
*
)
"127.0.0.1"
,
7010
);
ret
=
sync
EnvStart
(
);
assert
(
ret
==
0
);
SSyncNode
*
pSyncNode
=
syncInitTest
();
assert
(
pSyncNode
!=
NULL
);
syncNodePrint2
((
char
*
)
"syncInitTest"
,
pSyncNode
);
initRaftId
(
pSyncNode
);
//--------------------------------------------------------------
for
(
int
i
=
0
;
i
<
10
;
++
i
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyBuild2
(
&
pSyncNode
->
myRaftId
,
&
pSyncNode
->
myRaftId
,
"syncIOSendMsgTest"
);
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
SEpSet
epSet
;
epSet
.
inUse
=
0
;
epSet
.
numOfEps
=
0
;
addEpIntoEpSet
(
&
epSet
,
"127.0.0.1"
,
7010
);
SRpcMsg
rpcMsg
;
rpcMsg
.
contLen
=
64
;
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
snprintf
((
char
*
)
rpcMsg
.
pCont
,
rpcMsg
.
contLen
,
"%s"
,
"syncIOSendMsgTest"
);
rpcMsg
.
handle
=
NULL
;
rpcMsg
.
msgType
=
77
;
syncIOSendMsg
(
gSyncIO
->
clientRpc
,
&
epSet
,
&
rpcMsg
);
taosSsleep
(
1
);
}
syncUtilnodeInfo2EpSet
(
&
pSyncNode
->
myNodeInfo
,
&
epSet
);
pSyncNode
->
FpSendMsg
(
pSyncNode
->
rpcClient
,
&
epSet
,
&
rpcMsg
);
while
(
1
)
{
taosSsleep
(
1
);
taosMsleep
(
1000
);
}
return
0
;
...
...
source/libs/sync/test/syncIOSe
ndMsgSe
rverTest.cpp
→
source/libs/sync/test/syncIOServerTest.cpp
浏览文件 @
5e4a49f4
文件已移动
source/libs/sync/test/syncIOTickPingTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -25,8 +25,15 @@ int main() {
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
ret
=
syncIOTickPing
();
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
ret
=
syncIOPingTimerStart
();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
ret
=
syncIOPingTimerStop
();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
}
while
(
1
)
{
taosSsleep
(
1
);
...
...
source/libs/sync/test/syncIOTickQTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -25,11 +25,18 @@ int main() {
ret
=
syncIOStart
((
char
*
)
"127.0.0.1"
,
7010
);
assert
(
ret
==
0
);
ret
=
syncIOTickQ
();
assert
(
ret
==
0
);
for
(
int
i
=
0
;
i
<
3
;
++
i
)
{
ret
=
syncIOQTimerStart
();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
while
(
1
)
{
taosSsleep
(
1
);
ret
=
syncIOQTimerStop
();
assert
(
ret
==
0
);
taosMsleep
(
5000
);
}
ret
=
syncIOStop
();
assert
(
ret
==
0
);
return
0
;
}
source/libs/sync/test/syncLogStoreTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -81,7 +81,10 @@ SSyncNode* syncNodeInit() {
SSyncNode
*
syncInitTest
()
{
return
syncNodeInit
();
}
void
logStoreTest
()
{
logStorePrint
(
pSyncNode
->
pLogStore
);
logStorePrint2
((
char
*
)
"logStoreTest2"
,
pSyncNode
->
pLogStore
);
assert
(
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
)
==
SYNC_INDEX_INVALID
);
for
(
int
i
=
0
;
i
<
5
;
++
i
)
{
int32_t
dataLen
=
10
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
dataLen
);
...
...
@@ -97,6 +100,10 @@ void logStoreTest() {
// syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode
->
pLogStore
->
appendEntry
(
pSyncNode
->
pLogStore
,
pEntry
);
syncEntryDestory
(
pEntry
);
if
(
i
==
0
)
{
assert
(
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
)
==
SYNC_INDEX_BEGIN
);
}
}
logStorePrint
(
pSyncNode
->
pLogStore
);
...
...
@@ -129,6 +136,8 @@ int main(int argc, char** argv) {
ret
=
syncEnvStart
();
assert
(
ret
==
0
);
taosRemoveDir
(
"./wal_test"
);
pSyncNode
=
syncInitTest
();
assert
(
pSyncNode
!=
NULL
);
...
...
source/libs/sync/test/syncRequestVoteTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -20,7 +20,7 @@ SyncRequestVote *createMsg() {
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
currentT
erm
=
11
;
pMsg
->
t
erm
=
11
;
pMsg
->
lastLogIndex
=
22
;
pMsg
->
lastLogTerm
=
33
;
return
pMsg
;
...
...
source/libs/sync/test/syncRpcMsgTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -57,7 +57,7 @@ SyncRequestVote *createSyncRequestVote() {
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
currentT
erm
=
11
;
pMsg
->
t
erm
=
11
;
pMsg
->
lastLogIndex
=
22
;
pMsg
->
lastLogTerm
=
33
;
return
pMsg
;
...
...
@@ -100,7 +100,7 @@ SyncAppendEntriesReply *createSyncAppendEntriesReply() {
void
test1
()
{
SyncTimeout
*
pMsg
=
createSyncTimeout
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncTimeout2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test1"
,
&
rpcMsg
);
syncTimeoutDestroy
(
pMsg
);
...
...
@@ -108,7 +108,7 @@ void test1() {
void
test2
()
{
SyncPing
*
pMsg
=
createSyncPing
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPing2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test2"
,
&
rpcMsg
);
syncPingDestroy
(
pMsg
);
...
...
@@ -116,7 +116,7 @@ void test2() {
void
test3
()
{
SyncPingReply
*
pMsg
=
createSyncPingReply
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncPingReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test3"
,
&
rpcMsg
);
syncPingReplyDestroy
(
pMsg
);
...
...
@@ -132,7 +132,7 @@ void test4() {
void
test5
()
{
SyncRequestVoteReply
*
pMsg
=
createSyncRequestVoteReply
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncRequestVoteReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test5"
,
&
rpcMsg
);
syncRequestVoteReplyDestroy
(
pMsg
);
...
...
@@ -140,7 +140,7 @@ void test5() {
void
test6
()
{
SyncAppendEntries
*
pMsg
=
createSyncAppendEntries
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncAppendEntries2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test6"
,
&
rpcMsg
);
syncAppendEntriesDestroy
(
pMsg
);
...
...
@@ -148,7 +148,7 @@ void test6() {
void
test7
()
{
SyncAppendEntriesReply
*
pMsg
=
createSyncAppendEntriesReply
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncAppendEntriesReply2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test7"
,
&
rpcMsg
);
syncAppendEntriesReplyDestroy
(
pMsg
);
...
...
@@ -156,7 +156,7 @@ void test7() {
void
test8
()
{
SyncClientRequest
*
pMsg
=
createSyncClientRequest
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncClientRequest2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncRpcMsgPrint2
((
char
*
)
"test8"
,
&
rpcMsg
);
syncClientRequestDestroy
(
pMsg
);
...
...
source/libs/sync/test/syncUtilTest.cpp
浏览文件 @
5e4a49f4
...
...
@@ -26,6 +26,7 @@ int main() {
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
logTest
();
electRandomMSTest
();
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录