diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 8db3d89e392af31968789efc3052577166116595..74a73f6b10f08f3571cbdfc9c0339b93aa161ab9 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -138,6 +138,7 @@ typedef struct SSyncFSM { void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm); void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0f50391ac597a76510b867370b8e235f92e672ba..e6e134c6a1027a18777d2b136139ea9bba6a7cda 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -202,6 +202,13 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; } +static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) { + SMnode *pMnode = pFsm->data; + + int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE); + return (itemSize == 0); +} + SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; @@ -210,6 +217,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpLeaderTransferCb = NULL; + pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty; pFsm->FpReConfigCb = NULL; pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeFollowerCb = mndBecomeFollower; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 3913561ae79b67c55f4746d153948dbff2a0fb15..e7f8c9f562bbc3b193bc9236f3f0a2243a2d7fce 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -436,6 +436,12 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) { vDebug("vgId:%d, become leader", pVnode->config.vgId); } +static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) { + SVnode *pVnode = pFsm->data; + int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); + return (itemSize == 0); +} + static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pVnode; @@ -445,6 +451,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpLeaderTransferCb = NULL; + pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; pFsm->FpReConfigCb = NULL; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 47b8614717152ea7b275b88cf354ac3ce4aa9666..38cc9f3dfedc061f7c2f620ee2af3504ee1eb783 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -452,16 +452,22 @@ bool syncIsReadyForRead(int64_t rid) { bool ready = false; if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) { - if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { - SSyncRaftEntry* pEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); - if (code == 0 && pEntry != NULL) { - if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { - ready = true; - } + if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) { + // apply queue not empty + ready = false; + + } else { + if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { + SSyncRaftEntry* pEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry( + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + if (code == 0 && pEntry != NULL) { + if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { + ready = true; + } - syncEntryDestory(pEntry); + syncEntryDestory(pEntry); + } } } }