提交 6cff4dcf 编写于 作者: M Minghao Li

refactor(sync): add syncIsReadyForRead

上级 e50f09aa
......@@ -30,6 +30,7 @@ extern bool gRaftDetailLog;
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_READ_RANGE 10
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
......@@ -210,9 +211,12 @@ void syncStop(int64_t rid);
int32_t syncSetStandby(int64_t rid);
ESyncState syncGetMyRole(int64_t rid);
bool syncIsReady(int64_t rid);
bool syncIsReadyForRead(int64_t rid);
const char* syncGetMyRoleStr(int64_t rid);
bool syncRestoreFinish(int64_t rid);
SyncTerm syncGetMyTerm(int64_t rid);
SyncIndex syncGetLastIndex(int64_t rid);
SyncIndex syncGetCommitIndex(int64_t rid);
SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
......
......@@ -80,7 +80,7 @@ int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
......@@ -98,6 +98,7 @@ void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsReadyForRead(SVnode* pVnode);
#ifdef __cplusplus
}
......
......@@ -281,7 +281,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in vnode query queue is processing");
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
......@@ -305,7 +305,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
pMsg->msgType == TDMT_VND_BATCH_META) &&
!vnodeIsLeader(pVnode)) {
!vnodeIsReadyForRead(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
}
......
......@@ -771,3 +771,17 @@ bool vnodeIsLeader(SVnode *pVnode) {
return true;
}
bool vnodeIsReadyForRead(SVnode *pVnode) {
if (syncIsReady(pVnode->sync)) {
return true;
}
if (syncIsReadyForRead(pVnode->sync)) {
return true;
}
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
return false;
}
......@@ -392,6 +392,29 @@ bool syncIsReady(int64_t rid) {
return b;
}
bool syncIsReadyForRead(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return false;
}
ASSERT(rid == pSyncNode->rid);
// TODO: last not noop?
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
// if false, set error code
if (false == b) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER;
} else {
terrno = TSDB_CODE_APP_NOT_READY;
}
}
return b;
}
bool syncIsRestoreFinish(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -519,6 +542,30 @@ SyncTerm syncGetMyTerm(int64_t rid) {
return term;
}
SyncIndex syncGetLastIndex(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID;
}
ASSERT(rid == pSyncNode->rid);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return lastIndex;
}
SyncIndex syncGetCommitIndex(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID;
}
ASSERT(rid == pSyncNode->rid);
SyncIndex cmtIndex = pSyncNode->commitIndex;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return cmtIndex;
}
SyncGroupId syncGetVgId(int64_t rid) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
......@@ -828,6 +875,15 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
pSyncNode->changing = true;
}
// not restored
if (!pSyncNode->restoreFinish) {
ret = -1;
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sError("vgId:%d, failed to sync propose since not ready, type:%s, last:%ld, cmt:%ld", pSyncNode->vgId,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
goto _END;
}
SRespStub stub;
stub.createTime = taosGetTimestampMs();
stub.rpcMsg = *pMsg;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册