From a4b54c4f0e8ef51fe5ef08e56f0f3055bb24e57a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 3 Nov 2022 13:57:57 +0800 Subject: [PATCH] refactor(sync): add syncIsReadyForRead --- include/libs/sync/sync.h | 1 + source/dnode/vnode/src/vnd/vnodeSvr.c | 6 +++-- source/libs/sync/src/syncMain.c | 33 +++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b6ff93ec85..8db3d89e39 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -219,6 +219,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncEndSnapshot(int64_t rid); int32_t syncLeaderTransfer(int64_t rid); int32_t syncStepDown(int64_t rid, SyncTerm newTerm); +bool syncIsReadyForRead(int64_t rid); SSyncState syncGetState(int64_t rid); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 521566b36d..435dbde30a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -334,7 +334,8 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { + // if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) { + if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } @@ -356,7 +357,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) && - !vnodeIsLeader(pVnode)) { + !syncIsReadyForRead(pVnode->sync)) { + // !vnodeIsLeader(pVnode)) { vnodeRedirectRpcMsg(pVnode, pMsg); return 0; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5cd1ba3025..268173e795 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -437,6 +437,39 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) { return 0; } +bool syncIsReadyForRead(int64_t rid) { + SSyncNode* pSyncNode = syncNodeAcquire(rid); + if (pSyncNode == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return false; + } + ASSERT(rid == pSyncNode->rid); + + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) { + syncNodeRelease(pSyncNode); + return true; + } + + bool ready = false; + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) { + if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { + SSyncRaftEntry* pEntry = NULL; + int32_t code = pSyncNode->pLogStore->syncLogGetEntry( + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + if (code == 0 && pEntry != NULL) { + if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { + ready = true; + } + + syncEntryDestory(pEntry); + } + } + } + + syncNodeRelease(pSyncNode); + return ready; +} + int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { if (pSyncNode->peersNum == 0) { sDebug("only one replica, cannot leader transfer"); -- GitLab