提交 c56dd863 编写于 作者: S Shengliang Guan

fix: do not process query and fetch msg until vnode restore finished

上级 4df33035
...@@ -99,7 +99,6 @@ void vnodeSyncStart(SVnode* pVnode); ...@@ -99,7 +99,6 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsReadyForRead(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -289,7 +289,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -289,7 +289,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing"); vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) { if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
} }
...@@ -311,7 +311,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { ...@@ -311,7 +311,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) && pMsg->msgType == TDMT_VND_BATCH_META) &&
!vnodeIsReadyForRead(pVnode)) { !vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg); vnodeRedirectRpcMsg(pVnode, pMsg);
return 0; return 0;
} }
......
...@@ -796,14 +796,3 @@ bool vnodeIsLeader(SVnode *pVnode) { ...@@ -796,14 +796,3 @@ bool vnodeIsLeader(SVnode *pVnode) {
return true; return true;
} }
bool vnodeIsReadyForRead(SVnode *pVnode) {
if (syncIsReadyForRead(pVnode->sync)) {
if (pVnode->restored) {
return true;
}
}
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
return false;
}
...@@ -396,32 +396,6 @@ bool syncIsReady(int64_t rid) { ...@@ -396,32 +396,6 @@ bool syncIsReady(int64_t rid) {
return b; return b;
} }
bool syncIsReadyForRead(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
ASSERT(rid == pSyncNode->rid);
// TODO: last not noop?
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish;
if (!b) {
b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
// if false, set error code
if (false == b) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
}
return b;
}
bool syncIsRestoreFinish(int64_t rid) { bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册