From 5a4cc36079728eeacdbed652ccc41600a3a146a7 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 11 Jan 2023 20:44:29 +0800 Subject: [PATCH] enh: skip WAL forceSync for single replica vgroup --- include/libs/sync/sync.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 4 ++-- source/libs/sync/src/syncMain.c | 4 ++-- source/libs/sync/src/syncPipeline.c | 11 ++++++++--- source/libs/sync/src/syncRaftLog.c | 7 ++----- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 4c23c1f557..defafce30e 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -193,7 +193,7 @@ typedef struct SSyncLogStore { SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore); - int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); + int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync); int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e77a8d4be3..948d2f53e7 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pAppendEntry->index == appendIndex); // append - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false); if (code != 0) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex); @@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // append - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false); if (code != 0) { char logBuf[128]; snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a339cb9857..51a7fa8118 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2478,7 +2478,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { LRUHandle* h = NULL; if (ths->state == TAOS_SYNC_STATE_LEADER) { - int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false); if (code != 0) { sError("append noop error"); return -1; @@ -2721,7 +2721,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe if (ths->state == TAOS_SYNC_STATE_LEADER) { // append entry - code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false); if (code != 0) { if (ths->replicaNum == 1) { if (h) { diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index b61fc2e90d..1d2b0b57fe 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -364,7 +364,11 @@ _out: return ret; } -int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { +static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) { + return (replicaNum > 1) && (pEntry->originalRpcType == TDMT_VND_COMMIT); +} + +int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) { ASSERT(pEntry->index >= 0); SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { @@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { lastVer = pLogStore->syncLogLastIndex(pLogStore); ASSERT(pEntry->index == lastVer + 1); - if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) { + bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum); + if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) { sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, pEntry->term); return -1; @@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p (void)syncNodeReplicateWithoutLock(pNode); // persist - if (syncLogStorePersist(pLogStore, pEntry) < 0) { + if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) { sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); goto _out; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index ca6d3c314f..e6569d9974 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -23,7 +23,7 @@ // public function static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); -static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); +static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); @@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { return SYNC_TERM_INVALID; } -static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); } - -static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { +static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ASSERT(pEntry->index == index); - bool forceSync = raftLogForceSync(pEntry); walFsync(pWal, forceSync); sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, -- GitLab