未验证 提交 a8630c0a 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20031 from taosdata/FIX/TD-22224-main

fix: not allow to read if sync is restoring
...@@ -351,7 +351,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -351,7 +351,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
// if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) { if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno); vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0; return 0;
...@@ -375,7 +374,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -375,7 +374,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) && pMsg->msgType == TDMT_VND_BATCH_META) &&
!syncIsReadyForRead(pVnode->sync)) { !syncIsReadyForRead(pVnode->sync)) {
// !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg, terrno); vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
return 0; return 0;
} }
......
...@@ -436,55 +436,12 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { ...@@ -436,55 +436,12 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
return false; return false;
} }
if (pSyncNode->restoreFinish) { if (!pSyncNode->restoreFinish) {
return true;
}
bool ready = false;
if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
// apply queue not empty
ready = false;
} else {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
SSyncRaftEntry* pEntry = NULL;
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
int32_t code = 0;
if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0;
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
} else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
}
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == raftStoreGetTerm(pSyncNode)) {
ready = true;
}
if (h) {
taosLRUCacheRelease(pCache, h, false);
} else {
syncEntryDestroy(pEntry);
}
}
}
}
if (!ready) {
terrno = TSDB_CODE_SYN_RESTORING; terrno = TSDB_CODE_SYN_RESTORING;
return false;
} }
return ready; return true;
} }
bool syncIsReadyForRead(int64_t rid) { bool syncIsReadyForRead(int64_t rid) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册