Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
5b8441f4
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
5b8441f4
编写于
7月 05, 2022
作者:
L
Li Minghao
提交者:
GitHub
7月 05, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #14554 from taosdata/feature/3.0_mhli
refactor(sync): add resp syncRespCleanByTTL
上级
2968ac85
103a9cd9
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
182 addition
and
46 deletion
+182
-46
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+2
-0
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+3
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+2
-1
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+57
-1
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+80
-36
source/libs/sync/test/syncIndexTest.cpp
source/libs/sync/test/syncIndexTest.cpp
+7
-1
source/libs/sync/test/syncRespMgrTest.cpp
source/libs/sync/test/syncRespMgrTest.cpp
+31
-4
未找到文件。
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
5b8441f4
...
...
@@ -519,6 +519,8 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
}
}
syncNodeRelease
(
pSyncNode
);
if
(
code
!=
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
5b8441f4
...
...
@@ -83,9 +83,9 @@ typedef struct SSyncSnapshotReceiver {
SSyncSnapshotReceiver
*
snapshotReceiverCreate
(
SSyncNode
*
pSyncNode
,
SRaftId
fromId
);
void
snapshotReceiverDestroy
(
SSyncSnapshotReceiver
*
pReceiver
);
int32_t
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SyncSnapshotSend
*
pBeginMsg
);
int32_t
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
);
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
);
int32_t
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
);
int32_t
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
);
bool
snapshotReceiverIsStart
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
5b8441f4
...
...
@@ -158,13 +158,13 @@ int32_t syncSetStandby(int64_t rid) {
}
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
terrno
=
TSDB_CODE_SYN_IS_LEADER
;
}
else
{
terrno
=
TSDB_CODE_SYN_STANDBY_NOT_READY
;
}
sError
(
"failed to set standby since it is not follower, state:%s rid:%"
PRId64
,
syncStr
(
pSyncNode
->
state
),
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
-
1
;
}
...
...
@@ -620,6 +620,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
taosReleaseRef
(
tsNodeRefId
,
rid
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
5b8441f4
...
...
@@ -14,6 +14,7 @@
*/
#include "syncRespMgr.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
SSyncRespMgr
*
syncRespMgrCreate
(
void
*
data
,
int64_t
ttl
)
{
...
...
@@ -116,4 +117,59 @@ void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
}
void
syncRespCleanByTTL
(
SSyncRespMgr
*
pObj
,
int64_t
ttl
)
{}
void
syncRespCleanByTTL
(
SSyncRespMgr
*
pObj
,
int64_t
ttl
)
{
SRespStub
*
pStub
=
(
SRespStub
*
)
taosHashIterate
(
pObj
->
pRespHash
,
NULL
);
int
cnt
=
0
;
SSyncNode
*
pSyncNode
=
pObj
->
data
;
SArray
*
delIndexArray
=
taosArrayInit
(
0
,
sizeof
(
SyncIndex
));
ASSERT
(
delIndexArray
!=
NULL
);
while
(
pStub
)
{
size_t
len
;
void
*
key
=
taosHashGetKey
(
pStub
,
&
len
);
SyncIndex
*
pIndex
=
(
SyncIndex
*
)
key
;
int64_t
nowMS
=
taosGetTimestampMs
();
if
(
nowMS
-
pStub
->
createTime
>
ttl
)
{
taosArrayPush
(
delIndexArray
,
pIndex
);
cnt
++
;
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
0
;
if
(
pSyncNode
->
pLogStore
!=
NULL
)
{
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
*
pIndex
,
&
pEntry
);
if
(
code
==
0
&&
pEntry
!=
NULL
)
{
SFsmCbMeta
cbMeta
=
{
0
};
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
lastConfigIndex
=
syncNodeGetSnapshotConfigIndex
(
pSyncNode
,
cbMeta
.
index
);
cbMeta
.
isWeak
=
pEntry
->
isWeak
;
cbMeta
.
code
=
TSDB_CODE_SYN_TIMEOUT
;
cbMeta
.
state
=
pSyncNode
->
state
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
currentTerm
=
pSyncNode
->
pRaftStore
->
currentTerm
;
cbMeta
.
flag
=
0
;
SRpcMsg
rpcMsg
=
pStub
->
rpcMsg
;
rpcMsg
.
pCont
=
rpcMallocCont
(
pEntry
->
dataLen
);
memcpy
(
rpcMsg
.
pCont
,
pEntry
->
data
,
pEntry
->
dataLen
);
pSyncNode
->
pFsm
->
FpCommitCb
(
pSyncNode
->
pFsm
,
&
rpcMsg
,
cbMeta
);
syncEntryDestory
(
pEntry
);
}
}
}
pStub
=
(
SRespStub
*
)
taosHashIterate
(
pObj
->
pRespHash
,
pStub
);
}
int32_t
arraySize
=
taosArrayGetSize
(
delIndexArray
);
sDebug
(
"vgId:%d, resp clean by ttl, cnt:%d, array-size:%d"
,
pSyncNode
->
vgId
,
cnt
,
arraySize
);
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SyncIndex
*
pIndex
=
taosArrayGet
(
delIndexArray
,
i
);
taosHashRemove
(
pObj
->
pRespHash
,
pIndex
,
sizeof
(
SyncIndex
));
}
taosArrayDestroy
(
delIndexArray
);
}
source/libs/sync/src/syncSnapshot.c
浏览文件 @
5b8441f4
...
...
@@ -22,9 +22,11 @@
#include "wal.h"
//----------------------------------
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SyncSnapshotSend
*
pBeginMsg
);
static
void
snapshotReceiverGotData
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pMsg
);
static
void
snapshotSenderUpdateProgress
(
SSyncSnapshotSender
*
pSender
,
SyncSnapshotRsp
*
pMsg
);
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
);
static
void
snapshotReceiverForceStop
(
SSyncSnapshotReceiver
*
pReceiver
);
static
void
snapshotReceiverGotData
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pMsg
);
static
int32_t
snapshotReceiverFinish
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pMsg
);
//----------------------------------
SSyncSnapshotSender
*
snapshotSenderCreate
(
SSyncNode
*
pSyncNode
,
int32_t
replicaIndex
)
{
...
...
@@ -68,7 +70,9 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
// close reader
if
(
pSender
->
pReader
!=
NULL
)
{
int32_t
ret
=
pSender
->
pSyncNode
->
pFsm
->
FpSnapshotStopRead
(
pSender
->
pSyncNode
->
pFsm
,
pSender
->
pReader
);
ASSERT
(
ret
==
0
);
if
(
ret
!=
0
)
{
syncNodeErrorLog
(
pSender
->
pSyncNode
,
"stop reader error"
);
}
pSender
->
pReader
=
NULL
;
}
...
...
@@ -79,7 +83,12 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
bool
snapshotSenderIsStart
(
SSyncSnapshotSender
*
pSender
)
{
return
pSender
->
start
;
}
// begin send snapshot by snapshot, pReader
// begin send snapshot by param, snapshot, pReader
//
// action:
// 1. assert reader not start
// 2. update state
// 3. send first snapshot block
int32_t
snapshotSenderStart
(
SSyncSnapshotSender
*
pSender
,
SSnapshotParam
snapshotParam
,
SSnapshot
snapshot
,
void
*
pReader
)
{
ASSERT
(
!
snapshotSenderIsStart
(
pSender
));
...
...
@@ -98,7 +107,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
// update term
pSender
->
term
=
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
;
++
(
pSender
->
privateTerm
);
++
(
pSender
->
privateTerm
);
// increase private term
// update state
pSender
->
finish
=
false
;
...
...
@@ -114,9 +123,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapsho
code
=
pSender
->
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSender
->
pSyncNode
->
pLogStore
,
pSender
->
snapshot
.
lastConfigIndex
,
&
pEntry
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
if
(
code
==
0
&&
pEntry
!=
NULL
)
{
SRpcMsg
rpcMsg
;
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
...
...
@@ -207,6 +214,8 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) {
pSender
->
start
=
false
;
pSender
->
finish
=
finish
;
// do not update term, maybe print
// event log
do
{
char
*
eventLog
=
snapshotSender2SimpleStr
(
pSender
,
"snapshot sender stop"
);
...
...
@@ -243,6 +252,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
pMsg
->
srcId
=
pSender
->
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
(
pSender
->
pSyncNode
->
replicasId
)[
pSender
->
replicaIndex
];
pMsg
->
term
=
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
beginIndex
=
pSender
->
snapshotParam
.
start
;
pMsg
->
lastIndex
=
pSender
->
snapshot
.
lastApplyIndex
;
pMsg
->
lastTerm
=
pSender
->
snapshot
.
lastApplyTerm
;
pMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
...
...
@@ -281,11 +291,13 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
pMsg
->
srcId
=
pSender
->
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
(
pSender
->
pSyncNode
->
replicasId
)[
pSender
->
replicaIndex
];
pMsg
->
term
=
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
beginIndex
=
pSender
->
snapshotParam
.
start
;
pMsg
->
lastIndex
=
pSender
->
snapshot
.
lastApplyIndex
;
pMsg
->
lastTerm
=
pSender
->
snapshot
.
lastApplyTerm
;
pMsg
->
lastConfigIndex
=
pSender
->
snapshot
.
lastConfigIndex
;
pMsg
->
lastConfig
=
pSender
->
lastConfig
;
pMsg
->
seq
=
pSender
->
seq
;
pMsg
->
privateTerm
=
pSender
->
privateTerm
;
memcpy
(
pMsg
->
data
,
pSender
->
pCurrentBlock
,
pSender
->
blockLen
);
// send msg
...
...
@@ -305,6 +317,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
return
0
;
}
static
void
snapshotSenderUpdateProgress
(
SSyncSnapshotSender
*
pSender
,
SyncSnapshotRsp
*
pMsg
)
{
ASSERT
(
pMsg
->
ack
==
pSender
->
seq
);
pSender
->
ack
=
pMsg
->
ack
;
++
(
pSender
->
seq
);
}
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
)
{
char
u64buf
[
128
];
cJSON
*
pRoot
=
cJSON_CreateObject
();
...
...
@@ -371,10 +389,11 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
syncUtilU642Addr
(
destId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
s
,
len
,
"%s {%p laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu replica-index:%d %s:%d}"
,
event
,
pSender
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
finish
,
pSender
->
privateTerm
,
pSender
->
replicaIndex
,
host
,
port
);
"%s {%p s-param:%ld e-param:%ld laindex:%ld laterm:%lu lcindex:%ld seq:%d ack:%d finish:%d pterm:%lu "
"replica-index:%d %s:%d}"
,
event
,
pSender
,
pSender
->
snapshotParam
.
start
,
pSender
->
snapshotParam
.
end
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
finish
,
pSender
->
privateTerm
,
pSender
->
replicaIndex
,
host
,
port
);
return
s
;
}
...
...
@@ -429,11 +448,10 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceive
// static do start by privateTerm, pBeginMsg
// receive first snapshot data
// write first block data
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncTerm
privateTerm
,
SyncSnapshotSend
*
pBeginMsg
)
{
static
void
snapshotReceiverDoStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
)
{
// update state
pReceiver
->
term
=
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
;
pReceiver
->
privateTerm
=
privateTerm
;
pReceiver
->
privateTerm
=
p
BeginMsg
->
p
rivateTerm
;
pReceiver
->
ack
=
SYNC_SNAPSHOT_SEQ_BEGIN
;
pReceiver
->
fromId
=
pBeginMsg
->
srcId
;
pReceiver
->
start
=
true
;
...
...
@@ -445,7 +463,7 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
pReceiver
->
snapshotParam
.
start
=
pBeginMsg
->
beginIndex
;
pReceiver
->
snapshotParam
.
end
=
pBeginMsg
->
lastIndex
;
//
write data
//
start writer
ASSERT
(
pReceiver
->
pWriter
==
NULL
);
int32_t
ret
=
pReceiver
->
pSyncNode
->
pFsm
->
FpSnapshotStartWrite
(
pReceiver
->
pSyncNode
->
pFsm
,
&
(
pReceiver
->
snapshotParam
),
&
(
pReceiver
->
pWriter
));
...
...
@@ -481,10 +499,10 @@ static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver
// if already start, force close, start again
int32_t
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
Sync
Term
privateTerm
,
Sync
SnapshotSend
*
pBeginMsg
)
{
int32_t
snapshotReceiverStart
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pBeginMsg
)
{
if
(
!
snapshotReceiverIsStart
(
pReceiver
))
{
// first start
snapshotReceiverDoStart
(
pReceiver
,
p
rivateTerm
,
p
BeginMsg
);
snapshotReceiverDoStart
(
pReceiver
,
pBeginMsg
);
}
else
{
// already start
...
...
@@ -494,12 +512,14 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncTerm private
snapshotReceiverForceStop
(
pReceiver
);
// start again
snapshotReceiverDoStart
(
pReceiver
,
p
rivateTerm
,
p
BeginMsg
);
snapshotReceiverDoStart
(
pReceiver
,
pBeginMsg
);
}
return
0
;
}
// just set start = false
// FpSnapshotStopWrite should not be called, assert writer == NULL
int32_t
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
)
{
if
(
pReceiver
->
pWriter
!=
NULL
)
{
int32_t
ret
=
...
...
@@ -522,6 +542,7 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
return
0
;
}
// when recv last snapshot block, apply data into snapshot
static
int32_t
snapshotReceiverFinish
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pMsg
)
{
ASSERT
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
);
...
...
@@ -550,7 +571,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
pReceiver
->
pSyncNode
->
commitIndex
=
pReceiver
->
snapshot
.
lastApplyIndex
;
}
// stop writer
// stop writer
, apply data
code
=
pReceiver
->
pSyncNode
->
pFsm
->
FpSnapshotStopWrite
(
pReceiver
->
pSyncNode
->
pFsm
,
pReceiver
->
pWriter
,
true
);
if
(
code
!=
0
)
{
syncNodeErrorLog
(
pReceiver
->
pSyncNode
,
"snapshot stop writer true error"
);
...
...
@@ -579,15 +600,20 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
return
0
;
}
// apply data block
// update progress
static
void
snapshotReceiverGotData
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pMsg
)
{
ASSERT
(
pMsg
->
seq
==
pReceiver
->
ack
+
1
);
if
(
pReceiver
->
pWriter
!=
NULL
)
{
if
(
pMsg
->
dataLen
>
0
)
{
// apply data block
int32_t
code
=
pReceiver
->
pSyncNode
->
pFsm
->
FpSnapshotDoWrite
(
pReceiver
->
pSyncNode
->
pFsm
,
pReceiver
->
pWriter
,
pMsg
->
data
,
pMsg
->
dataLen
);
ASSERT
(
code
==
0
);
}
// update progress
pReceiver
->
ack
=
pMsg
->
seq
;
// event log
...
...
@@ -665,14 +691,23 @@ char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event)
uint16_t
port
;
syncUtilU642Addr
(
fromId
.
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
s
,
len
,
"%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d laindex:%ld laterm:%lu lcindex:%ld}"
,
event
,
pReceiver
,
pReceiver
->
start
,
pReceiver
->
ack
,
pReceiver
->
term
,
pReceiver
->
privateTerm
,
host
,
port
,
pReceiver
->
snapshot
.
lastApplyIndex
,
pReceiver
->
snapshot
.
lastApplyTerm
,
pReceiver
->
snapshot
.
lastConfigIndex
);
snprintf
(
s
,
len
,
"%s {%p start:%d ack:%d term:%lu pterm:%lu from:%s:%d s-param:%ld e-param:%ld laindex:%ld laterm:%lu "
"lcindex:%ld}"
,
event
,
pReceiver
,
pReceiver
->
start
,
pReceiver
->
ack
,
pReceiver
->
term
,
pReceiver
->
privateTerm
,
host
,
port
,
pReceiver
->
snapshotParam
.
start
,
pReceiver
->
snapshotParam
.
end
,
pReceiver
->
snapshot
.
lastApplyIndex
,
pReceiver
->
snapshot
.
lastApplyTerm
,
pReceiver
->
snapshot
.
lastConfigIndex
);
return
s
;
}
// receiver do something
// receiver on message
//
// condition 1, recv SYNC_SNAPSHOT_SEQ_BEGIN, start receiver, update privateTerm
// condition 2, recv SYNC_SNAPSHOT_SEQ_END, finish receiver(apply snapshot data, update commit index, maybe reconfig)
// condition 3, recv SYNC_SNAPSHOT_SEQ_FORCE_CLOSE, force close
// condition 4, got data, update ack
//
int32_t
syncNodeOnSnapshotSendCb
(
SSyncNode
*
pSyncNode
,
SyncSnapshotSend
*
pMsg
)
{
// get receiver
SSyncSnapshotReceiver
*
pReceiver
=
pSyncNode
->
pNewNodeReceiver
;
...
...
@@ -683,11 +718,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_FOLLOWER
)
{
if
(
pMsg
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_BEGIN
)
{
// condition 1
// begin, no data
snapshotReceiverStart
(
pReceiver
,
pMsg
->
privateTerm
,
pMsg
);
snapshotReceiverStart
(
pReceiver
,
pMsg
);
needRsp
=
true
;
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
// condition 2
// end, finish FSM
code
=
snapshotReceiverFinish
(
pReceiver
,
pMsg
);
if
(
code
==
0
)
{
...
...
@@ -697,7 +734,6 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// maybe update lastconfig
if
(
pMsg
->
lastConfigIndex
>=
SYNC_INDEX_BEGIN
)
{
// int32_t oldReplicaNum = pSyncNode->replicaNum;
SSyncCfg
oldSyncCfg
=
pSyncNode
->
pRaftCfg
->
cfg
;
// update new config myIndex
...
...
@@ -709,11 +745,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
}
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_FORCE_CLOSE
)
{
// condition 3
// force close
snapshotReceiverForceStop
(
pReceiver
);
needRsp
=
false
;
}
else
if
(
pMsg
->
seq
>
SYNC_SNAPSHOT_SEQ_BEGIN
&&
pMsg
->
seq
<
SYNC_SNAPSHOT_SEQ_END
)
{
// condition 4
// transfering
if
(
pMsg
->
seq
==
pReceiver
->
ack
+
1
)
{
snapshotReceiverGotData
(
pReceiver
,
pMsg
);
...
...
@@ -752,6 +790,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeSendMsgById
(
&
(
pRspMsg
->
destId
),
pSyncNode
,
&
rpcMsg
);
syncSnapshotRspDestroy
(
pRspMsg
);
}
}
else
{
// error log
do
{
...
...
@@ -759,6 +798,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeErrorLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
while
(
0
);
return
-
1
;
}
}
else
{
// error log
...
...
@@ -767,19 +808,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
syncNodeErrorLog
(
pSyncNode
,
eventLog
);
taosMemoryFree
(
eventLog
);
}
while
(
0
);
return
-
1
;
}
return
0
;
}
static
void
snapshotSenderUpdateProgress
(
SSyncSnapshotSender
*
pSender
,
SyncSnapshotRsp
*
pMsg
)
{
ASSERT
(
pMsg
->
ack
==
pSender
->
seq
);
pSender
->
ack
=
pMsg
->
ack
;
++
(
pSender
->
seq
);
}
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
// sender on message
//
// condition 1 sender receives SYNC_SNAPSHOT_SEQ_END, close sender
// condition 2 sender receives ack, set seq = ack + 1, send msg from seq
// condition 3 sender receives error msg, just print error log
//
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
pSyncNode
,
SyncSnapshotRsp
*
pMsg
)
{
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
pSyncNode
,
&
(
pMsg
->
srcId
))
&&
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
...
...
@@ -794,12 +835,14 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
// state, term, seq/ack
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pMsg
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
// receiver ack is finish, close sender
// condition 1
// receive ack is finish, close sender
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_END
)
{
snapshotSenderStop
(
pSender
,
true
);
return
0
;
}
// condition 2
// send next msg
if
(
pMsg
->
ack
==
pSender
->
seq
)
{
// update sender ack
...
...
@@ -807,6 +850,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
snapshotSend
(
pSender
);
}
else
if
(
pMsg
->
ack
==
pSender
->
seq
-
1
)
{
// maybe resend
snapshotReSend
(
pSender
);
}
else
{
...
...
source/libs/sync/test/syncIndexTest.cpp
浏览文件 @
5b8441f4
...
...
@@ -8,7 +8,13 @@ void print(SHashObj *pNextIndex) {
printf
(
"----------------
\n
"
);
uint64_t
*
p
=
(
uint64_t
*
)
taosHashIterate
(
pNextIndex
,
NULL
);
while
(
p
)
{
printf
(
"%lu
\n
"
,
*
p
);
size_t
len
;
void
*
key
=
taosHashGetKey
(
p
,
&
len
);
SRaftId
*
pRaftId
=
(
SRaftId
*
)
key
;
printf
(
"key:<%lu, %d>, value:%lu
\n
"
,
pRaftId
->
addr
,
pRaftId
->
vgId
,
*
p
);
p
=
(
uint64_t
*
)
taosHashIterate
(
pNextIndex
,
p
);
}
}
...
...
source/libs/sync/test/syncRespMgrTest.cpp
浏览文件 @
5b8441f4
...
...
@@ -73,9 +73,15 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
}
}
SSyncNode
*
createSyncNode
()
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
return
pSyncNode
;
}
void
test1
()
{
printf
(
"------- test1 ---------
\n
"
);
pMgr
=
syncRespMgrCreate
(
NULL
,
0
);
pMgr
=
syncRespMgrCreate
(
createSyncNode
()
,
0
);
assert
(
pMgr
!=
NULL
);
syncRespMgrInsert
(
10
);
...
...
@@ -100,7 +106,7 @@ void test1() {
void
test2
()
{
printf
(
"------- test2 ---------
\n
"
);
pMgr
=
syncRespMgrCreate
(
NULL
,
0
);
pMgr
=
syncRespMgrCreate
(
createSyncNode
()
,
0
);
assert
(
pMgr
!=
NULL
);
syncRespMgrInsert
(
10
);
...
...
@@ -117,7 +123,7 @@ void test2() {
void
test3
()
{
printf
(
"------- test3 ---------
\n
"
);
pMgr
=
syncRespMgrCreate
(
NULL
,
0
);
pMgr
=
syncRespMgrCreate
(
createSyncNode
()
,
0
);
assert
(
pMgr
!=
NULL
);
syncRespMgrInsert
(
10
);
...
...
@@ -132,13 +138,34 @@ void test3() {
syncRespMgrDestroy
(
pMgr
);
}
void
test4
()
{
printf
(
"------- test4 ---------
\n
"
);
pMgr
=
syncRespMgrCreate
(
createSyncNode
(),
2
);
assert
(
pMgr
!=
NULL
);
syncRespMgrInsert
(
5
);
syncRespMgrPrint
();
taosMsleep
(
3000
);
syncRespMgrInsert
(
3
);
syncRespMgrPrint
();
printf
(
"====== after clean ttl
\n
"
);
syncRespClean
(
pMgr
);
syncRespMgrPrint
();
syncRespMgrDestroy
(
pMgr
);
}
int
main
()
{
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
sDebugFlag
=
DEBUG_
DEBUG
+
DEBUG_
TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
logTest
();
test1
();
test2
();
test3
();
test4
();
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录