Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ba0b06fd
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ba0b06fd
编写于
8月 17, 2022
作者:
H
Hui Li
提交者:
GitHub
8月 17, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #16185 from taosdata/feature/3.0_mhli
refactor(sync): add syncNodeDynamicQuorum
上级
7d7997e3
2ca5bdc7
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
295 addition
and
16 deletion
+295
-16
include/libs/sync/sync.h
include/libs/sync/sync.h
+8
-5
include/libs/sync/syncTools.h
include/libs/sync/syncTools.h
+1
-0
source/libs/sync/inc/syncIndexMgr.h
source/libs/sync/inc/syncIndexMgr.h
+13
-4
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+2
-0
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+2
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+9
-0
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+12
-0
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+58
-0
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+64
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+6
-5
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+2
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+115
-0
source/libs/sync/test/syncAppendEntriesReplyTest.cpp
source/libs/sync/test/syncAppendEntriesReplyTest.cpp
+3
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
ba0b06fd
...
@@ -26,11 +26,14 @@ extern "C" {
...
@@ -26,11 +26,14 @@ extern "C" {
extern
bool
gRaftDetailLog
;
extern
bool
gRaftDetailLog
;
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_READ_RANGE 10
#define SYNC_MAX_READ_RANGE 2
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20)
#define SYNC_MAX_RECV_TIME_RANGE_MS 1000
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN 0
...
...
include/libs/sync/syncTools.h
浏览文件 @
ba0b06fd
...
@@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply {
...
@@ -423,6 +423,7 @@ typedef struct SyncAppendEntriesReply {
SyncTerm
privateTerm
;
SyncTerm
privateTerm
;
bool
success
;
bool
success
;
SyncIndex
matchIndex
;
SyncIndex
matchIndex
;
int64_t
startTime
;
}
SyncAppendEntriesReply
;
}
SyncAppendEntriesReply
;
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
(
int32_t
vgId
);
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
(
int32_t
vgId
);
...
...
source/libs/sync/inc/syncIndexMgr.h
浏览文件 @
ba0b06fd
...
@@ -29,8 +29,12 @@ extern "C" {
...
@@ -29,8 +29,12 @@ extern "C" {
// SIndexMgr -----------------------------
// SIndexMgr -----------------------------
typedef
struct
SSyncIndexMgr
{
typedef
struct
SSyncIndexMgr
{
SRaftId
(
*
replicas
)[
TSDB_MAX_REPLICA
];
SRaftId
(
*
replicas
)[
TSDB_MAX_REPLICA
];
SyncIndex
index
[
TSDB_MAX_REPLICA
];
SyncIndex
index
[
TSDB_MAX_REPLICA
];
SyncTerm
privateTerm
[
TSDB_MAX_REPLICA
];
// for advanced function
SyncTerm
privateTerm
[
TSDB_MAX_REPLICA
];
// for advanced function
int64_t
startTimeArr
[
TSDB_MAX_REPLICA
];
int64_t
recvTimeArr
[
TSDB_MAX_REPLICA
];
int32_t
replicaNum
;
int32_t
replicaNum
;
SSyncNode
*
pSyncNode
;
SSyncNode
*
pSyncNode
;
}
SSyncIndexMgr
;
}
SSyncIndexMgr
;
...
@@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
...
@@ -41,8 +45,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void
syncIndexMgrClear
(
SSyncIndexMgr
*
pSyncIndexMgr
);
void
syncIndexMgrClear
(
SSyncIndexMgr
*
pSyncIndexMgr
);
void
syncIndexMgrSetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
,
SyncIndex
index
);
void
syncIndexMgrSetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
,
SyncIndex
index
);
SyncIndex
syncIndexMgrGetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
);
SyncIndex
syncIndexMgrGetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
);
cJSON
*
syncIndexMgr2Json
(
SSyncIndexMgr
*
pSyncIndexMgr
);
cJSON
*
syncIndexMgr2Json
(
SSyncIndexMgr
*
pSyncIndexMgr
);
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
);
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
);
void
syncIndexMgrSetStartTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
,
int64_t
startTime
);
int64_t
syncIndexMgrGetStartTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
);
void
syncIndexMgrSetRecvTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
,
int64_t
recvTime
);
int64_t
syncIndexMgrGetRecvTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
);
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
ba0b06fd
...
@@ -269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
...
@@ -269,6 +269,8 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t
syncNodeLeaderTransferTo
(
SSyncNode
*
pSyncNode
,
SNodeInfo
newLeader
);
int32_t
syncNodeLeaderTransferTo
(
SSyncNode
*
pSyncNode
,
SNodeInfo
newLeader
);
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
);
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
);
int32_t
syncNodeDynamicQuorum
(
const
SSyncNode
*
pSyncNode
);
// trace log
// trace log
void
syncLogSendRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
void
syncLogSendRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
void
syncLogRecvRequestVote
(
SSyncNode
*
pSyncNode
,
const
SyncRequestVote
*
pMsg
,
const
char
*
s
);
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
ba0b06fd
...
@@ -55,6 +55,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
...
@@ -55,6 +55,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesOnePeer
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
SyncIndex
nextIndex
);
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
,
bool
isTimer
);
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
,
bool
isTimer
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
);
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
ba0b06fd
...
@@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -148,6 +148,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
term
=
ths
->
pRaftStore
->
currentTerm
;
pReply
->
success
=
false
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
...
@@ -290,6 +291,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply
->
matchIndex
=
pMsg
->
prevLogIndex
;
pReply
->
matchIndex
=
pMsg
->
prevLogIndex
;
}
}
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
...
@@ -603,6 +606,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
success
=
true
;
pReply
->
success
=
true
;
pReply
->
matchIndex
=
matchIndex
;
pReply
->
matchIndex
=
matchIndex
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
...
@@ -651,6 +655,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
success
=
false
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
ths
->
commitIndex
;
pReply
->
matchIndex
=
ths
->
commitIndex
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -729,6 +734,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
...
@@ -729,6 +734,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
success
=
true
;
pReply
->
success
=
true
;
pReply
->
matchIndex
=
hasAppendEntries
?
pMsg
->
prevLogIndex
+
pMsg
->
dataCount
:
pMsg
->
prevLogIndex
;
pReply
->
matchIndex
=
hasAppendEntries
?
pMsg
->
prevLogIndex
+
pMsg
->
dataCount
:
pMsg
->
prevLogIndex
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
...
@@ -874,6 +880,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
success
=
true
;
pReply
->
success
=
true
;
pReply
->
matchIndex
=
matchIndex
;
pReply
->
matchIndex
=
matchIndex
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
...
@@ -919,6 +926,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
success
=
false
;
pReply
->
success
=
false
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
matchIndex
=
SYNC_INDEX_INVALID
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
@@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
...
@@ -984,6 +992,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
privateTerm
=
ths
->
pNewNodeReceiver
->
privateTerm
;
pReply
->
success
=
true
;
pReply
->
success
=
true
;
pReply
->
matchIndex
=
hasAppendEntries
?
pMsg
->
prevLogIndex
+
1
:
pMsg
->
prevLogIndex
;
pReply
->
matchIndex
=
hasAppendEntries
?
pMsg
->
prevLogIndex
+
1
:
pMsg
->
prevLogIndex
;
pReply
->
startTime
=
ths
->
startTime
;
// msg event log
// msg event log
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
syncLogSendAppendEntriesReply
(
ths
,
pReply
,
""
);
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
ba0b06fd
...
@@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
...
@@ -64,6 +64,10 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
// update time
syncIndexMgrSetStartTime
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
startTime
);
syncIndexMgrSetRecvTime
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
taosGetTimestampMs
());
SyncIndex
beforeNextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeNextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
...
@@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
...
@@ -170,6 +174,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
// update time
syncIndexMgrSetStartTime
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
startTime
);
syncIndexMgrSetRecvTime
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
taosGetTimestampMs
());
SyncIndex
beforeNextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeNextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
...
@@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
...
@@ -330,6 +338,10 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
// update time
syncIndexMgrSetStartTime
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
pMsg
->
startTime
);
syncIndexMgrSetRecvTime
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
taosGetTimestampMs
());
SyncIndex
beforeNextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeNextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
SyncIndex
beforeMatchIndex
=
syncIndexMgrGetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
));
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
ba0b06fd
...
@@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
...
@@ -133,6 +133,63 @@ bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index) {
return
false
;
return
false
;
}
}
static
inline
int64_t
syncNodeAbs64
(
int64_t
a
,
int64_t
b
)
{
ASSERT
(
a
>=
0
);
ASSERT
(
b
>=
0
);
int64_t
c
=
a
>
b
?
a
-
b
:
b
-
a
;
return
c
;
}
int32_t
syncNodeDynamicQuorum
(
const
SSyncNode
*
pSyncNode
)
{
int32_t
quorum
=
1
;
// self
int64_t
timeNow
=
taosGetTimestampMs
();
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
int64_t
peerStartTime
=
syncIndexMgrGetStartTime
(
pSyncNode
->
pNextIndex
,
&
(
pSyncNode
->
peersId
)[
i
]);
int64_t
peerRecvTime
=
syncIndexMgrGetRecvTime
(
pSyncNode
->
pNextIndex
,
&
(
pSyncNode
->
peersId
)[
i
]);
int64_t
recvTimeDiff
=
syncNodeAbs64
(
peerRecvTime
,
timeNow
);
int64_t
startTimeDiff
=
syncNodeAbs64
(
peerStartTime
,
pSyncNode
->
startTime
);
int32_t
addQuorum
=
0
;
if
(
recvTimeDiff
<
SYNC_MAX_RECV_TIME_RANGE_MS
)
{
addQuorum
=
1
;
}
else
{
addQuorum
=
0
;
}
if
(
startTimeDiff
>
SYNC_MAX_START_TIME_RANGE_MS
)
{
addQuorum
=
0
;
}
quorum
+=
addQuorum
;
}
ASSERT
(
quorum
<=
pSyncNode
->
replicaNum
);
if
(
quorum
<
pSyncNode
->
quorum
)
{
quorum
=
pSyncNode
->
quorum
;
}
return
quorum
;
}
bool
syncAgree
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
)
{
int
agreeCount
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
if
(
syncAgreeIndex
(
pSyncNode
,
&
(
pSyncNode
->
replicasId
[
i
]),
index
))
{
++
agreeCount
;
}
if
(
agreeCount
>=
syncNodeDynamicQuorum
(
pSyncNode
))
{
return
true
;
}
}
return
false
;
}
/*
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
int agreeCount = 0;
int agreeCount = 0;
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
...
@@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
...
@@ -145,3 +202,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
}
}
return false;
return false;
}
}
*/
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
ba0b06fd
...
@@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
...
@@ -47,6 +47,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr) {
void
syncIndexMgrClear
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
void
syncIndexMgrClear
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
memset
(
pSyncIndexMgr
->
index
,
0
,
sizeof
(
pSyncIndexMgr
->
index
));
memset
(
pSyncIndexMgr
->
index
,
0
,
sizeof
(
pSyncIndexMgr
->
index
));
memset
(
pSyncIndexMgr
->
privateTerm
,
0
,
sizeof
(
pSyncIndexMgr
->
privateTerm
));
memset
(
pSyncIndexMgr
->
privateTerm
,
0
,
sizeof
(
pSyncIndexMgr
->
privateTerm
));
// int64_t timeNow = taosGetMonotonicMs();
for
(
int
i
=
0
;
i
<
pSyncIndexMgr
->
replicaNum
;
++
i
)
{
pSyncIndexMgr
->
startTimeArr
[
i
]
=
0
;
pSyncIndexMgr
->
recvTimeArr
[
i
]
=
0
;
}
/*
/*
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
pSyncIndexMgr->index[i] = 0;
pSyncIndexMgr->index[i] = 0;
...
@@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
...
@@ -68,7 +75,8 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
char
host
[
128
];
char
host
[
128
];
uint16_t
port
;
uint16_t
port
;
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sError
(
"vgId:%d, index mgr set for %s:%d, index:%"
PRId64
" error"
,
pSyncIndexMgr
->
pSyncNode
->
vgId
,
host
,
port
,
index
);
sError
(
"vgId:%d, index mgr set for %s:%d, index:%"
PRId64
" error"
,
pSyncIndexMgr
->
pSyncNode
->
vgId
,
host
,
port
,
index
);
}
}
SyncIndex
syncIndexMgrGetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
SyncIndex
syncIndexMgrGetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
...
@@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
...
@@ -125,11 +133,65 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
return
serialized
;
}
}
void
syncIndexMgrSetStartTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
,
int64_t
startTime
)
{
for
(
int
i
=
0
;
i
<
pSyncIndexMgr
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pSyncIndexMgr
->
replicas
))[
i
]),
pRaftId
))
{
(
pSyncIndexMgr
->
startTimeArr
)[
i
]
=
startTime
;
return
;
}
}
// maybe config change
// ASSERT(0);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sError
(
"vgId:%d, index mgr set for %s:%d, start-time:%"
PRId64
" error"
,
pSyncIndexMgr
->
pSyncNode
->
vgId
,
host
,
port
,
startTime
);
}
int64_t
syncIndexMgrGetStartTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
for
(
int
i
=
0
;
i
<
pSyncIndexMgr
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pSyncIndexMgr
->
replicas
))[
i
]),
pRaftId
))
{
int64_t
startTime
=
(
pSyncIndexMgr
->
startTimeArr
)[
i
];
return
startTime
;
}
}
ASSERT
(
0
);
}
void
syncIndexMgrSetRecvTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
,
int64_t
recvTime
)
{
for
(
int
i
=
0
;
i
<
pSyncIndexMgr
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pSyncIndexMgr
->
replicas
))[
i
]),
pRaftId
))
{
(
pSyncIndexMgr
->
recvTimeArr
)[
i
]
=
recvTime
;
return
;
}
}
// maybe config change
// ASSERT(0);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sError
(
"vgId:%d, index mgr set for %s:%d, recv-time:%"
PRId64
" error"
,
pSyncIndexMgr
->
pSyncNode
->
vgId
,
host
,
port
,
recvTime
);
}
int64_t
syncIndexMgrGetRecvTime
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
for
(
int
i
=
0
;
i
<
pSyncIndexMgr
->
replicaNum
;
++
i
)
{
if
(
syncUtilSameId
(
&
((
*
(
pSyncIndexMgr
->
replicas
))[
i
]),
pRaftId
))
{
int64_t
recvTime
=
(
pSyncIndexMgr
->
recvTimeArr
)[
i
];
return
recvTime
;
}
}
ASSERT
(
0
);
}
// for debug -------------------
// for debug -------------------
void
syncIndexMgrPrint
(
SSyncIndexMgr
*
pObj
)
{
void
syncIndexMgrPrint
(
SSyncIndexMgr
*
pObj
)
{
char
*
serialized
=
syncIndexMgr2Str
(
pObj
);
char
*
serialized
=
syncIndexMgr2Str
(
pObj
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
ba0b06fd
...
@@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
...
@@ -1682,13 +1682,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, "
", sby:%d, "
"stgy:%d, bch:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, elt:%"
PRId64
", hb:%"
PRId64
", %s"
,
"lcfg:%"
PRId64
", chging:%d, rsto:%d,
dquorum:%d,
elt:%"
PRId64
", hb:%"
PRId64
", %s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
pSyncNode
->
electTimerLogicClockUser
,
pSyncNode
->
heartbea
tTimerLogicClockUser
,
pSyncNode
->
restoreFinish
,
syncNodeDynamicQuorum
(
pSyncNode
),
pSyncNode
->
elec
tTimerLogicClockUser
,
printStr
);
p
SyncNode
->
heartbeatTimerLogicClockUser
,
p
rintStr
);
}
else
{
}
else
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"%s"
,
str
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"%s"
,
str
);
}
}
...
@@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
...
@@ -1706,12 +1706,13 @@ inline void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
", sby:%d, "
", sby:%d, "
"stgy:%d, bch:%d, "
"stgy:%d, bch:%d, "
"r-num:%d, "
"r-num:%d, "
"lcfg:%"
PRId64
", chging:%d, rsto:%d, %s"
,
"lcfg:%"
PRId64
", chging:%d, rsto:%d,
dquorum:%d, elt:%"
PRId64
", hb:%"
PRId64
",
%s"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
pRaftCfg
->
snapshotStrategy
,
pSyncNode
->
pRaftCfg
->
batchSize
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
,
pSyncNode
->
restoreFinish
,
printStr
);
pSyncNode
->
restoreFinish
,
syncNodeDynamicQuorum
(
pSyncNode
),
pSyncNode
->
electTimerLogicClockUser
,
pSyncNode
->
heartbeatTimerLogicClockUser
,
printStr
);
}
else
{
}
else
{
snprintf
(
s
,
len
,
"%s"
,
str
);
snprintf
(
s
,
len
,
"%s"
,
str
);
}
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
ba0b06fd
...
@@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
...
@@ -1947,6 +1947,8 @@ cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg) {
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
cJSON_AddNumberToObject
(
pRoot
,
"success"
,
pMsg
->
success
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
matchIndex
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
matchIndex
);
cJSON_AddStringToObject
(
pRoot
,
"matchIndex"
,
u64buf
);
cJSON_AddStringToObject
(
pRoot
,
"matchIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRId64
,
pMsg
->
startTime
);
cJSON_AddStringToObject
(
pRoot
,
"startTime"
,
u64buf
);
}
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON
*
pJson
=
cJSON_CreateObject
();
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
ba0b06fd
...
@@ -116,6 +116,120 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
...
@@ -116,6 +116,120 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
return
ret
;
return
ret
;
}
}
int32_t
syncNodeAppendEntriesOnePeer
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
SyncIndex
nextIndex
)
{
int32_t
ret
=
0
;
// pre index, pre term
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
if
(
preLogTerm
==
SYNC_TERM_INVALID
)
{
SyncIndex
newNextIndex
=
syncNodeGetLastIndex
(
pSyncNode
)
+
1
;
// SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
,
newNextIndex
);
syncIndexMgrSetIndex
(
pSyncNode
->
pMatchIndex
,
pDestId
,
SYNC_INDEX_INVALID
);
sError
(
"vgId:%d, sync get pre term error, nextIndex:%"
PRId64
", update next-index:%"
PRId64
", match-index:%d, raftid:%"
PRId64
,
pSyncNode
->
vgId
,
nextIndex
,
newNextIndex
,
SYNC_INDEX_INVALID
,
pDestId
->
addr
);
return
-
1
;
}
// entry pointer array
SSyncRaftEntry
*
entryPArr
[
SYNC_MAX_BATCH_SIZE
];
memset
(
entryPArr
,
0
,
sizeof
(
entryPArr
));
// get entry batch
int32_t
getCount
=
0
;
SyncIndex
getEntryIndex
=
nextIndex
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
getEntryIndex
,
&
pEntry
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
entryPArr
[
i
]
=
pEntry
;
getCount
++
;
getEntryIndex
++
;
}
else
{
break
;
}
}
// event log
do
{
char
logBuf
[
128
];
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pDestId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"build batch:%d for %s:%d"
,
getCount
,
host
,
port
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
// build msg
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchBuild
(
entryPArr
,
getCount
,
pSyncNode
->
vgId
);
ASSERT
(
pMsg
!=
NULL
);
// free entries
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
entryPArr
[
i
];
if
(
pEntry
!=
NULL
)
{
syncEntryDestory
(
pEntry
);
entryPArr
[
i
]
=
NULL
;
}
}
// prepare msg
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
prevLogIndex
=
preLogIndex
;
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pMsg
->
privateTerm
=
0
;
pMsg
->
dataCount
=
getCount
;
// send msg
syncNodeAppendEntriesBatch
(
pSyncNode
,
pDestId
,
pMsg
);
// speed up
if
(
pMsg
->
dataCount
>
0
&&
pSyncNode
->
commitIndex
-
pMsg
->
prevLogIndex
>
SYNC_SLOW_DOWN_RANGE
)
{
ret
=
1
;
#if 0
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
#endif
}
syncAppendEntriesBatchDestroy
(
pMsg
);
return
ret
;
}
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
-
1
;
}
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
// next index
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
ret
=
syncNodeAppendEntriesOnePeer
(
pSyncNode
,
pDestId
,
nextIndex
);
}
return
ret
;
}
#if 0
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
return -1;
return -1;
...
@@ -221,6 +335,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
...
@@ -221,6 +335,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
return ret;
return ret;
}
}
#endif
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
...
...
source/libs/sync/test/syncAppendEntriesReplyTest.cpp
浏览文件 @
ba0b06fd
...
@@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() {
...
@@ -24,6 +24,7 @@ SyncAppendEntriesReply *createMsg() {
pMsg
->
matchIndex
=
77
;
pMsg
->
matchIndex
=
77
;
pMsg
->
term
=
33
;
pMsg
->
term
=
33
;
pMsg
->
privateTerm
=
44
;
pMsg
->
privateTerm
=
44
;
pMsg
->
startTime
=
taosGetTimestampMs
();
return
pMsg
;
return
pMsg
;
}
}
...
@@ -89,6 +90,8 @@ void test5() {
...
@@ -89,6 +90,8 @@ void test5() {
}
}
int
main
()
{
int
main
()
{
gRaftDetailLog
=
true
;
tsAsyncLog
=
0
;
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
logTest
();
logTest
();
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录