提交 a1c218d3 编写于 作者: S Shengliang Guan

feat[cluster]: create and drop bnode

上级 4da5a8ed
...@@ -16,8 +16,7 @@ ...@@ -16,8 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "bmInt.h" #include "bmInt.h"
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) { void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) {}
}
int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonBmInfo bmInfo = {0}; SMonBmInfo bmInfo = {0};
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "bmInt.h" #include "bmInt.h"
static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static void bmSendErrorRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
tmsgSendRsp(&rpcRsp); tmsgSendRsp(&rpcRsp);
...@@ -25,15 +25,17 @@ static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) ...@@ -25,15 +25,17 @@ static void bmSendErrorRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code)
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t numOfMsgs, int32_t code) { static void bmSendErrorRsps(STaosQall *qall, int32_t numOfMsgs, int32_t code) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
bmSendErrorRsp(pWrapper, pMsg, code); if (pMsg != NULL) {
bmSendErrorRsp(pMsg, code);
}
} }
} }
static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static inline void bmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .code = code,
...@@ -42,20 +44,22 @@ static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t cod ...@@ -42,20 +44,22 @@ static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t cod
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void bmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pInfo->ahandle; SBnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from bnode monitor queue", pMsg); dTrace("msg:%p, get from bnode-monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) { if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) {
code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg); code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
bmSendRsp(pMgmt->pWrapper, pMsg, code); bmSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
...@@ -64,21 +68,22 @@ static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -64,21 +68,22 @@ static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
} }
static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SBnodeMgmt *pMgmt = pInfo->ahandle; SBnodeMgmt *pMgmt = pInfo->ahandle;
SMgmtWrapper *pWrapper = pMgmt->pWrapper;
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) { if (pArray == NULL) {
bmSendErrorRsps(pWrapper, qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); bmSendErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY);
return; return;
} }
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in bnode queue", pMsg); if (pMsg != NULL) {
if (taosArrayPush(pArray, &pMsg) == NULL) { dTrace("msg:%p, get from bnode-write queue", pMsg);
bmSendErrorRsp(pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); if (taosArrayPush(pArray, &pMsg) == NULL) {
bmSendErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
} }
} }
...@@ -86,9 +91,11 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO ...@@ -86,9 +91,11 @@ static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
for (size_t i = 0; i < numOfMsgs; i++) { for (size_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg); if (pMsg != NULL) {
rpcFreeCont(pMsg->rpcMsg.pCont); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
} }
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
} }
...@@ -120,12 +127,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { ...@@ -120,12 +127,12 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
if (tsMultiProcess) { if (tsMultiProcess) {
SSingleWorkerCfg mCfg = { SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonQueue, .param = pMgmt}; .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start bnode-monitor worker since %s", terrstr()); dError("failed to start bnode-monitor worker since %s", terrstr());
return -1; return -1;
} }
} }
dDebug("bnode workers are initialized"); dDebug("bnode workers are initialized");
return 0; return 0;
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "qmInt.h" #include "qmInt.h"
static inline void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { static inline void qmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle, .ahandle = pMsg->rpcMsg.ahandle,
.code = code, .code = code,
...@@ -40,7 +40,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -40,7 +40,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
qmSendRsp(pMgmt->pWrapper, pMsg, code); qmSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
...@@ -56,7 +56,7 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -56,7 +56,7 @@ static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc); int32_t code = qndProcessQueryMsg(pMgmt->pQnode, pRpc);
if (pRpc->msgType & 1U && code != 0) { if (pRpc->msgType & 1U && code != 0) {
qmSendRsp(pMgmt->pWrapper, pMsg, code); qmSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
...@@ -72,7 +72,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -72,7 +72,7 @@ static void qmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc); int32_t code = qndProcessFetchMsg(pMgmt->pQnode, pRpc);
if (pRpc->msgType & 1U && code != 0) { if (pRpc->msgType & 1U && code != 0) {
qmSendRsp(pMgmt->pWrapper, pMsg, code); qmSendRsp(pMsg, code);
} }
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......
...@@ -21,17 +21,17 @@ ...@@ -21,17 +21,17 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define TSDB_BNODE_VER_NUMBER 1 #define BNODE_VER_NUMBER 1
#define TSDB_BNODE_RESERVE_SIZE 64 #define BNODE_RESERVE_SIZE 64
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj); static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj);
static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw); static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj); static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew); static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew);
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq); static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateBnodeRsp(SNodeMsg *pRsp); static int32_t mndProcessCreateBnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp); static int32_t mndProcessDropBnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndGetBnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveBnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
...@@ -76,18 +76,18 @@ static void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) { ...@@ -76,18 +76,18 @@ static void mndReleaseBnode(SMnode *pMnode, SBnodeObj *pObj) {
static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) { static SSdbRaw *mndBnodeActionEncode(SBnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, TSDB_BNODE_VER_NUMBER, sizeof(SBnodeObj) + TSDB_BNODE_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_BNODE, BNODE_VER_NUMBER, sizeof(SBnodeObj) + BNODE_RESERVE_SIZE);
if (pRaw == NULL) goto BNODE_ENCODE_OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id, BNODE_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, BNODE_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, BNODE_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, BNODE_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
BNODE_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); mError("bnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
...@@ -102,28 +102,28 @@ static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) { ...@@ -102,28 +102,28 @@ static SSdbRow *mndBnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto BNODE_DECODE_OVER; if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_BNODE_VER_NUMBER) { if (sver != BNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto BNODE_DECODE_OVER; goto _OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SBnodeObj));
if (pRow == NULL) goto BNODE_DECODE_OVER; if (pRow == NULL) goto _OVER;
SBnodeObj *pObj = sdbGetRowObj(pRow); SBnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto BNODE_DECODE_OVER; if (pObj == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pObj->id, BNODE_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, BNODE_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, BNODE_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_BNODE_RESERVE_SIZE, BNODE_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, BNODE_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
BNODE_DECODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("bnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); mError("bnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
...@@ -249,19 +249,19 @@ static int32_t mndCreateBnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode, ...@@ -249,19 +249,19 @@ static int32_t mndCreateBnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode,
bnodeObj.updateTime = bnodeObj.createdTime; bnodeObj.updateTime = bnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_BNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_BNODE_OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create bnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; if (mndSetCreateBnodeRedoLogs(pTrans, &bnodeObj) != 0) goto _OVER;
if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; if (mndSetCreateBnodeUndoLogs(pTrans, &bnodeObj) != 0) goto _OVER;
if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto CREATE_BNODE_OVER; if (mndSetCreateBnodeCommitLogs(pTrans, &bnodeObj) != 0) goto _OVER;
if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER; if (mndSetCreateBnodeRedoActions(pTrans, pDnode, &bnodeObj) != 0) goto _OVER;
if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto CREATE_BNODE_OVER; if (mndSetCreateBnodeUndoActions(pTrans, pDnode, &bnodeObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_BNODE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
CREATE_BNODE_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
...@@ -276,7 +276,7 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) { ...@@ -276,7 +276,7 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_BNODE_OVER; goto _OVER;
} }
mDebug("bnode:%d, start to create", createReq.dnodeId); mDebug("bnode:%d, start to create", createReq.dnodeId);
...@@ -284,31 +284,31 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) { ...@@ -284,31 +284,31 @@ static int32_t mndProcessCreateBnodeReq(SNodeMsg *pReq) {
pObj = mndAcquireBnode(pMnode, createReq.dnodeId); pObj = mndAcquireBnode(pMnode, createReq.dnodeId);
if (pObj != NULL) { if (pObj != NULL) {
terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_BNODE_ALREADY_EXIST;
goto CREATE_BNODE_OVER; goto _OVER;
} else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_BNODE_NOT_EXIST) {
goto CREATE_BNODE_OVER; goto _OVER;
} }
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto CREATE_BNODE_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_BNODE_OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser)) { if (mndCheckNodeAuth(pUser)) {
goto CREATE_BNODE_OVER; goto _OVER;
} }
code = mndCreateBnode(pMnode, pReq, pDnode, &createReq); code = mndCreateBnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_BNODE_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("bnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); mError("bnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
} }
...@@ -316,7 +316,6 @@ CREATE_BNODE_OVER: ...@@ -316,7 +316,6 @@ CREATE_BNODE_OVER:
mndReleaseBnode(pMnode, pObj); mndReleaseBnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
return code; return code;
} }
...@@ -367,17 +366,17 @@ static int32_t mndDropBnode(SMnode *pMnode, SNodeMsg *pReq, SBnodeObj *pObj) { ...@@ -367,17 +366,17 @@ static int32_t mndDropBnode(SMnode *pMnode, SNodeMsg *pReq, SBnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_BNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_BNODE_OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop bnode:%d", pTrans->id, pObj->id);
if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER; if (mndSetDropBnodeRedoLogs(pTrans, pObj) != 0) goto _OVER;
if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto DROP_BNODE_OVER; if (mndSetDropBnodeCommitLogs(pTrans, pObj) != 0) goto _OVER;
if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_BNODE_OVER; if (mndSetDropBnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_BNODE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
DROP_BNODE_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
...@@ -391,35 +390,35 @@ static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq) { ...@@ -391,35 +390,35 @@ static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto DROP_BNODE_OVER; goto _OVER;
} }
mDebug("bnode:%d, start to drop", dropReq.dnodeId); mDebug("bnode:%d, start to drop", dropReq.dnodeId);
if (dropReq.dnodeId <= 0) { if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR; terrno = TSDB_CODE_SDB_APP_ERROR;
goto DROP_BNODE_OVER; goto _OVER;
} }
pObj = mndAcquireBnode(pMnode, dropReq.dnodeId); pObj = mndAcquireBnode(pMnode, dropReq.dnodeId);
if (pObj == NULL) { if (pObj == NULL) {
goto DROP_BNODE_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_BNODE_OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser)) { if (mndCheckNodeAuth(pUser)) {
goto DROP_BNODE_OVER; goto _OVER;
} }
code = mndDropBnode(pMnode, pReq, pObj); code = mndDropBnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_BNODE_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("bnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册