diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h index c76236d5bfd3ca3050c7dbe3900a5644c4388911..7458ce28ab8e02196d74b9adbac80deb6acb7690 100644 --- a/source/libs/sync/inc/syncCommit.h +++ b/source/libs/sync/inc/syncCommit.h @@ -49,6 +49,7 @@ extern "C" { // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // /\ UNCHANGED <> // +void syncOneReplicaAdvance(SSyncNode* pSyncNode); void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); bool syncAgreeIndex(SSyncNode* pSyncNode, SRaftId* pRaftId, SyncIndex index); bool syncAgree(SSyncNode* pSyncNode, SyncIndex index); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3aeb2d30b54552040bb47b19f6cc66ef24838964..95787bbe6c34df233cefb1618e681918cb531772 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -44,6 +44,56 @@ // IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] // /\ UNCHANGED <> // +void syncOneReplicaAdvance(SSyncNode* pSyncNode) { + if (pSyncNode == NULL) { + sError("pSyncNode is NULL"); + return; + } + + if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { + syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); + return; + } + + if (pSyncNode->replicaNum != 1) { + syncNodeErrorLog(pSyncNode, "not one replica, can not advance commit index"); + return; + } + + // advance commit index to snapshot first + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) { + SyncIndex commitBegin = pSyncNode->commitIndex; + SyncIndex commitEnd = snapshot.lastApplyIndex; + pSyncNode->commitIndex = snapshot.lastApplyIndex; + + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by snapshot from index:%" PRId64 " to index:%" PRId64, commitBegin, + commitEnd); + syncNodeEventLog(pSyncNode, eventLog); + } + + // advance commit index as large as possible + SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); + if (lastIndex > pSyncNode->commitIndex) { + do { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, + pSyncNode->commitIndex + 1, lastIndex); + syncNodeEventLog(pSyncNode, eventLog); + } while (0); + + pSyncNode->commitIndex = lastIndex; + } + + // call back Wal + SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore); + if (pSyncNode->commitIndex > walCommitVer) { + pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); + } +} + void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pSyncNode == NULL) { sError("pSyncNode is NULL"); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 27f4eeedb2aa5e98c59755c07f388198681e7e22..47b8614717152ea7b275b88cf354ac3ce4aa9666 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -3010,7 +3010,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd // if only myself, maybe commit right now if (ths->replicaNum == 1) { - syncMaybeAdvanceCommitIndex(ths); + if (syncNodeIsMnode(ths)) { + syncMaybeAdvanceCommitIndex(ths); + } else { + syncOneReplicaAdvance(ths); + } } }