提交 5a4cc360 编写于 作者: B Benguang Zhao

enh: skip WAL forceSync for single replica vgroup

上级 6af99d75
...@@ -193,7 +193,7 @@ typedef struct SSyncLogStore { ...@@ -193,7 +193,7 @@ typedef struct SSyncLogStore {
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore); SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync);
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
......
...@@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -357,7 +357,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(pAppendEntry->index == appendIndex); ASSERT(pAppendEntry->index == appendIndex);
// append // append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
if (code != 0) { if (code != 0) {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex); snprintf(logBuf, sizeof(logBuf), "ignore, append error, append-index:%" PRId64, appendIndex);
...@@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -398,7 +398,7 @@ int32_t syncNodeOnAppendEntriesOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
// append // append
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry, false);
if (code != 0) { if (code != 0) {
char logBuf[128]; char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex); snprintf(logBuf, sizeof(logBuf), "ignore, log not exist, append error, append-index:%" PRId64, appendIndex);
......
...@@ -2478,7 +2478,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) { ...@@ -2478,7 +2478,7 @@ static int32_t syncNodeAppendNoopOld(SSyncNode* ths) {
LRUHandle* h = NULL; LRUHandle* h = NULL;
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
if (code != 0) { if (code != 0) {
sError("append noop error"); sError("append noop error");
return -1; return -1;
...@@ -2721,7 +2721,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe ...@@ -2721,7 +2721,7 @@ int32_t syncNodeOnClientRequestOld(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRe
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
// append entry // append entry
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry, false);
if (code != 0) { if (code != 0) {
if (ths->replicaNum == 1) { if (ths->replicaNum == 1) {
if (h) { if (h) {
......
...@@ -364,7 +364,11 @@ _out: ...@@ -364,7 +364,11 @@ _out:
return ret; return ret;
} }
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { static inline bool syncLogStoreNeedFlush(SSyncRaftEntry* pEntry, int32_t replicaNum) {
return (replicaNum > 1) && (pEntry->originalRpcType == TDMT_VND_COMMIT);
}
int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaftEntry* pEntry) {
ASSERT(pEntry->index >= 0); ASSERT(pEntry->index >= 0);
SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore); SyncIndex lastVer = pLogStore->syncLogLastIndex(pLogStore);
if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) { if (lastVer >= pEntry->index && pLogStore->syncLogTruncate(pLogStore, pEntry->index) < 0) {
...@@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -374,7 +378,8 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
lastVer = pLogStore->syncLogLastIndex(pLogStore); lastVer = pLogStore->syncLogLastIndex(pLogStore);
ASSERT(pEntry->index == lastVer + 1); ASSERT(pEntry->index == lastVer + 1);
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry) < 0) { bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum);
if (pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync) < 0) {
sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index, sError("failed to append sync log entry since %s. index:%" PRId64 ", term:%" PRId64 "", terrstr(), pEntry->index,
pEntry->term); pEntry->term);
return -1; return -1;
...@@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p ...@@ -436,7 +441,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
(void)syncNodeReplicateWithoutLock(pNode); (void)syncNodeReplicateWithoutLock(pNode);
// persist // persist
if (syncLogStorePersist(pLogStore, pEntry) < 0) { if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index); pEntry->index);
goto _out; goto _out;
......
...@@ -23,7 +23,7 @@ ...@@ -23,7 +23,7 @@
// public function // public function
static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex); static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncIndex snapshotIndex);
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync);
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index); static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); static int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
...@@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { ...@@ -192,9 +192,7 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return SYNC_TERM_INVALID; return SYNC_TERM_INVALID;
} }
static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); } static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forceSync) {
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
...@@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -221,7 +219,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT(pEntry->index == index); ASSERT(pEntry->index == index);
bool forceSync = raftLogForceSync(pEntry);
walFsync(pWal, forceSync); walFsync(pWal, forceSync);
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册