提交 27d72283 编写于 作者: M Minghao Li

refactor(sync): make leader life longer

上级 327b938c
......@@ -717,24 +717,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// maybe update commit index, leader notice me
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
// advance commit index to sanpshot first
SSnapshot snapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) {
SyncIndex commitBegin = ths->commitIndex;
SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex;
SyncIndex lastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64,
commitBegin, commitEnd);
syncNodeEventLog(ths, eventLog);
}
SyncIndex beginIndex = 0;
SyncIndex endIndex = -1;
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// has commit entry in local
if (pMsg->commitIndex <= lastIndex) {
beginIndex = ths->commitIndex + 1;
endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
......@@ -743,10 +734,22 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
} else if (pMsg->commitIndex > lastIndex && ths->commitIndex < lastIndex) {
beginIndex = ths->commitIndex + 1;
endIndex = lastIndex;
// update commit index, speed up
ths->commitIndex = lastIndex;
// call back Wal
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
}
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
}
return 0;
}
} while (0);
......
......@@ -2803,6 +2803,23 @@ bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
if (beginIndex > endIndex) {
return 0;
}
// advance commit index to sanpshot first
SSnapshot snapshot = {0};
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex >= beginIndex) {
char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, beginIndex,
snapshot.lastApplyIndex);
syncNodeEventLog(ths, eventLog);
// update begin index
beginIndex = snapshot.lastApplyIndex + 1;
}
int32_t code = 0;
ESyncState state = flag;
......
......@@ -140,9 +140,6 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
sError("vgId:%d, sync get pre term error, nextIndex:%" PRId64 ", update next-index:%" PRId64
", match-index:%d, raftid:%" PRId64,
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
// syncNodeRestartNowHeartbeatTimer(pSyncNode);
syncNodeStartNowHeartbeatTimer(pSyncNode);
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册