提交 a781d435 编写于 作者: C cadem

fix/block-check-to-async-check

上级 03ffbdd4
......@@ -250,6 +250,7 @@ void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
......
......@@ -437,6 +437,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
#define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530)
#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531)
#define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal
#define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal
// tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
......@@ -336,13 +336,21 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr());
dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST;
return -1;
}
if(vnodeGetRole(pVnode->pImpl) == TAOS_SYNC_ROLE_VOTER){
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
vmReleaseVnode(pMgmt, pVnode);
return -1;
}
dInfo("vgId:%d, checking node catch up", req.vgId);
if(vnodeIsCatchUp(pVnode->pImpl) != 0){
if(vnodeIsCatchUp(pVnode->pImpl) != 1){
terrno = TSDB_CODE_VND_NOT_CATCH_UP;
vmReleaseVnode(pMgmt, pVnode);
return -1;
}
......@@ -365,6 +373,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId);
vmReleaseVnode(pMgmt, pVnode);
return -1;
}
......@@ -381,6 +390,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
pReplica->port);
vmReleaseVnode(pMgmt, pVnode);
return -1;
}
......
......@@ -1263,6 +1263,8 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......
......@@ -69,6 +69,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeIsCatchUp(SVnode *pVnode);
ESyncRole vnodeGetRole(SVnode *pVnode);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
......
......@@ -431,6 +431,10 @@ int32_t vnodeIsCatchUp(SVnode *pVnode){
return syncIsCatchUp(pVnode->sync);
}
ESyncRole vnodeGetRole(SVnode *pVnode){
return syncGetRole(pVnode->sync);
}
void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
......
......@@ -580,25 +580,37 @@ int32_t syncIsCatchUp(int64_t rid) {
return -1;
}
while(1){
if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
taosSsleep(1);
}
else{
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
break;
}
int32_t isCatchUp = 0;
if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
isCatchUp = 0;
}
else{
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex);
isCatchUp = 1;
}
syncNodeRelease(pSyncNode);
return 0;
return isCatchUp;
}
ESyncRole syncGetRole(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sError("sync Node Acquire error since %d", errno);
return -1;
}
ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
syncNodeRelease(pSyncNode);
return role;
}
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册