未验证 提交 f5a97448 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #21045 from taosdata/fix/block-check-to-async-check

fix/block-check-to-async-check
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "trpc.h" #include "trpc.h"
#include "sync.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -73,6 +74,7 @@ int32_t mndStart(SMnode *pMnode); ...@@ -73,6 +74,7 @@ int32_t mndStart(SMnode *pMnode);
void mndStop(SMnode *pMnode); void mndStop(SMnode *pMnode);
int32_t mndIsCatchUp(SMnode *pMnode); int32_t mndIsCatchUp(SMnode *pMnode);
ESyncRole mndGetRole(SMnode *pMnode);
/** /**
* @brief Get mnode monitor info. * @brief Get mnode monitor info.
......
...@@ -250,6 +250,7 @@ void syncPreStop(int64_t rid); ...@@ -250,6 +250,7 @@ void syncPreStop(int64_t rid);
void syncPostStop(int64_t rid); void syncPostStop(int64_t rid);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq);
int32_t syncIsCatchUp(int64_t rid); int32_t syncIsCatchUp(int64_t rid);
ESyncRole syncGetRole(int64_t rid);
int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg); int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg);
int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg); int32_t syncReconfig(int64_t rid, SSyncCfg* pCfg);
int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex); int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
......
...@@ -403,6 +403,8 @@ int32_t* taosGetErrno(); ...@@ -403,6 +403,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040F) #define TSDB_CODE_SNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x040F)
#define TSDB_CODE_SNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0410) #define TSDB_CODE_SNODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0410)
#define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411) #define TSDB_CODE_SNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0411)
#define TSDB_CODE_MNODE_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0412) // internal
#define TSDB_CODE_MNODE_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0413) // internal
// vnode // vnode
// #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x // #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) // 2.x
...@@ -437,6 +439,8 @@ int32_t* taosGetErrno(); ...@@ -437,6 +439,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) #define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529)
#define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530) #define TSDB_CODE_VND_DUP_REQUEST TAOS_DEF_ERROR_CODE(0, 0x0530)
#define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) #define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531)
#define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal
#define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
......
...@@ -159,6 +159,10 @@ static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) { ...@@ -159,6 +159,10 @@ static int32_t mmSyncIsCatchUp(SMnodeMgmt *pMgmt) {
return mndIsCatchUp(pMgmt->pMnode); return mndIsCatchUp(pMgmt->pMnode);
} }
static ESyncRole mmSyncGetRole(SMnodeMgmt *pMgmt) {
return mndGetRole(pMgmt->pMnode);
}
SMgmtFunc mmGetMgmtFunc() { SMgmtFunc mmGetMgmtFunc() {
SMgmtFunc mgmtFunc = {0}; SMgmtFunc mgmtFunc = {0};
mgmtFunc.openFp = mmOpen; mgmtFunc.openFp = mmOpen;
...@@ -170,6 +174,7 @@ SMgmtFunc mmGetMgmtFunc() { ...@@ -170,6 +174,7 @@ SMgmtFunc mmGetMgmtFunc() {
mgmtFunc.requiredFp = mmRequire; mgmtFunc.requiredFp = mmRequire;
mgmtFunc.getHandlesFp = mmGetMsgHandles; mgmtFunc.getHandlesFp = mmGetMsgHandles;
mgmtFunc.isCatchUpFp = (NodeIsCatchUpFp)mmSyncIsCatchUp; mgmtFunc.isCatchUpFp = (NodeIsCatchUpFp)mmSyncIsCatchUp;
mgmtFunc.nodeRoleFp = (NodeRole)mmSyncGetRole;
return mgmtFunc; return mgmtFunc;
} }
...@@ -336,13 +336,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -336,13 +336,23 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", req.vgId, terrstr()); dError("vgId:%d, failed to alter vnode type since %s", req.vgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST; terrno = TSDB_CODE_VND_NOT_EXIST;
return -1; return -1;
} }
ESyncRole role = vnodeGetRole(pVnode->pImpl);
dInfo("vgId:%d, checking node role:%d", req.vgId, role);
if(role == TAOS_SYNC_ROLE_VOTER){
terrno = TSDB_CODE_VND_ALREADY_IS_VOTER;
vmReleaseVnode(pMgmt, pVnode);
return -1;
}
dInfo("vgId:%d, checking node catch up", req.vgId); dInfo("vgId:%d, checking node catch up", req.vgId);
if(vnodeIsCatchUp(pVnode->pImpl) != 0){ if(vnodeIsCatchUp(pVnode->pImpl) != 1){
terrno = TSDB_CODE_VND_NOT_CATCH_UP;
vmReleaseVnode(pMgmt, pVnode);
return -1; return -1;
} }
...@@ -365,6 +375,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -365,6 +375,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) { req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId); dError("vgId:%d, failed to alter replica since invalid msg", vgId);
vmReleaseVnode(pMgmt, pVnode);
return -1; return -1;
} }
...@@ -381,6 +392,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -381,6 +392,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn, dError("vgId:%d, dnodeId:%d ep:%s:%u not matched with local dnode", vgId, pReplica->id, pReplica->fqdn,
pReplica->port); pReplica->port);
vmReleaseVnode(pMgmt, pVnode);
return -1; return -1;
} }
......
...@@ -214,9 +214,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) { ...@@ -214,9 +214,19 @@ static int32_t dmProcessAlterNodeTypeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
pWrapper = &pDnode->wrappers[ntype]; pWrapper = &pDnode->wrappers[ntype];
if(pWrapper->func.nodeRoleFp != NULL){
ESyncRole role = (*pWrapper->func.nodeRoleFp)(pWrapper->pMgmt);
dInfo("node:%s, checking node role:%d", pWrapper->name, role);
if(role == TAOS_SYNC_ROLE_VOTER){
terrno = TSDB_CODE_MNODE_ALREADY_IS_VOTER;
return -1;
}
}
if(pWrapper->func.isCatchUpFp != NULL){ if(pWrapper->func.isCatchUpFp != NULL){
dInfo("node:%s, checking node catch up", pWrapper->name); dInfo("node:%s, checking node catch up", pWrapper->name);
if(!(*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) == 0){ if((*pWrapper->func.isCatchUpFp)(pWrapper->pMgmt) != 1){
terrno = TSDB_CODE_MNODE_NOT_CATCH_UP;
return -1; return -1;
} }
} }
......
...@@ -135,6 +135,7 @@ typedef int32_t (*NodeDropFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); ...@@ -135,6 +135,7 @@ typedef int32_t (*NodeDropFp)(const SMgmtInputOpt *pInput, SRpcMsg *pMsg);
typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required); typedef int32_t (*NodeRequireFp)(const SMgmtInputOpt *pInput, bool *required);
typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle typedef SArray *(*NodeGetHandlesFp)(); // array of SMgmtHandle
typedef bool (*NodeIsCatchUpFp)(void *pMgmt); typedef bool (*NodeIsCatchUpFp)(void *pMgmt);
typedef bool (*NodeRole)(void *pMgmt);
typedef struct { typedef struct {
NodeOpenFp openFp; NodeOpenFp openFp;
...@@ -146,6 +147,7 @@ typedef struct { ...@@ -146,6 +147,7 @@ typedef struct {
NodeRequireFp requiredFp; NodeRequireFp requiredFp;
NodeGetHandlesFp getHandlesFp; NodeGetHandlesFp getHandlesFp;
NodeIsCatchUpFp isCatchUpFp; NodeIsCatchUpFp isCatchUpFp;
NodeRole nodeRoleFp;
} SMgmtFunc; } SMgmtFunc;
typedef struct { typedef struct {
......
...@@ -590,6 +590,11 @@ int32_t mndIsCatchUp(SMnode *pMnode) { ...@@ -590,6 +590,11 @@ int32_t mndIsCatchUp(SMnode *pMnode) {
return syncIsCatchUp(rid); return syncIsCatchUp(rid);
} }
ESyncRole mndGetRole(SMnode *pMnode){
int64_t rid = pMnode->syncMgmt.sync;
return syncGetRole(rid);
}
void mndStop(SMnode *pMnode) { void mndStop(SMnode *pMnode) {
mndSetStop(pMnode); mndSetStop(pMnode);
mndSyncStop(pMnode); mndSyncStop(pMnode);
......
...@@ -322,7 +322,8 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, ...@@ -322,7 +322,8 @@ static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
.pCont = pReq, .pCont = pReq,
.contLen = contLen, .contLen = contLen,
.msgType = TDMT_DND_ALTER_MNODE_TYPE, .msgType = TDMT_DND_ALTER_MNODE_TYPE,
.acceptableCode = TSDB_CODE_MNODE_ALREADY_DEPLOYED, .retryCode = TSDB_CODE_MNODE_NOT_CATCH_UP,
.acceptableCode = TSDB_CODE_MNODE_ALREADY_IS_VOTER,
}; };
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......
...@@ -1263,6 +1263,8 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, ...@@ -1263,6 +1263,8 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_VNODE_TYPE; action.msgType = TDMT_DND_ALTER_VNODE_TYPE;
action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER;
action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
......
...@@ -69,6 +69,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId); ...@@ -69,6 +69,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen); int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list); int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
int32_t vnodeIsCatchUp(SVnode *pVnode); int32_t vnodeIsCatchUp(SVnode *pVnode);
ESyncRole vnodeGetRole(SVnode *pVnode);
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
......
...@@ -431,6 +431,10 @@ int32_t vnodeIsCatchUp(SVnode *pVnode){ ...@@ -431,6 +431,10 @@ int32_t vnodeIsCatchUp(SVnode *pVnode){
return syncIsCatchUp(pVnode->sync); return syncIsCatchUp(pVnode->sync);
} }
ESyncRole vnodeGetRole(SVnode *pVnode){
return syncGetRole(pVnode->sync);
}
void vnodeStop(SVnode *pVnode) {} void vnodeStop(SVnode *pVnode) {}
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
......
...@@ -580,25 +580,37 @@ int32_t syncIsCatchUp(int64_t rid) { ...@@ -580,25 +580,37 @@ int32_t syncIsCatchUp(int64_t rid) {
return -1; return -1;
} }
while(1){ int32_t isCatchUp = 0;
if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 || if(pSyncNode->pLogBuf->totalIndex < 0 || pSyncNode->pLogBuf->commitIndex < 0 ||
pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex || pSyncNode->pLogBuf->totalIndex < pSyncNode->pLogBuf->commitIndex ||
pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){ pSyncNode->pLogBuf->totalIndex - pSyncNode->pLogBuf->commitIndex > SYNC_LEARNER_CATCHUP){
sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, sInfo("vgId:%d, Not catch up, wait one second, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex); pSyncNode->pLogBuf->matchIndex);
taosSsleep(1); isCatchUp = 0;
} }
else{ else{
sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64, sInfo("vgId:%d, Catch up, totalIndex:%" PRId64 " commitIndex:%" PRId64 " matchIndex:%" PRId64,
pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex, pSyncNode->vgId, pSyncNode->pLogBuf->totalIndex, pSyncNode->pLogBuf->commitIndex,
pSyncNode->pLogBuf->matchIndex); pSyncNode->pLogBuf->matchIndex);
break; isCatchUp = 1;
}
} }
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
return 0; return isCatchUp;
}
ESyncRole syncGetRole(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
sError("sync Node Acquire error since %d", errno);
return -1;
}
ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole;
syncNodeRelease(pSyncNode);
return role;
} }
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册