提交 ddb815ac 编写于 作者: M Minghao Li

refactor(sync): optimize advance commit index when one replica

上级 47d9fe64
...@@ -49,6 +49,7 @@ extern "C" { ...@@ -49,6 +49,7 @@ extern "C" {
//            IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
//     /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
// Advance pSyncNode->commitIndex for a node that is the leader of a
// single-replica group. The implementation logs an error and returns without
// changing anything when pSyncNode is NULL, the node is not in
// TAOS_SYNC_STATE_LEADER, or replicaNum != 1.
void syncOneReplicaAdvance(SSyncNode* pSyncNode);
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode);
bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index);
bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); bool syncAgree(SSyncNode* pSyncNode, SyncIndex index);
......
...@@ -44,6 +44,56 @@ ...@@ -44,6 +44,56 @@
//            IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
//     /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
//
// Advance the commit index of a single-replica leader.
//
// Steps, in order:
//   1. If the FSM snapshot reports a lastApplyIndex that is positive and ahead
//      of commitIndex, move commitIndex up to it.
//   2. If the last log index (syncNodeGetLastIndex) is ahead of commitIndex,
//      move commitIndex up to it.
//   3. If commitIndex is ahead of the commit version reported by the log
//      store (logStoreWalCommitVer), hand the new commitIndex to the log
//      store through syncLogUpdateCommitIndex.
//
// Logs an error and returns without modifying anything when pSyncNode is NULL,
// the node is not the leader, or replicaNum is not 1.
void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
  if (pSyncNode == NULL) {
    sError("pSyncNode is NULL");
    return;
  }
  if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
    syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index");
    return;
  }
  if (pSyncNode->replicaNum != 1) {
    syncNodeErrorLog(pSyncNode, "not one replica, can not advance commit index");
    return;
  }

  // advance commit index to snapshot first
  // Zero-initialize so that, should the callback leave the struct untouched,
  // lastApplyIndex reads as 0 and the "> 0" guard below skips this step
  // instead of acting on an indeterminate value.
  SSnapshot snapshot = {0};
  pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
  if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) {
    SyncIndex commitBegin = pSyncNode->commitIndex;
    SyncIndex commitEnd = snapshot.lastApplyIndex;
    pSyncNode->commitIndex = commitEnd;

    char eventLog[128];
    snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin,
             commitEnd);
    syncNodeEventLog(pSyncNode, eventLog);
  }

  // advance commit index as large as possible
  SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
  if (lastIndex > pSyncNode->commitIndex) {
    // Emit the log before the assignment so the message shows the old
    // commitIndex + 1 as the start of the committed range.
    char eventLog[128];
    snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64,
             pSyncNode->commitIndex + 1, lastIndex);
    syncNodeEventLog(pSyncNode, eventLog);

    pSyncNode->commitIndex = lastIndex;
  }

  // call back Wal
  SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore);
  if (pSyncNode->commitIndex > walCommitVer) {
    pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex);
  }
}
void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("pSyncNode is NULL"); sError("pSyncNode is NULL");
......
...@@ -3010,7 +3010,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd ...@@ -3010,7 +3010,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
// if only myself, maybe commit right now // if only myself, maybe commit right now
if (ths->replicaNum == 1) { if (ths->replicaNum == 1) {
syncMaybeAdvanceCommitIndex(ths); if (syncNodeIsMnode(ths)) {
syncMaybeAdvanceCommitIndex(ths);
} else {
syncOneReplicaAdvance(ths);
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册