From 006e13e6638475a7f34662fe3aee437865f69684 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 23 Nov 2022 10:24:17 +0800 Subject: [PATCH] enh: validate alignment of WAL and tsdb commit during syncLogBufferInit --- source/libs/sync/src/syncPipeline.c | 28 ++++++++++++++++++++++------ source/libs/sync/src/syncUtil.c | 7 ++++--- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 3dbca1210d..a31812e51e 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -119,6 +119,26 @@ SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId return syncEntryBuildNoop(term, index, vgId); } +int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) { + SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); + if (firstVer > commitIndex + 1) { + sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer: %" PRId64 + ", tsdb commit version: %" PRId64 "", + pNode->vgId, firstVer, commitIndex); + return -1; + } + + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); + if (lastVer < commitIndex) { + sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64 + ", tsdb commit version: %" PRId64 "", + pNode->vgId, lastVer, commitIndex); + return -1; + } + + return 0; +} + int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { taosThreadMutexLock(&pBuf->mutex); ASSERT(pNode->pLogStore != NULL && "log store not created"); @@ -132,16 +152,12 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { } SyncIndex commitIndex = snapshot.lastApplyIndex; SyncTerm commitTerm = snapshot.lastApplyTerm; - - SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); - if (lastVer < commitIndex) { - sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64 - ", tsdb commit version: %" PRId64 "", - pNode->vgId, lastVer, commitIndex); + if (syncLogValidateAlignmentOfCommit(pNode, commitIndex)) { terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; goto _err; } + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); ASSERT(lastVer >= commitIndex); SyncIndex toIndex = lastVer; // update match index diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b50336cd63..97de188253 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -238,9 +238,10 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo taosPrintLog(flags, level, dflag, "vgId:%d, sync %s " "%s" - ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 - ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 - ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", + ", term:%" PRIu64 ", commitIdx:%" PRId64 ", firstVer:%" PRId64 ", lastVer:%" PRId64 ", min:%" PRId64 + ", snap.lastApply:%" PRId64 ", snap.term:%" PRIu64 + ", standby:%d, aqItems:%d, batchSz:%d, replicaNum:%d, lastCfgIdx:%" PRId64 + ", changing:%d, restore:%d, quorum:%d, electTimer:%" PRId64 ", hb:%" PRId64 ", %s, %s", pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, -- GitLab