From 5e53b1225b74ed4d2a7ef288ddcf731e8a9ed97b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 9 Mar 2023 19:29:09 +0800 Subject: [PATCH] enh: finish restore with commit and applied indexes instead of num of items in apply queue --- include/libs/sync/sync.h | 2 +- source/dnode/mnode/impl/src/mndSync.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 18 ++++++++++++------ source/libs/sync/src/syncPipeline.c | 6 +++--- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5ea90906a8..08a6be8015 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -147,7 +147,7 @@ typedef struct SSyncFSM { int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); + void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm, const SyncIndex commitIdx); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ce3caaad6c..998e8b71ab 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -162,7 +162,7 @@ static void mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); } -void mndRestoreFinish(const SSyncFSM *pFsm) { +void mndRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 9613afd837..594a64ea37 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -521,21 +521,27 @@ static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *p return code; } -static void vnodeRestoreFinish(const SSyncFSM *pFsm) { +static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) { SVnode *pVnode = pFsm->data; + SyncIndex appliedIdx = -1; do { - int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); - if (itemSize == 0) { - vInfo("vgId:%d, apply queue is empty, restore finish", pVnode->config.vgId); + appliedIdx = vnodeSyncAppliedIndex(pFsm); + ASSERT(appliedIdx <= commitIdx); + if (appliedIdx == commitIdx) { + vInfo("vgId:%d, no more items to be applied, restore finish", pVnode->config.vgId); break; } else { - vInfo("vgId:%d, restore not finish since %d items in apply queue", pVnode->config.vgId, itemSize); + int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); + vInfo("vgId:%d, restore not finish since %" PRId64 + " items to be applied, and %d in apply queue. commit-index:%" PRId64 ", applied-index:%" PRId64, + pVnode->config.vgId, commitIdx - appliedIdx, itemSize, commitIdx, appliedIdx); taosMsleep(10); } } while (true); - walApplyVer(pVnode->pWal, pVnode->state.applied); + ASSERT(appliedIdx == commitIdx); + walApplyVer(pVnode->pWal, commitIdx); pVnode->restored = true; vInfo("vgId:%d, sync restore finished", pVnode->config.vgId); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index e28a2a6872..ee68824dc8 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -596,10 +596,10 @@ _out: // mark as restored if needed if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL && currentTerm <= pEntry->term) { - pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); + pNode->pFsm->FpRestoreFinishCb(pNode->pFsm, pBuf->commitIndex); pNode->restoreFinish = true; - sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sInfo("vgId:%d, restore finished. term:%" PRId64 ", log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, currentTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } if (!inBuf) { -- GitLab