Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a43d02a8
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a43d02a8
编写于
6月 21, 2022
作者:
L
Li Minghao
提交者:
GitHub
6月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14051 from taosdata/feature/3.0_mhli
fix(sync): config change when set index-mgr
上级
1644294e
b61a5cd1
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
129 addition
and
79 deletion
+129
-79
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+1
-0
source/libs/sync/inc/syncRaftCfg.h
source/libs/sync/inc/syncRaftCfg.h
+6
-6
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+9
-9
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+31
-14
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+15
-4
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+31
-9
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+10
-5
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+13
-17
source/libs/sync/test/syncReconfigFinishTest.cpp
source/libs/sync/test/syncReconfigFinishTest.cpp
+13
-15
未找到文件。
source/libs/sync/inc/syncInt.h
浏览文件 @
a43d02a8
...
...
@@ -253,6 +253,7 @@ void syncNodePrint(SSyncNode* pObj);
void
syncNodePrint2
(
char
*
s
,
SSyncNode
*
pObj
);
void
syncNodeLog
(
SSyncNode
*
pObj
);
void
syncNodeLog2
(
char
*
s
,
SSyncNode
*
pObj
);
void
syncNodeLog3
(
char
*
s
,
SSyncNode
*
pObj
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncRaftCfg.h
浏览文件 @
a43d02a8
...
...
@@ -27,7 +27,7 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
#define CONFIG_FILE_LEN
1024
#define CONFIG_FILE_LEN
2048
#define MAX_CONFIG_INDEX_COUNT 512
...
...
@@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t
raftCfgPersist
(
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgAddConfigIndex
(
SRaftCfg
*
pRaftCfg
,
SyncIndex
configIndex
);
cJSON
*
syncCfg2Json
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
);
cJSON
*
syncCfg2Json
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromStr
(
const
char
*
s
,
SSyncCfg
*
pSyncCfg
);
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
);
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
);
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
);
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgFromJson
(
const
cJSON
*
pRoot
,
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgFromStr
(
const
char
*
s
,
SRaftCfg
*
pRaftCfg
);
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
a43d02a8
...
...
@@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender {
bool
start
;
int32_t
seq
;
int32_t
ack
;
void
*
pReader
;
void
*
pCurrentBlock
;
void
*
pReader
;
void
*
pCurrentBlock
;
int32_t
blockLen
;
SSnapshot
snapshot
;
SSyncCfg
lastConfig
;
...
...
@@ -55,19 +55,19 @@ typedef struct SSyncSnapshotSender {
SSyncSnapshotSender
*
snapshotSenderCreate
(
SSyncNode
*
pSyncNode
,
int32_t
replicaIndex
);
void
snapshotSenderDestroy
(
SSyncSnapshotSender
*
pSender
);
bool
snapshotSenderIsStart
(
SSyncSnapshotSender
*
pSender
);
void
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
);
void
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
,
SSnapshot
snapshot
,
void
*
pReader
);
void
snapshotSenderStop
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotSend
(
SSyncSnapshotSender
*
pSender
);
int32_t
snapshotReSend
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
);
typedef
struct
SSyncSnapshotReceiver
{
bool
start
;
int32_t
ack
;
void
*
pWriter
;
void
*
pWriter
;
SyncTerm
term
;
SyncTerm
privateTerm
;
SSnapshot
snapshot
;
...
...
@@ -83,8 +83,8 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm privateT
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
);
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
,
bool
apply
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
);
int32_t
syncNodeOnSnapshotSendCb
(
SSyncNode
*
ths
,
SyncSnapshotSend
*
pMsg
);
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
ths
,
SyncSnapshotRsp
*
pMsg
);
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
a43d02a8
...
...
@@ -173,21 +173,44 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// get sender
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
ths
,
&
(
pMsg
->
srcId
));
ASSERT
(
pSender
!=
NULL
);
bool
hasSnapshot
=
syncNodeHasSnapshot
(
ths
);
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
// start sending snapshot first time
// start here, stop by receiver
if
(
hasSnapshot
&&
nextIndex
<=
snapshot
.
lastApplyIndex
+
1
&&
!
snapshotSenderIsStart
(
pSender
)
&&
pMsg
->
privateTerm
<
pSender
->
privateTerm
)
{
snapshotSenderStart
(
pSender
);
SSnapshot
snapshot
;
void
*
pReader
=
NULL
;
ths
->
pFsm
->
FpGetSnapshot
(
ths
->
pFsm
,
&
snapshot
,
NULL
,
&
pReader
);
if
(
snapshot
.
lastApplyIndex
>=
SYNC_INDEX_BEGIN
&&
nextIndex
<=
snapshot
.
lastApplyIndex
+
1
&&
!
snapshotSenderIsStart
(
pSender
)
&&
pMsg
->
privateTerm
<
pSender
->
privateTerm
)
{
// has snapshot
ASSERT
(
pReader
!=
NULL
);
snapshotSenderStart
(
pSender
,
snapshot
,
pReader
);
char
*
eventLog
=
snapshotSender2SimpleStr
(
pSender
,
"snapshot sender start"
);
syncNodeEventLog
(
ths
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
else
{
// no snapshot
if
(
pReader
!=
NULL
)
{
ths
->
pFsm
->
FpSnapshotStopRead
(
ths
->
pFsm
,
pReader
);
}
}
/*
bool hasSnapshot = syncNodeHasSnapshot(ths);
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
// start sending snapshot first time
// start here, stop by receiver
if (hasSnapshot && nextIndex <= snapshot.lastApplyIndex + 1 && !snapshotSenderIsStart(pSender) &&
pMsg->privateTerm < pSender->privateTerm) {
snapshotSenderStart(pSender);
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender start");
syncNodeEventLog(ths, eventLog);
taosMemoryFree(eventLog);
}
*/
SyncIndex
sentryIndex
=
pSender
->
snapshot
.
lastApplyIndex
+
1
;
// update nextIndex to sentryIndex
...
...
@@ -207,12 +230,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
syncIndexMgrLog2
(
"recv SyncAppendEntriesReply, after pNextIndex:"
,
ths
->
pNextIndex
);
syncIndexMgrLog2
(
"recv SyncAppendEntriesReply, after pMatchIndex:"
,
ths
->
pMatchIndex
);
if
(
gRaftDetailLog
)
{
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
sTrace
(
"recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu"
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
);
}
return
ret
;
}
\ No newline at end of file
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
a43d02a8
...
...
@@ -63,7 +63,12 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId,
}
// maybe config change
assert
(
0
);
// assert(0);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sError
(
"vgId:%d index mgr set for %s:%d, index:%"
PRId64
" error"
,
pSyncIndexMgr
->
pSyncNode
->
vgId
,
host
,
port
,
index
);
}
SyncIndex
syncIndexMgrGetIndex
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
...
...
@@ -73,7 +78,9 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf
return
idx
;
}
}
assert
(
0
);
syncNodeLog3
(
"syncIndexMgrGetIndex"
,
pSyncIndexMgr
->
pSyncNode
);
ASSERT
(
0
);
}
cJSON
*
syncIndexMgr2Json
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
...
...
@@ -119,7 +126,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -162,7 +169,11 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, S
}
// maybe config change
assert
(
0
);
// assert(0);
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
pRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sError
(
"vgId:%d index mgr set for %s:%d, term:%lu error"
,
pSyncIndexMgr
->
pSyncNode
->
vgId
,
host
,
port
,
term
);
}
SyncTerm
syncIndexMgrGetTerm
(
SSyncIndexMgr
*
pSyncIndexMgr
,
const
SRaftId
*
pRaftId
)
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
a43d02a8
...
...
@@ -1282,6 +1282,9 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
// snapshot receivers
cJSON
*
pReceivers
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"receiver"
,
snapshotReceiver2Json
(
pSyncNode
->
pNewNodeReceiver
));
// changing
cJSON_AddNumberToObject
(
pRoot
,
"changing"
,
pSyncNode
->
changing
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
...
...
@@ -1304,26 +1307,31 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSyncNode
->
pFsm
,
&
snapshot
);
}
SyncIndex
logLastIndex
=
pSyncNode
->
pLogStore
->
syncLogLastIndex
(
pSyncNode
->
pLogStore
);
SyncIndex
logBeginIndex
=
pSyncNode
->
pLogStore
->
syncLogBeginIndex
(
pSyncNode
->
pLogStore
);
if
(
userStrLen
<
256
)
{
char
logBuf
[
128
+
256
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, "
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
sDebug
(
"%s"
,
logBuf
);
}
else
{
int
len
=
128
+
userStrLen
;
char
*
s
=
(
char
*
)
taosMemoryMalloc
(
len
);
snprintf
(
s
,
len
,
"vgId:%d, sync %s %s, term:%lu, commit:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, replica-num:%d, "
"vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, "
"replica-num:%d, "
"lconfig:%ld, changing:%d"
,
pSyncNode
->
vgId
,
syncUtilState2String
(
pSyncNode
->
state
),
str
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
pSyncNode
->
commitIndex
,
logBeginIndex
,
logLastIndex
,
snapshot
.
lastApplyIndex
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
pRaftCfg
->
lastConfigIndex
,
pSyncNode
->
changing
);
sDebug
(
"%s"
,
s
);
taosMemoryFree
(
s
);
}
...
...
@@ -1400,7 +1408,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
pSyncNode
->
pRaftCfg
->
isStandBy
=
1
;
// set standby
}
//
persist
last config index
//
add
last config index
raftCfgAddConfigIndex
(
pSyncNode
->
pRaftCfg
,
lastConfigChangeIndex
);
if
(
IamInNew
)
{
...
...
@@ -1827,7 +1835,11 @@ SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
SyncIndex
syncNodeGetPreIndex
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
)
{
ASSERT
(
index
>=
SYNC_INDEX_BEGIN
);
SyncIndex
syncStartIndex
=
syncNodeSyncStartIndex
(
pSyncNode
);
ASSERT
(
index
<=
syncStartIndex
);
if
(
index
>
syncStartIndex
)
{
syncNodeLog3
(
"syncNodeGetPreIndex"
,
pSyncNode
);
ASSERT
(
0
);
}
SyncIndex
preIndex
=
index
-
1
;
return
preIndex
;
...
...
@@ -1836,7 +1848,11 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncTerm
syncNodeGetPreTerm
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
)
{
ASSERT
(
index
>=
SYNC_INDEX_BEGIN
);
SyncIndex
syncStartIndex
=
syncNodeSyncStartIndex
(
pSyncNode
);
ASSERT
(
index
<=
syncStartIndex
);
if
(
index
>
syncStartIndex
)
{
syncNodeLog3
(
"syncNodeGetPreTerm"
,
pSyncNode
);
ASSERT
(
0
);
}
if
(
index
==
SYNC_INDEX_BEGIN
)
{
return
0
;
...
...
@@ -1929,6 +1945,12 @@ void syncNodeLog2(char* s, SSyncNode* pObj) {
}
}
void
syncNodeLog3
(
char
*
s
,
SSyncNode
*
pObj
)
{
char
*
serialized
=
syncNode2Str
(
pObj
);
sTraceLong
(
"syncNodeLog3 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
// ------ local funciton ---------
// enqueue message ----
static
void
syncNodeEqPingTimer
(
void
*
param
,
void
*
tmrId
)
{
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
a43d02a8
...
...
@@ -53,7 +53,12 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
char
buf
[
CONFIG_FILE_LEN
]
=
{
0
};
memset
(
buf
,
0
,
sizeof
(
buf
));
ASSERT
(
strlen
(
s
)
+
1
<=
CONFIG_FILE_LEN
);
if
(
strlen
(
s
)
+
1
>
CONFIG_FILE_LEN
)
{
sError
(
"too long config str:%s"
,
s
);
ASSERT
(
0
);
}
snprintf
(
buf
,
sizeof
(
buf
),
"%s"
,
s
);
int64_t
ret
=
taosWriteFile
(
pRaftCfg
->
pFile
,
buf
,
sizeof
(
buf
));
assert
(
ret
==
sizeof
(
buf
));
...
...
@@ -96,14 +101,14 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
)
{
cJSON
*
pJson
=
syncCfg2Json
(
pSyncCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
)
{
int32_t
len
=
512
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
memset
(
s
,
0
,
len
);
snprintf
(
s
,
len
,
"{replica-num:%d, my-index:%d, "
,
pSyncCfg
->
replicaNum
,
pSyncCfg
->
myIndex
);
...
...
@@ -196,7 +201,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pJson
=
raftCfg2Json
(
pRaftCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -262,7 +267,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(
pRaftCfg
->
configIndexArr
)[
i
]
=
atoll
(
pIndex
->
valuestring
);
}
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
int32_t
code
=
syncCfgFromJson
(
pJsonSyncCfg
,
&
(
pRaftCfg
->
cfg
));
ASSERT
(
code
==
0
);
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
a43d02a8
...
...
@@ -67,7 +67,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool
snapshotSenderIsStart
(
SSyncSnapshotSender
*
pSender
)
{
return
pSender
->
start
;
}
// begin send snapshot (current term, seq begin)
void
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
)
{
void
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
,
SSnapshot
snapshot
,
void
*
pReader
)
{
ASSERT
(
!
snapshotSenderIsStart
(
pSender
));
pSender
->
seq
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
...
...
@@ -75,8 +75,18 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
// open snapshot reader
ASSERT
(
pSender
->
pReader
==
NULL
);
int32_t
ret
=
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStartRead
(
pSender
->
pSyncNode
->
pFsm
,
&
(
pSender
->
pReader
));
ASSERT
(
ret
==
0
);
pSender
->
pReader
=
pReader
;
pSender
->
snapshot
=
snapshot
;
/*
// open snapshot reader
ASSERT(pSender->pReader == NULL);
int32_t ret = pSender->pSyncNode->pFsm->FpSnapshotStartRead(pSender->pSyncNode->pFsm, &(pSender->pReader));
ASSERT(ret == 0);
// get current snapshot info
pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot));
*/
if
(
pSender
->
pCurrentBlock
!=
NULL
)
{
taosMemoryFree
(
pSender
->
pCurrentBlock
);
...
...
@@ -84,21 +94,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
pSender
->
blockLen
=
0
;
// get current snapshot info
pSender
->
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSender
->
pSyncNode
->
pFsm
,
&
(
pSender
->
snapshot
));
sTrace
(
"snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld"
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
);
if
(
pSender
->
snapshot
.
lastConfigIndex
!=
SYNC_INDEX_INVALID
)
{
/*
SSyncRaftEntry *pEntry = NULL;
int32_t code = pSender->pSyncNode->pLogStore->syncLogGetEntry(pSender->pSyncNode->pLogStore,
pSender->snapshot.lastConfigIndex, &pEntry);
ASSERT(code == 0);
ASSERT(pEntry != NULL);
*/
SSyncRaftEntry
*
pEntry
=
pSender
->
pSyncNode
->
pLogStore
->
getEntry
(
pSender
->
pSyncNode
->
pLogStore
,
pSender
->
snapshot
.
lastConfigIndex
);
ASSERT
(
pEntry
!=
NULL
);
...
...
source/libs/sync/test/syncReconfigFinishTest.cpp
浏览文件 @
a43d02a8
...
...
@@ -14,8 +14,8 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
SSyncCfg
*
createSyncOldCfg
()
{
SSyncCfg
*
pCfg
=
(
SSyncCfg
*
)
taosMemoryMalloc
(
sizeof
(
SSyncCfg
));
SSyncCfg
*
createSyncOldCfg
()
{
SSyncCfg
*
pCfg
=
(
SSyncCfg
*
)
taosMemoryMalloc
(
sizeof
(
SSyncCfg
));
memset
(
pCfg
,
0
,
sizeof
(
SSyncCfg
));
pCfg
->
replicaNum
=
3
;
...
...
@@ -28,8 +28,8 @@ SSyncCfg* createSyncOldCfg() {
return
pCfg
;
}
SSyncCfg
*
createSyncNewCfg
()
{
SSyncCfg
*
pCfg
=
(
SSyncCfg
*
)
taosMemoryMalloc
(
sizeof
(
SSyncCfg
));
SSyncCfg
*
createSyncNewCfg
()
{
SSyncCfg
*
pCfg
=
(
SSyncCfg
*
)
taosMemoryMalloc
(
sizeof
(
SSyncCfg
));
memset
(
pCfg
,
0
,
sizeof
(
SSyncCfg
));
pCfg
->
replicaNum
=
3
;
...
...
@@ -44,9 +44,9 @@ SSyncCfg* createSyncNewCfg() {
SyncReconfigFinish
*
createMsg
()
{
SyncReconfigFinish
*
pMsg
=
syncReconfigFinishBuild
(
1234
);
SSyncCfg
*
pOld
=
createSyncOldCfg
();
SSyncCfg
*
pNew
=
createSyncNewCfg
();
SSyncCfg
*
pOld
=
createSyncOldCfg
();
SSyncCfg
*
pNew
=
createSyncNewCfg
();
pMsg
->
oldCfg
=
*
pOld
;
pMsg
->
newCfg
=
*
pNew
;
...
...
@@ -60,18 +60,16 @@ SyncReconfigFinish *createMsg() {
return
pMsg
;
}
void
test1
()
{
SyncReconfigFinish
*
pMsg
=
createMsg
();
syncReconfigFinishLog2
((
char
*
)
"test1:"
,
pMsg
);
syncReconfigFinishDestroy
(
pMsg
);
}
void
test2
()
{
SyncReconfigFinish
*
pMsg
=
createMsg
();
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncReconfigFinishSerialize
(
pMsg
,
serialized
,
len
);
SyncReconfigFinish
*
pMsg2
=
syncReconfigFinishBuild
(
1000
);
syncReconfigFinishDeserialize
(
serialized
,
len
,
pMsg2
);
...
...
@@ -84,8 +82,8 @@ void test2() {
void
test3
()
{
SyncReconfigFinish
*
pMsg
=
createMsg
();
uint32_t
len
;
char
*
serialized
=
syncReconfigFinishSerialize2
(
pMsg
,
&
len
);
uint32_t
len
;
char
*
serialized
=
syncReconfigFinishSerialize2
(
pMsg
,
&
len
);
SyncReconfigFinish
*
pMsg2
=
syncReconfigFinishDeserialize2
(
serialized
,
len
);
syncReconfigFinishLog2
((
char
*
)
"test3: SyncReconfigFinishSerialize2 -> syncReconfigFinishDeserialize2 "
,
pMsg2
);
...
...
@@ -96,7 +94,7 @@ void test3() {
void
test4
()
{
SyncReconfigFinish
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncReconfigFinish2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncReconfigFinish
*
pMsg2
=
(
SyncReconfigFinish
*
)
taosMemoryMalloc
(
rpcMsg
.
contLen
);
syncReconfigFinishFromRpcMsg
(
&
rpcMsg
,
pMsg2
);
...
...
@@ -109,7 +107,7 @@ void test4() {
void
test5
()
{
SyncReconfigFinish
*
pMsg
=
createMsg
();
SRpcMsg
rpcMsg
;
SRpcMsg
rpcMsg
;
syncReconfigFinish2RpcMsg
(
pMsg
,
&
rpcMsg
);
SyncReconfigFinish
*
pMsg2
=
syncReconfigFinishFromRpcMsg2
(
&
rpcMsg
);
syncReconfigFinishLog2
((
char
*
)
"test5: syncReconfigFinish2RpcMsg -> syncReconfigFinishFromRpcMsg2 "
,
pMsg2
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录