提交 3bd42107 编写于 作者: S Shengliang Guan

When an error occurs in rollback or retry, the failed result is returned first

上级 9dd5922d
...@@ -94,6 +94,7 @@ typedef struct SSdbRaw SSdbRaw; ...@@ -94,6 +94,7 @@ typedef struct SSdbRaw SSdbRaw;
typedef struct SSdbRow SSdbRow; typedef struct SSdbRow SSdbRow;
typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType;
typedef enum { typedef enum {
SDB_STATUS_INIT = 0,
SDB_STATUS_CREATING = 1, SDB_STATUS_CREATING = 1,
SDB_STATUS_UPDATING = 2, SDB_STATUS_UPDATING = 2,
SDB_STATUS_DROPPING = 3, SDB_STATUS_DROPPING = 3,
......
...@@ -28,6 +28,7 @@ typedef struct { ...@@ -28,6 +28,7 @@ typedef struct {
int8_t msgSent; int8_t msgSent;
int8_t msgReceived; int8_t msgReceived;
int32_t errCode; int32_t errCode;
int32_t acceptableCode;
int32_t contLen; int32_t contLen;
void *pCont; void *pCont;
} STransAction; } STransAction;
......
...@@ -59,9 +59,8 @@ int32_t mndInitBnode(SMnode *pMnode) { ...@@ -59,9 +59,8 @@ int32_t mndInitBnode(SMnode *pMnode) {
void mndCleanupBnode(SMnode *pMnode) {} void mndCleanupBnode(SMnode *pMnode) {}
static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) { static SBnodeObj *mndAcquireBnode(SMnode *pMnode, int32_t bnodeId) {
SSdb *pSdb = pMnode->pSdb; SBnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_BNODE, &bnodeId);
SBnodeObj *pObj = sdbAcquire(pSdb, SDB_BNODE, &bnodeId); if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
if (pObj == NULL) {
terrno = TSDB_CODE_MND_BNODE_NOT_EXIST; terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
} }
return pObj; return pObj;
...@@ -169,6 +168,14 @@ static int32_t mndSetCreateBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) { ...@@ -169,6 +168,14 @@ static int32_t mndSetCreateBnodeRedoLogs(STrans *pTrans, SBnodeObj *pObj) {
return 0; return 0;
} }
static int32_t mndSetCreateBnodeUndoLogs(STrans *pTrans, SBnodeObj *pObj) {
SSdbRaw *pUndoRaw = mndBnodeActionEncode(pObj);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetCreateBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) { static int32_t mndSetCreateBnodeCommitLogs(STrans *pTrans, SBnodeObj *pObj) {
SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj); SSdbRaw *pCommitRaw = mndBnodeActionEncode(pObj);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
...@@ -190,6 +197,7 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S ...@@ -190,6 +197,7 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDCreateBnodeReq); action.contLen = sizeof(SDCreateBnodeReq);
action.msgType = TDMT_DND_CREATE_BNODE; action.msgType = TDMT_DND_CREATE_BNODE;
action.acceptableCode = TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -199,39 +207,47 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S ...@@ -199,39 +207,47 @@ static int32_t mndSetCreateBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
return 0; return 0;
} }
static int32_t mndSetCreateBnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SBnodeObj *pObj) {
SDDropBnodeReq *pMsg = malloc(sizeof(SDDropBnodeReq));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMsg->dnodeId = htonl(pDnode->id);
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDDropBnodeReq);
action.msgType = TDMT_DND_DROP_BNODE;
action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
return 0;
}
static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) { static int32_t mndCreateBnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateBnodeReq *pCreate) {
int32_t code = -1;
SBnodeObj bnodeObj = {0}; SBnodeObj bnodeObj = {0};
bnodeObj.id = pDnode->id; bnodeObj.id = pDnode->id;
bnodeObj.createdTime = taosGetTimestampMs(); bnodeObj.createdTime = taosGetTimestampMs();
bnodeObj.updateTime = bnodeObj.createdTime; bnodeObj.updateTime = bnodeObj.createdTime;
int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) goto CREATE_BNODE_OVER;
if (pTrans == NULL) {
mError("bnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
goto CREATE_BNODE_OVER;
}
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_BNODE_OVER;
}
if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) { mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
goto CREATE_BNODE_OVER; if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
} if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) { if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER;
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_BNODE_OVER;
goto CREATE_BNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto CREATE_BNODE_OVER;
}
code = 0; code = 0;
...@@ -254,6 +270,9 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) { ...@@ -254,6 +270,9 @@ static int32_t mndProcessCreateBnodeReq(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST;
mndReleaseBnode(pMnode, pObj); mndReleaseBnode(pMnode, pObj);
return -1; return -1;
} else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) {
mError("bnode:%d, failed to create bnode since %s", pCreate->dnodeId, terrstr());
return -1;
} }
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
...@@ -303,6 +322,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn ...@@ -303,6 +322,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDDropBnodeReq); action.contLen = sizeof(SDDropBnodeReq);
action.msgType = TDMT_DND_DROP_BNODE; action.msgType = TDMT_DND_DROP_BNODE;
action.acceptableCode = TSDB_CODE_DND_BNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -314,33 +334,15 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn ...@@ -314,33 +334,15 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pMsg, SBnodeObj *pObj) { static int32_t mndDropBnode(SMnode *pMnode, SMnodeMsg *pMsg, SBnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) goto DROP_BNODE_OVER;
mError("bnode:%d, failed to drop since %s", pObj->id, terrstr());
goto DROP_BNODE_OVER;
}
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER;
if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) { if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER;
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_BNODE_OVER;
goto DROP_BNODE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_BNODE_OVER;
}
if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_BNODE_OVER;
}
if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_BNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto DROP_BNODE_OVER;
}
code = 0; code = 0;
...@@ -364,8 +366,7 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) { ...@@ -364,8 +366,7 @@ static int32_t mndProcessDropBnodeReq(SMnodeMsg *pMsg) {
SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId); SBnodeObj *pObj = mndAcquireBnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) { if (pObj == NULL) {
mError("bnode:%d, not exist", pDrop->dnodeId); mError("bnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
terrno = TSDB_CODE_MND_BNODE_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -59,9 +59,8 @@ int32_t mndInitQnode(SMnode *pMnode) { ...@@ -59,9 +59,8 @@ int32_t mndInitQnode(SMnode *pMnode) {
void mndCleanupQnode(SMnode *pMnode) {} void mndCleanupQnode(SMnode *pMnode) {}
static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) { static SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId) {
SSdb *pSdb = pMnode->pSdb; SQnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_QNODE, &qnodeId);
SQnodeObj *pObj = sdbAcquire(pSdb, SDB_QNODE, &qnodeId); if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
if (pObj == NULL) {
terrno = TSDB_CODE_MND_QNODE_NOT_EXIST; terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
} }
return pObj; return pObj;
...@@ -169,6 +168,14 @@ static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) { ...@@ -169,6 +168,14 @@ static int32_t mndSetCreateQnodeRedoLogs(STrans *pTrans, SQnodeObj *pObj) {
return 0; return 0;
} }
static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) {
SSdbRaw *pUndoRaw = mndQnodeActionEncode(pObj);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) {
SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj); SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
...@@ -190,6 +197,7 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S ...@@ -190,6 +197,7 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDCreateQnodeReq); action.contLen = sizeof(SDCreateQnodeReq);
action.msgType = TDMT_DND_CREATE_QNODE; action.msgType = TDMT_DND_CREATE_QNODE;
action.acceptableCode = TSDB_CODE_DND_QNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -199,39 +207,47 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S ...@@ -199,39 +207,47 @@ static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
return 0; return 0;
} }
static int32_t mndSetCreateQnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) {
SDDropQnodeReq *pMsg = malloc(sizeof(SDDropQnodeReq));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMsg->dnodeId = htonl(pDnode->id);
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDDropQnodeReq);
action.msgType = TDMT_DND_DROP_QNODE;
action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
return 0;
}
static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) { static int32_t mndCreateQnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateQnodeReq *pCreate) {
int32_t code = -1;
SQnodeObj qnodeObj = {0}; SQnodeObj qnodeObj = {0};
qnodeObj.id = pDnode->id; qnodeObj.id = pDnode->id;
qnodeObj.createdTime = taosGetTimestampMs(); qnodeObj.createdTime = taosGetTimestampMs();
qnodeObj.updateTime = qnodeObj.createdTime; qnodeObj.updateTime = qnodeObj.createdTime;
int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) goto CREATE_QNODE_OVER;
if (pTrans == NULL) {
mError("qnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
goto CREATE_QNODE_OVER;
}
mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) {
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_QNODE_OVER;
}
if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) { mDebug("trans:%d, used to create qnode:%d", pTrans->id, pCreate->dnodeId);
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr()); if (mndSetCreateQnodeRedoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
goto CREATE_QNODE_OVER; if (mndSetCreateQnodeUndoLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
} if (mndSetCreateQnodeCommitLogs(pTrans, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
if (mndSetCreateQnodeRedoActions(pTrans, pDnode, &qnodeObj) != 0) { if (mndSetCreateQnodeUndoActions(pTrans, pDnode, &qnodeObj) != 0) goto CREATE_QNODE_OVER;
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr()); if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_QNODE_OVER;
goto CREATE_QNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto CREATE_QNODE_OVER;
}
code = 0; code = 0;
...@@ -254,6 +270,9 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg) { ...@@ -254,6 +270,9 @@ static int32_t mndProcessCreateQnodeReq(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_QNODE_ALREADY_EXIST;
mndReleaseQnode(pMnode, pObj); mndReleaseQnode(pMnode, pObj);
return -1; return -1;
} else if (terrno != TSDB_CODE_MND_QNODE_NOT_EXIST) {
mError("qnode:%d, failed to create qnode since %s", pCreate->dnodeId, terrstr());
return -1;
} }
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
...@@ -303,6 +322,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn ...@@ -303,6 +322,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDDropQnodeReq); action.contLen = sizeof(SDDropQnodeReq);
action.msgType = TDMT_DND_DROP_QNODE; action.msgType = TDMT_DND_DROP_QNODE;
action.acceptableCode = TSDB_CODE_DND_QNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -314,33 +334,15 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn ...@@ -314,33 +334,15 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pMsg, SQnodeObj *pObj) { static int32_t mndDropQnode(SMnode *pMnode, SMnodeMsg *pMsg, SQnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) goto DROP_QNODE_OVER;
mError("qnode:%d, failed to drop since %s", pObj->id, terrstr());
goto DROP_QNODE_OVER;
}
mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop qnode:%d", pTrans->id, pObj->id);
if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER;
if (mndSetDropQnodeRedoLogs(pTrans, pObj) != 0) { if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) goto DROP_QNODE_OVER;
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_QNODE_OVER;
goto DROP_QNODE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_QNODE_OVER;
}
if (mndSetDropQnodeCommitLogs(pTrans, pObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_QNODE_OVER;
}
if (mndSetDropQnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_QNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto DROP_QNODE_OVER;
}
code = 0; code = 0;
...@@ -364,8 +366,7 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) { ...@@ -364,8 +366,7 @@ static int32_t mndProcessDropQnodeReq(SMnodeMsg *pMsg) {
SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId); SQnodeObj *pObj = mndAcquireQnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) { if (pObj == NULL) {
mError("qnode:%d, not exist", pDrop->dnodeId); mError("qnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
terrno = TSDB_CODE_MND_QNODE_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -59,9 +59,8 @@ int32_t mndInitSnode(SMnode *pMnode) { ...@@ -59,9 +59,8 @@ int32_t mndInitSnode(SMnode *pMnode) {
void mndCleanupSnode(SMnode *pMnode) {} void mndCleanupSnode(SMnode *pMnode) {}
static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) { static SSnodeObj *mndAcquireSnode(SMnode *pMnode, int32_t snodeId) {
SSdb *pSdb = pMnode->pSdb; SSnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_SNODE, &snodeId);
SSnodeObj *pObj = sdbAcquire(pSdb, SDB_SNODE, &snodeId); if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
if (pObj == NULL) {
terrno = TSDB_CODE_MND_SNODE_NOT_EXIST; terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
} }
return pObj; return pObj;
...@@ -169,6 +168,14 @@ static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) { ...@@ -169,6 +168,14 @@ static int32_t mndSetCreateSnodeRedoLogs(STrans *pTrans, SSnodeObj *pObj) {
return 0; return 0;
} }
static int32_t mndSetCreateSnodeUndoLogs(STrans *pTrans, SSnodeObj *pObj) {
SSdbRaw *pUndoRaw = mndSnodeActionEncode(pObj);
if (pUndoRaw == NULL) return -1;
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
return 0;
}
static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) { static int32_t mndSetCreateSnodeCommitLogs(STrans *pTrans, SSnodeObj *pObj) {
SSdbRaw *pCommitRaw = mndSnodeActionEncode(pObj); SSdbRaw *pCommitRaw = mndSnodeActionEncode(pObj);
if (pCommitRaw == NULL) return -1; if (pCommitRaw == NULL) return -1;
...@@ -190,6 +197,7 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S ...@@ -190,6 +197,7 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDCreateSnodeReq); action.contLen = sizeof(SDCreateSnodeReq);
action.msgType = TDMT_DND_CREATE_SNODE; action.msgType = TDMT_DND_CREATE_SNODE;
action.acceptableCode = TSDB_CODE_DND_SNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -199,39 +207,48 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S ...@@ -199,39 +207,48 @@ static int32_t mndSetCreateSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, S
return 0; return 0;
} }
static int32_t mndSetCreateSnodeUndoActions(STrans *pTrans, SDnodeObj *pDnode, SSnodeObj *pObj) {
SDDropSnodeReq *pMsg = malloc(sizeof(SDDropSnodeReq));
if (pMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pMsg->dnodeId = htonl(pDnode->id);
STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pMsg;
action.contLen = sizeof(SDDropSnodeReq);
action.msgType = TDMT_DND_DROP_SNODE;
action.acceptableCode = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
free(pMsg);
return -1;
}
return 0;
}
static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) { static int32_t mndCreateSnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode, SMCreateSnodeReq *pCreate) {
int32_t code = -1;
SSnodeObj snodeObj = {0}; SSnodeObj snodeObj = {0};
snodeObj.id = pDnode->id; snodeObj.id = pDnode->id;
snodeObj.createdTime = taosGetTimestampMs(); snodeObj.createdTime = taosGetTimestampMs();
snodeObj.updateTime = snodeObj.createdTime; snodeObj.updateTime = snodeObj.createdTime;
int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) goto CREATE_SNODE_OVER;
if (pTrans == NULL) {
mError("snode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
goto CREATE_SNODE_OVER;
}
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) { mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr());
goto CREATE_SNODE_OVER;
}
if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto CREATE_SNODE_OVER;
}
if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto CREATE_SNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
goto CREATE_SNODE_OVER; if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
} if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_SNODE_OVER;
code = 0; code = 0;
...@@ -254,6 +271,9 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg) { ...@@ -254,6 +271,9 @@ static int32_t mndProcessCreateSnodeReq(SMnodeMsg *pMsg) {
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
mndReleaseSnode(pMnode, pObj); mndReleaseSnode(pMnode, pObj);
return -1; return -1;
} else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
mError("snode:%d, failed to create snode since %s", pCreate->dnodeId, terrstr());
return -1;
} }
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
...@@ -303,6 +323,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn ...@@ -303,6 +323,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
action.pCont = pMsg; action.pCont = pMsg;
action.contLen = sizeof(SDDropSnodeReq); action.contLen = sizeof(SDDropSnodeReq);
action.msgType = TDMT_DND_DROP_SNODE; action.msgType = TDMT_DND_DROP_SNODE;
action.acceptableCode = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
free(pMsg); free(pMsg);
...@@ -314,33 +335,16 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn ...@@ -314,33 +335,16 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pMsg, SSnodeObj *pObj) { static int32_t mndDropSnode(SMnode *pMnode, SMnodeMsg *pMsg, SSnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) goto DROP_SNODE_OVER;
mError("snode:%d, failed to drop since %s", pObj->id, terrstr());
goto DROP_SNODE_OVER;
}
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) { if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER;
mError("trans:%d, failed to set redo log since %s", pTrans->id, terrstr()); if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER;
goto DROP_SNODE_OVER; if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_SNODE_OVER;
} if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_SNODE_OVER;
if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) {
mError("trans:%d, failed to set commit log since %s", pTrans->id, terrstr());
goto DROP_SNODE_OVER;
}
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) {
mError("trans:%d, failed to set redo actions since %s", pTrans->id, terrstr());
goto DROP_SNODE_OVER;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
goto DROP_SNODE_OVER;
}
code = 0; code = 0;
...@@ -364,8 +368,7 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) { ...@@ -364,8 +368,7 @@ static int32_t mndProcessDropSnodeReq(SMnodeMsg *pMsg) {
SSnodeObj *pObj = mndAcquireSnode(pMnode, pDrop->dnodeId); SSnodeObj *pObj = mndAcquireSnode(pMnode, pDrop->dnodeId);
if (pObj == NULL) { if (pObj == NULL) {
mError("snode:%d, not exist", pDrop->dnodeId); mError("snode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
terrno = TSDB_CODE_MND_SNODE_NOT_EXIST;
return -1; return -1;
} }
......
...@@ -143,6 +143,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -143,6 +143,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i); STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
} }
...@@ -151,6 +152,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -151,6 +152,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i); STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
} }
...@@ -253,6 +255,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -253,6 +255,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < redoActionNum; ++i) { for (int32_t i = 0; i < redoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER) SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
action.pCont = malloc(action.contLen); action.pCont = malloc(action.contLen);
if (action.pCont == NULL) goto TRANS_DECODE_OVER; if (action.pCont == NULL) goto TRANS_DECODE_OVER;
...@@ -264,6 +267,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -264,6 +267,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
for (int32_t i = 0; i < undoActionNum; ++i) { for (int32_t i = 0; i < undoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), TRANS_DECODE_OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER) SDB_GET_INT16(pRaw, dataPos, &action.msgType, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, TRANS_DECODE_OVER)
action.pCont = malloc(action.contLen); action.pCont = malloc(action.contLen);
if (action.pCont == NULL) goto TRANS_DECODE_OVER; if (action.pCont == NULL) goto TRANS_DECODE_OVER;
...@@ -496,16 +500,32 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { ...@@ -496,16 +500,32 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
} }
static void mndTransSendRpcRsp(STrans *pTrans) { static void mndTransSendRpcRsp(STrans *pTrans) {
if (pTrans->stage == TRN_STAGE_FINISHED || pTrans->stage == TRN_STAGE_UNDO_LOG || bool sendRsp = false;
pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
if (pTrans->rpcHandle != NULL) { if (pTrans->stage == TRN_STAGE_FINISHED) {
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, sendRsp = true;
pTrans->rpcAHandle); }
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle};
rpcSendResponse(&rspMsg); if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->rpcHandle = NULL; if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
pTrans->stage == TRN_STAGE_ROLLBACK) {
sendRsp = true;
}
}
if (pTrans->policy == TRN_POLICY_RETRY) {
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) {
sendRsp = true;
} }
} }
if (sendRsp && pTrans->rpcHandle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = pTrans->code, .ahandle = pTrans->rpcAHandle};
rpcSendResponse(&rspMsg);
pTrans->rpcHandle = NULL;
}
} }
void mndTransProcessRsp(SMnodeMsg *pMsg) { void mndTransProcessRsp(SMnodeMsg *pMsg) {
...@@ -547,7 +567,8 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) { ...@@ -547,7 +567,8 @@ void mndTransProcessRsp(SMnodeMsg *pMsg) {
pAction->errCode = pMsg->rpcMsg.code; pAction->errCode = pMsg->rpcMsg.code;
} }
mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->rpcMsg.code); mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%x", transId, action, pMsg->rpcMsg.code,
pAction->acceptableCode);
mndTransExecute(pMnode, pTrans); mndTransExecute(pMnode, pTrans);
HANDLE_ACTION_RSP_OVER: HANDLE_ACTION_RSP_OVER:
...@@ -647,7 +668,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA ...@@ -647,7 +668,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
if (pAction == NULL) continue; if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived) { if (pAction->msgSent && pAction->msgReceived) {
numOfReceived++; numOfReceived++;
if (pAction->errCode != 0) { if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
errCode = pAction->errCode; errCode = pAction->errCode;
} }
} }
...@@ -695,7 +716,7 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { ...@@ -695,7 +716,7 @@ static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
} else { } else {
pTrans->code = terrno; pTrans->code = terrno;
pTrans->stage = TRN_STAGE_UNDO_LOG; pTrans->stage = TRN_STAGE_UNDO_LOG;
mError("trans:%d, stage from redoLog to undoLog", pTrans->id); mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr());
} }
return continueExec; return continueExec;
......
...@@ -50,7 +50,18 @@ TEST_F(MndTestBnode, 01_Show_Bnode) { ...@@ -50,7 +50,18 @@ TEST_F(MndTestBnode, 01_Show_Bnode) {
EXPECT_EQ(test.GetShowRows(), 0); EXPECT_EQ(test.GetShowRows(), 0);
} }
TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) { TEST_F(MndTestBnode, 02_Create_Bnode) {
{
int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
}
{ {
int32_t contLen = sizeof(SMCreateBnodeReq); int32_t contLen = sizeof(SMCreateBnodeReq);
...@@ -63,11 +74,6 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) { ...@@ -63,11 +74,6 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_BNODE, ""); test.SendShowMetaMsg(TSDB_MGMT_TABLE_BNODE, "");
CHECK_META("show bnodes", 3); CHECK_META("show bnodes", 3);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowRetrieveMsg(); test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
...@@ -75,24 +81,21 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) { ...@@ -75,24 +81,21 @@ TEST_F(MndTestBnode, 02_Create_Bnode_Invalid_Id) {
CheckBinary("localhost:9018", TSDB_EP_LEN); CheckBinary("localhost:9018", TSDB_EP_LEN);
CheckTimestamp(); CheckTimestamp();
} }
}
TEST_F(MndTestBnode, 03_Create_Bnode_Invalid_Id) {
{ {
int32_t contLen = sizeof(SMCreateBnodeReq); int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2); pReq->dnodeId = htonl(1);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_BNODE_ALREADY_EXIST);
} }
} }
TEST_F(MndTestBnode, 04_Create_Bnode) { TEST_F(MndTestBnode, 03_Drop_Bnode) {
{ {
// create dnode
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
...@@ -110,7 +113,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) { ...@@ -110,7 +113,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) {
} }
{ {
// create bnode
int32_t contLen = sizeof(SMCreateBnodeReq); int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen); SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
...@@ -133,7 +135,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) { ...@@ -133,7 +135,6 @@ TEST_F(MndTestBnode, 04_Create_Bnode) {
} }
{ {
// drop bnode
int32_t contLen = sizeof(SMDropBnodeReq); int32_t contLen = sizeof(SMDropBnodeReq);
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen); SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
...@@ -151,4 +152,143 @@ TEST_F(MndTestBnode, 04_Create_Bnode) { ...@@ -151,4 +152,143 @@ TEST_F(MndTestBnode, 04_Create_Bnode) {
CheckBinary("localhost:9018", TSDB_EP_LEN); CheckBinary("localhost:9018", TSDB_EP_LEN);
CheckTimestamp(); CheckTimestamp();
} }
{
int32_t contLen = sizeof(SMDropBnodeReq);
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_BNODE_NOT_EXIST);
}
}
TEST_F(MndTestBnode, 03_Create_Bnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, bnode is creating
int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// continue send message, bnode is creating
int32_t contLen = sizeof(SMDropBnodeReq);
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 10;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
if (pMsg->code == 0) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
}
TEST_F(MndTestBnode, 04_Drop_Bnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMDropBnodeReq);
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, bnode is dropping
int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
{
// continue send message, bnode is dropping
int32_t contLen = sizeof(SMDropBnodeReq);
SMDropBnodeReq* pReq = (SMDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 10;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateBnodeReq);
SMCreateBnodeReq* pReq = (SMCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
if (pMsg->code == 0) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
} }
\ No newline at end of file
...@@ -90,13 +90,12 @@ TEST_F(MndTestQnode, 02_Create_Qnode) { ...@@ -90,13 +90,12 @@ TEST_F(MndTestQnode, 02_Create_Qnode) {
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_QNODE_ALREADY_EXIST);
} }
} }
TEST_F(MndTestQnode, 04_Create_Qnode) { TEST_F(MndTestQnode, 03_Drop_Qnode) {
{ {
// create dnode
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
...@@ -114,7 +113,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) { ...@@ -114,7 +113,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) {
} }
{ {
// create qnode
int32_t contLen = sizeof(SMCreateQnodeReq); int32_t contLen = sizeof(SMCreateQnodeReq);
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
...@@ -137,7 +135,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) { ...@@ -137,7 +135,6 @@ TEST_F(MndTestQnode, 04_Create_Qnode) {
} }
{ {
// drop qnode
int32_t contLen = sizeof(SMDropQnodeReq); int32_t contLen = sizeof(SMDropQnodeReq);
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen); SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
...@@ -155,4 +152,143 @@ TEST_F(MndTestQnode, 04_Create_Qnode) { ...@@ -155,4 +152,143 @@ TEST_F(MndTestQnode, 04_Create_Qnode) {
CheckBinary("localhost:9014", TSDB_EP_LEN); CheckBinary("localhost:9014", TSDB_EP_LEN);
CheckTimestamp(); CheckTimestamp();
} }
{
int32_t contLen = sizeof(SMDropQnodeReq);
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_QNODE_NOT_EXIST);
}
}
TEST_F(MndTestQnode, 03_Create_Qnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMCreateQnodeReq);
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, qnode is creating
int32_t contLen = sizeof(SMCreateQnodeReq);
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// continue send message, qnode is creating
int32_t contLen = sizeof(SMDropQnodeReq);
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 10;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateQnodeReq);
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
if (pMsg->code == 0) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
}
TEST_F(MndTestQnode, 04_Drop_Qnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMDropQnodeReq);
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, qnode is dropping
int32_t contLen = sizeof(SMCreateQnodeReq);
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
{
// continue send message, qnode is dropping
int32_t contLen = sizeof(SMDropQnodeReq);
SMDropQnodeReq* pReq = (SMDropQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 10;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateQnodeReq);
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
if (pMsg->code == 0) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
} }
\ No newline at end of file
...@@ -50,7 +50,18 @@ TEST_F(MndTestSnode, 01_Show_Snode) { ...@@ -50,7 +50,18 @@ TEST_F(MndTestSnode, 01_Show_Snode) {
EXPECT_EQ(test.GetShowRows(), 0); EXPECT_EQ(test.GetShowRows(), 0);
} }
TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) { TEST_F(MndTestSnode, 02_Create_Snode) {
{
int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST);
}
{ {
int32_t contLen = sizeof(SMCreateSnodeReq); int32_t contLen = sizeof(SMCreateSnodeReq);
...@@ -63,11 +74,6 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) { ...@@ -63,11 +74,6 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) {
test.SendShowMetaMsg(TSDB_MGMT_TABLE_SNODE, ""); test.SendShowMetaMsg(TSDB_MGMT_TABLE_SNODE, "");
CHECK_META("show snodes", 3); CHECK_META("show snodes", 3);
CHECK_SCHEMA(0, TSDB_DATA_TYPE_SMALLINT, 2, "id");
CHECK_SCHEMA(1, TSDB_DATA_TYPE_BINARY, TSDB_EP_LEN + VARSTR_HEADER_SIZE, "endpoint");
CHECK_SCHEMA(2, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
test.SendShowRetrieveMsg(); test.SendShowRetrieveMsg();
EXPECT_EQ(test.GetShowRows(), 1); EXPECT_EQ(test.GetShowRows(), 1);
...@@ -75,24 +81,21 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) { ...@@ -75,24 +81,21 @@ TEST_F(MndTestSnode, 02_Create_Snode_Invalid_Id) {
CheckBinary("localhost:9016", TSDB_EP_LEN); CheckBinary("localhost:9016", TSDB_EP_LEN);
CheckTimestamp(); CheckTimestamp();
} }
}
TEST_F(MndTestSnode, 03_Create_Snode_Invalid_Id) {
{ {
int32_t contLen = sizeof(SMCreateSnodeReq); int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2); pReq->dnodeId = htonl(1);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); ASSERT_EQ(pMsg->code, TSDB_CODE_MND_SNODE_ALREADY_EXIST);
} }
} }
TEST_F(MndTestSnode, 04_Create_Snode) { TEST_F(MndTestSnode, 03_Drop_Snode) {
{ {
// create dnode
int32_t contLen = sizeof(SCreateDnodeMsg); int32_t contLen = sizeof(SCreateDnodeMsg);
SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen);
...@@ -110,7 +113,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) { ...@@ -110,7 +113,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) {
} }
{ {
// create snode
int32_t contLen = sizeof(SMCreateSnodeReq); int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen); SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
...@@ -133,7 +135,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) { ...@@ -133,7 +135,6 @@ TEST_F(MndTestSnode, 04_Create_Snode) {
} }
{ {
// drop snode
int32_t contLen = sizeof(SMDropSnodeReq); int32_t contLen = sizeof(SMDropSnodeReq);
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen); SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
...@@ -151,4 +152,143 @@ TEST_F(MndTestSnode, 04_Create_Snode) { ...@@ -151,4 +152,143 @@ TEST_F(MndTestSnode, 04_Create_Snode) {
CheckBinary("localhost:9016", TSDB_EP_LEN); CheckBinary("localhost:9016", TSDB_EP_LEN);
CheckTimestamp(); CheckTimestamp();
} }
{
int32_t contLen = sizeof(SMDropSnodeReq);
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_MND_SNODE_NOT_EXIST);
}
}
TEST_F(MndTestSnode, 03_Create_Snode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, snode is creating
int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// continue send message, snode is creating
int32_t contLen = sizeof(SMDropSnodeReq);
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_CREATING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 10;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
if (pMsg->code == 0) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
}
TEST_F(MndTestSnode, 04_Drop_Snode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMDropSnodeReq);
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_RPC_NETWORK_UNAVAIL);
}
{
// continue send message, snode is dropping
int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
{
// continue send message, snode is dropping
int32_t contLen = sizeof(SMDropSnodeReq);
SMDropSnodeReq* pReq = (SMDropSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
server2.Stop();
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_DROP_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, TSDB_CODE_SDB_OBJ_DROPPING);
}
{
// server start, wait until the rollback finished
server2.DoStart();
taosMsleep(1000);
int32_t retry = 0;
int32_t retryMax = 10;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateSnodeReq);
SMCreateSnodeReq* pReq = (SMCreateSnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_SNODE, pReq, contLen);
ASSERT_NE(pMsg, nullptr);
if (pMsg->code == 0) break;
taosMsleep(1000);
}
ASSERT_NE(retry, retryMax);
}
} }
\ No newline at end of file
...@@ -69,6 +69,8 @@ static const char *sdbStatusStr(ESdbStatus status) { ...@@ -69,6 +69,8 @@ static const char *sdbStatusStr(ESdbStatus status) {
return "ready"; return "ready";
case SDB_STATUS_DROPPED: case SDB_STATUS_DROPPED:
return "dropped"; return "dropped";
case SDB_STATUS_INIT:
return "init";
default: default:
return "undefine"; return "undefine";
} }
...@@ -261,6 +263,8 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -261,6 +263,8 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
} }
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) { void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
terrno = 0;
SHashObj *hash = sdbGetHash(pSdb, type); SHashObj *hash = sdbGetHash(pSdb, type);
if (hash == NULL) return NULL; if (hash == NULL) return NULL;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册