Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3d9ffc42
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3d9ffc42
编写于
6月 23, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): refactor wal abstraction
上级
f140a5ad
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
187 addition
and
75 deletion
+187
-75
include/libs/sync/sync.h
include/libs/sync/sync.h
+3
-3
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+1
-1
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+11
-26
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+11
-7
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+127
-32
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+2
-1
source/libs/sync/test/syncRaftLogTest2.cpp
source/libs/sync/test/syncRaftLogTest2.cpp
+29
-4
source/libs/sync/test/syncRaftLogTest3.cpp
source/libs/sync/test/syncRaftLogTest3.cpp
+3
-1
未找到文件。
include/libs/sync/sync.h
浏览文件 @
3d9ffc42
...
...
@@ -157,13 +157,13 @@ typedef struct SSyncLogStore {
SyncIndex
(
*
getCommitIndex
)(
struct
SSyncLogStore
*
pLogStore
);
// refactor, log[0 .. n] ==> log[m .. n]
int32_t
(
*
syncLogSetBeginIndex
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
beginIndex
);
int32_t
(
*
syncLogResetBeginIndex
)(
struct
SSyncLogStore
*
pLogStore
);
//
int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
SyncIndex
(
*
syncLogBeginIndex
)(
struct
SSyncLogStore
*
pLogStore
);
SyncIndex
(
*
syncLogEndIndex
)(
struct
SSyncLogStore
*
pLogStore
);
bool
(
*
syncLogIsEmpty
)(
struct
SSyncLogStore
*
pLogStore
);
int32_t
(
*
syncLogEntryCount
)(
struct
SSyncLogStore
*
pLogStore
);
// bool (*syncLogInRange
)(struct SSyncLogStore* pLogStore, SyncIndex index);
int32_t
(
*
syncLogRestoreFromSnapshot
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
SyncIndex
(
*
syncLogWriteIndex
)(
struct
SSyncLogStore
*
pLogStore
);
SyncIndex
(
*
syncLogLastIndex
)(
struct
SSyncLogStore
*
pLogStore
);
...
...
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
3d9ffc42
...
...
@@ -32,7 +32,7 @@ typedef struct SSyncLogStoreData {
SSyncNode
*
pSyncNode
;
SWal
*
pWal
;
SWalReadHandle
*
pWalHandle
;
SyncIndex
beginIndex
;
// valid begin index, default 0, may be set beginIndex > 0
//
SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
}
SSyncLogStoreData
;
SSyncLogStore
*
logStoreCreate
(
SSyncNode
*
pSyncNode
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
3d9ffc42
...
...
@@ -420,44 +420,26 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
// prevLogIndex == -1
static
bool
syncNodeOnAppendEntriesLogOK
(
SSyncNode
*
pSyncNode
,
SyncAppendEntries
*
pMsg
)
{
if
(
pMsg
->
prevLogIndex
==
SYNC_INDEX_INVALID
)
{
if
(
gRaftDetailLog
)
{
sTrace
(
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld"
,
pMsg
->
prevLogIndex
);
}
return
true
;
}
SyncIndex
myLastIndex
=
syncNodeGetLastIndex
(
pSyncNode
);
if
(
pMsg
->
prevLogIndex
>
myLastIndex
)
{
if
(
gRaftDetailLog
)
{
sTrace
(
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld"
,
pMsg
->
prevLogIndex
,
myLastIndex
);
}
sDebug
(
"vgId:%d sync log not ok, preindex:%ld"
,
pSyncNode
->
vgId
,
pMsg
->
prevLogIndex
);
return
false
;
}
SyncTerm
myPreLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
pMsg
->
prevLogIndex
+
1
);
if
(
myPreLogTerm
==
SYNC_TERM_INVALID
)
{
s
Error
(
"vgId:%d sync get pre term error
, preindex:%ld"
,
pSyncNode
->
vgId
,
pMsg
->
prevLogIndex
);
s
Debug
(
"vgId:%d sync log not ok2
, preindex:%ld"
,
pSyncNode
->
vgId
,
pMsg
->
prevLogIndex
);
return
false
;
}
if
(
pMsg
->
prevLogIndex
<=
myLastIndex
&&
pMsg
->
prevLogTerm
==
myPreLogTerm
)
{
if
(
gRaftDetailLog
)
{
sTrace
(
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu"
,
pMsg
->
prevLogIndex
,
myLastIndex
,
pMsg
->
prevLogTerm
,
myPreLogTerm
);
}
return
true
;
}
if
(
gRaftDetailLog
)
{
sTrace
(
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
"myPreLogTerm:%lu"
,
pMsg
->
prevLogIndex
,
myLastIndex
,
pMsg
->
prevLogTerm
,
myPreLogTerm
);
}
sDebug
(
"vgId:%d sync log not ok3, preindex:%ld"
,
pSyncNode
->
vgId
,
pMsg
->
prevLogIndex
);
return
false
;
}
...
...
@@ -528,7 +510,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths);
bool condition1 =
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
// donot use syncLogEntryCount!!! use isEmpty
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
(pMsg->prevLogIndex > myLastIndex);
bool condition3 = condition0 && (pMsg->prevLogIndex < snapshot.lastApplyIndex);
...
...
@@ -574,10 +556,13 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool
condition
=
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
)
&&
(
ths
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
&&
(
pMsg
->
prevLogIndex
<=
ths
->
commitIndex
);
if
(
condition
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
syncNodeEventLog
(
ths
,
logBuf
);
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries, fake match2, pre-index:%ld, pre-term:%lu, datalen:%d"
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
dataLen
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
SyncIndex
matchIndex
=
ths
->
commitIndex
;
bool
hasAppendEntries
=
pMsg
->
dataLen
>
0
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
3d9ffc42
...
...
@@ -1962,17 +1962,21 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
taosMemoryFree
(
pPreEntry
);
return
preTerm
;
}
else
{
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
,
.
lastConfigIndex
=
-
1
};
if
(
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
==
preIndex
)
{
return
snapshot
.
lastApplyTerm
;
}
SSnapshot
snapshot
=
{.
data
=
NULL
,
.
lastApplyIndex
=
-
1
,
.
lastApplyTerm
=
0
,
.
lastConfigIndex
=
-
1
};
if
(
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
if
(
snapshot
.
lastApplyIndex
==
preIndex
)
{
return
snapshot
.
lastApplyTerm
;
}
}
}
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"sync node get pre term error, index:%ld"
,
index
);
syncNodeErrorLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
return
SYNC_TERM_INVALID
;
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
3d9ffc42
...
...
@@ -18,7 +18,8 @@
#include "syncRaftStore.h"
// refactor, log[0 .. n] ==> log[m .. n]
static
int32_t
raftLogSetBeginIndex
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
beginIndex
);
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
);
// static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static
SyncIndex
raftLogBeginIndex
(
struct
SSyncLogStore
*
pLogStore
);
static
SyncIndex
raftLogEndIndex
(
struct
SSyncLogStore
*
pLogStore
);
static
SyncIndex
raftLogWriteIndex
(
struct
SSyncLogStore
*
pLogStore
);
...
...
@@ -44,6 +45,7 @@ static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncI
static
SyncIndex
logStoreGetCommitIndex
(
SSyncLogStore
*
pLogStore
);
// refactor, log[0 .. n] ==> log[m .. n]
/*
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
// if beginIndex == 0, donot need call this funciton
ASSERT(beginIndex > 0);
...
...
@@ -56,19 +58,30 @@ static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex b
walRestoreFromSnapshot(pWal, beginIndex - 1);
return 0;
}
*/
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
)
{
ASSERT
(
snapshotIndex
>=
0
);
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
walRestoreFromSnapshot
(
pWal
,
snapshotIndex
);
return
0
;
}
static
SyncIndex
raftLogBeginIndex
(
struct
SSyncLogStore
*
pLogStore
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
return
pData
->
beginIndex
;
SyncIndex
firstVer
=
walGetFirstVer
(
pWal
);
return
firstVer
;
}
static
SyncIndex
raftLogEndIndex
(
struct
SSyncLogStore
*
pLogStore
)
{
return
raftLogLastIndex
(
pLogStore
);
}
static
bool
raftLogIsEmpty
(
struct
SSyncLogStore
*
pLogStore
)
{
S
yncIndex
beginIndex
=
raftLogBeginIndex
(
pLogStore
)
;
S
yncIndex
endIndex
=
raftLogEndIndex
(
pLogStore
)
;
return
(
endIndex
<
beginIndex
);
S
SyncLogStoreData
*
pData
=
pLogStore
->
data
;
S
Wal
*
pWal
=
pData
->
pWal
;
return
walIsEmpty
(
pWal
);
}
static
int32_t
raftLogEntryCount
(
struct
SSyncLogStore
*
pLogStore
)
{
...
...
@@ -95,23 +108,8 @@ static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
SyncIndex
lastVer
=
walGetLastVer
(
pWal
);
SyncIndex
firstVer
=
walGetFirstVer
(
pWal
);
if
(
lastVer
<
firstVer
)
{
// no record
lastIndex
=
-
1
;
}
else
{
if
(
firstVer
>=
0
)
{
lastIndex
=
lastVer
;
}
else
if
(
firstVer
==
-
1
)
{
lastIndex
=
-
1
;
}
else
{
ASSERT
(
0
);
}
}
return
lastIndex
;
return
lastVer
;
}
static
SyncIndex
raftLogWriteIndex
(
struct
SSyncLogStore
*
pLogStore
)
{
...
...
@@ -121,6 +119,26 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
return
lastVer
+
1
;
}
static
SyncTerm
raftLogLastTerm
(
struct
SSyncLogStore
*
pLogStore
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
if
(
walIsEmpty
(
pWal
))
{
return
0
;
}
else
{
SSyncRaftEntry
*
pLastEntry
;
int32_t
code
=
raftLogGetLastEntry
(
pLogStore
,
&
pLastEntry
);
ASSERT
(
code
==
0
);
ASSERT
(
pLastEntry
!=
NULL
);
SyncTerm
lastTerm
=
pLastEntry
->
term
;
taosMemoryFree
(
pLastEntry
);
return
lastTerm
;
}
return
0
;
}
/*
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
if (raftLogEntryCount(pLogStore) == 0) {
...
...
@@ -136,6 +154,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
}
return lastTerm;
}
*/
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
...
...
@@ -302,6 +321,25 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
return
code
;
}
static
int32_t
raftLogGetLastEntry
(
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
**
ppLastEntry
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
ASSERT
(
ppLastEntry
!=
NULL
);
*
ppLastEntry
=
NULL
;
if
(
walIsEmpty
(
pWal
))
{
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
else
{
SyncIndex
lastIndex
=
raftLogLastIndex
(
pLogStore
);
int32_t
code
=
raftLogGetEntry
(
pLogStore
,
lastIndex
,
ppLastEntry
);
return
code
;
}
return
-
1
;
}
/*
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
*ppLastEntry = NULL;
if (raftLogEntryCount(pLogStore) == 0) {
...
...
@@ -311,6 +349,7 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
}
*/
//-------------------------------
SSyncLogStore
*
logStoreCreate
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -328,15 +367,17 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pData
->
pWalHandle
=
walOpenReadHandle
(
pData
->
pWal
);
ASSERT
(
pData
->
pWalHandle
!=
NULL
);
SyncIndex
firstVer
=
walGetFirstVer
(
pData
->
pWal
);
SyncIndex
lastVer
=
walGetLastVer
(
pData
->
pWal
);
if
(
firstVer
>=
0
)
{
pData
->
beginIndex
=
firstVer
;
}
else
if
(
firstVer
==
-
1
)
{
pData
->
beginIndex
=
lastVer
+
1
;
}
else
{
ASSERT
(
0
);
}
/*
SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal);
if (firstVer >= 0) {
pData->beginIndex = firstVer;
} else if (firstVer == -1) {
pData->beginIndex = lastVer + 1;
} else {
ASSERT(0);
}
*/
pLogStore
->
appendEntry
=
logStoreAppendEntry
;
pLogStore
->
getEntry
=
logStoreGetEntry
;
...
...
@@ -346,7 +387,8 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore
->
updateCommitIndex
=
logStoreUpdateCommitIndex
;
pLogStore
->
getCommitIndex
=
logStoreGetCommitIndex
;
pLogStore
->
syncLogSetBeginIndex
=
raftLogSetBeginIndex
;
// pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
pLogStore
->
syncLogRestoreFromSnapshot
=
raftLogRestoreFromSnapshot
;
pLogStore
->
syncLogBeginIndex
=
raftLogBeginIndex
;
pLogStore
->
syncLogEndIndex
=
raftLogEndIndex
;
pLogStore
->
syncLogIsEmpty
=
raftLogIsEmpty
;
...
...
@@ -538,6 +580,7 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
return
pEntry
;
}
/*
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128] = {0};
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
...
...
@@ -584,6 +627,57 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
*/
cJSON
*
logStore2Json
(
SSyncLogStore
*
pLogStore
)
{
char
u64buf
[
128
]
=
{
0
};
SSyncLogStoreData
*
pData
=
(
SSyncLogStoreData
*
)
pLogStore
->
data
;
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pData
!=
NULL
&&
pData
->
pWal
!=
NULL
)
{
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pSyncNode
);
cJSON_AddStringToObject
(
pRoot
,
"pSyncNode"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pWal
);
cJSON_AddStringToObject
(
pRoot
,
"pWal"
,
u64buf
);
SyncIndex
beginIndex
=
raftLogBeginIndex
(
pLogStore
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
beginIndex
);
cJSON_AddStringToObject
(
pRoot
,
"beginIndex"
,
u64buf
);
SyncIndex
endIndex
=
raftLogEndIndex
(
pLogStore
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
endIndex
);
cJSON_AddStringToObject
(
pRoot
,
"endIndex"
,
u64buf
);
int32_t
count
=
raftLogEntryCount
(
pLogStore
);
cJSON_AddNumberToObject
(
pRoot
,
"entryCount"
,
count
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
raftLogWriteIndex
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"WriteIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%d"
,
raftLogIsEmpty
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"IsEmpty"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
raftLogLastIndex
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"LastIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
raftLogLastTerm
(
pLogStore
));
cJSON_AddStringToObject
(
pRoot
,
"LastTerm"
,
u64buf
);
cJSON
*
pEntries
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"pEntries"
,
pEntries
);
if
(
!
raftLogIsEmpty
(
pLogStore
))
{
for
(
SyncIndex
i
=
beginIndex
;
i
<=
endIndex
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
logStoreGetEntry
(
pLogStore
,
i
);
cJSON_AddItemToArray
(
pEntries
,
syncEntry2Json
(
pEntry
));
syncEntryDestory
(
pEntry
);
}
}
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SSyncLogStore"
,
pRoot
);
return
pJson
;
}
char
*
logStore2Str
(
SSyncLogStore
*
pLogStore
)
{
cJSON
*
pJson
=
logStore2Json
(
pLogStore
);
...
...
@@ -603,7 +697,8 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pData
->
pWal
);
cJSON_AddStringToObject
(
pRoot
,
"pWal"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
pData
->
beginIndex
);
SyncIndex
beginIndex
=
raftLogBeginIndex
(
pLogStore
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
beginIndex
);
cJSON_AddStringToObject
(
pRoot
,
"beginIndex"
,
u64buf
);
SyncIndex
endIndex
=
raftLogEndIndex
(
pLogStore
);
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
3d9ffc42
...
...
@@ -545,7 +545,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
pReceiver
->
pSyncNode
->
commitIndex
=
pReceiver
->
snapshot
.
lastApplyIndex
;
}
pSyncNode
->
pLogStore
->
syncLogSetBeginIndex
(
pSyncNode
->
pLogStore
,
pMsg
->
lastIndex
+
1
);
// pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
pSyncNode
->
pLogStore
->
syncLogRestoreFromSnapshot
(
pSyncNode
->
pLogStore
,
pMsg
->
lastIndex
);
// maybe update lastconfig
if
(
pMsg
->
lastConfigIndex
>=
SYNC_INDEX_BEGIN
)
{
...
...
source/libs/sync/test/syncRaftLogTest2.cpp
浏览文件 @
3d9ffc42
...
...
@@ -113,7 +113,8 @@ void test2() {
pLogStore
=
logStoreCreate
(
pSyncNode
);
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
pLogStore
->
syncLogSetBeginIndex
(
pLogStore
,
5
);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
logStoreLog2
((
char
*
)
"
\n\n\n
test2 ----- "
,
pLogStore
);
if
(
gAssert
)
{
...
...
@@ -228,7 +229,8 @@ void test4() {
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test4 ----- "
,
pLogStore
);
pLogStore
->
syncLogSetBeginIndex
(
pLogStore
,
5
);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
for
(
int
i
=
5
;
i
<=
9
;
++
i
)
{
int32_t
dataLen
=
10
;
...
...
@@ -289,7 +291,8 @@ void test5() {
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test5 ----- "
,
pLogStore
);
pLogStore
->
syncLogSetBeginIndex
(
pLogStore
,
5
);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
for
(
int
i
=
5
;
i
<=
9
;
++
i
)
{
int32_t
dataLen
=
10
;
...
...
@@ -363,7 +366,8 @@ void test6() {
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test6 ----- "
,
pLogStore
);
pLogStore
->
syncLogSetBeginIndex
(
pLogStore
,
5
);
// pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
for
(
int
i
=
5
;
i
<=
9
;
++
i
)
{
int32_t
dataLen
=
10
;
...
...
@@ -405,14 +409,32 @@ void test6() {
assert
(
pLogStore
->
syncLogLastTerm
(
pLogStore
)
==
0
);
}
do
{
SyncIndex
firstVer
=
walGetFirstVer
(
pWal
);
SyncIndex
lastVer
=
walGetLastVer
(
pWal
);
bool
isEmpty
=
walIsEmpty
(
pWal
);
printf
(
"before -------- firstVer:%ld lastVer:%ld isEmpty:%d
\n
"
,
firstVer
,
lastVer
,
isEmpty
);
}
while
(
0
);
logStoreDestory
(
pLogStore
);
cleanup
();
// restart
init
();
pLogStore
=
logStoreCreate
(
pSyncNode
);
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
do
{
SyncIndex
firstVer
=
walGetFirstVer
(
pWal
);
SyncIndex
lastVer
=
walGetLastVer
(
pWal
);
bool
isEmpty
=
walIsEmpty
(
pWal
);
printf
(
"after -------- firstVer:%ld lastVer:%ld isEmpty:%d
\n
"
,
firstVer
,
lastVer
,
isEmpty
);
}
while
(
0
);
logStoreLog2
((
char
*
)
"
\n\n\n
test6 restart ----- "
,
pLogStore
);
if
(
gAssert
)
{
...
...
@@ -432,17 +454,20 @@ void test6() {
int
main
(
int
argc
,
char
**
argv
)
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_INFO
+
DEBUG_SCREEN
+
DEBUG_FILE
;
gRaftDetailLog
=
true
;
if
(
argc
==
2
)
{
gAssert
=
atoi
(
argv
[
1
]);
}
sTrace
(
"gAssert : %d"
,
gAssert
);
/*
test1();
test2();
test3();
test4();
test5();
*/
test6
();
return
0
;
...
...
source/libs/sync/test/syncRaftLogTest3.cpp
浏览文件 @
3d9ffc42
...
...
@@ -312,7 +312,8 @@ void test5() {
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test5 ----- "
,
pLogStore
);
pSyncNode
->
pLogStore
->
syncLogSetBeginIndex
(
pSyncNode
->
pLogStore
,
6
);
//pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
pLogStore
->
syncLogRestoreFromSnapshot
(
pSyncNode
->
pLogStore
,
5
);
for
(
int
i
=
6
;
i
<=
10
;
++
i
)
{
int32_t
dataLen
=
10
;
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
dataLen
);
...
...
@@ -372,6 +373,7 @@ void test5() {
int
main
(
int
argc
,
char
**
argv
)
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_INFO
+
DEBUG_SCREEN
+
DEBUG_FILE
;
gRaftDetailLog
=
true
;
if
(
argc
==
2
)
{
gAssert
=
atoi
(
argv
[
1
]);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录