提交 44a893a1 编写于 作者: M Minghao Li

fix(sync): snapshot maybe change when sending

上级 a56f149a
...@@ -165,15 +165,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -165,15 +165,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
// has snapshot // has snapshot
if (syncNodeHasSnapshot(ths)) { if (syncNodeHasSnapshot(ths)) {
SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (nextIndex <= snapshot.lastApplyIndex) {
// ASSERT(nextIndex == snapshot.lastApplyIndex);
nextIndex = snapshot.lastApplyIndex + 1;
sInfo("reset new nextIndex %ld, snapshot.lastApplyIndex:%ld", nextIndex, snapshot.lastApplyIndex);
// start send snapshot
// get sender // get sender
SSyncSnapshotSender* pSender = NULL; SSyncSnapshotSender* pSender = NULL;
for (int i = 0; i < ths->replicaNum; ++i) { for (int i = 0; i < ths->replicaNum; ++i) {
...@@ -183,11 +174,24 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -183,11 +174,24 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
} }
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
// calculate sentryIndex
SyncIndex sentryIndex;
if (pSender->start) {
sentryIndex = pSender->snapshot.lastApplyIndex;
} else {
// start send snapshot
if (!(pSender->term == ths->pRaftStore->currentTerm && pSender->finish == true)) { if (!(pSender->term == ths->pRaftStore->currentTerm && pSender->finish == true)) {
snapshotSenderStart(pSender); snapshotSenderStart(pSender);
} else { } else {
sInfo("snapshot send finish, send_term:%lu, current_term:%lu", pSender->term, ths->pRaftStore->currentTerm); sInfo("snapshot send finish, send_term:%lu, current_term:%lu", pSender->term, ths->pRaftStore->currentTerm);
} }
sentryIndex = pSender->snapshot.lastApplyIndex;
}
// update nextIndex to sentryIndex + 1
if (nextIndex <= sentryIndex) {
nextIndex = sentryIndex + 1;
} }
} }
......
...@@ -1693,6 +1693,7 @@ const char* syncStr(ESyncState state) { ...@@ -1693,6 +1693,7 @@ const char* syncStr(ESyncState state) {
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
sInfo("sync commit from %ld to %ld, flag:0x%lX", beginIndex, endIndex, flag);
// maybe execute by leader, skip snapshot // maybe execute by leader, skip snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册