提交 6810e82f 编写于 作者: S Shengliang Guan

feat[cluster]: create and drop snode

上级 e9f84582
......@@ -55,7 +55,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create snode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId);
dError("failed to create snode since %s", terrstr());
return -1;
} else {
return smOpen(pWrapper);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
static inline void smSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
static inline void smSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
......@@ -28,17 +28,19 @@ static inline void smSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t cod
static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from snode monitor queue", pMsg);
dTrace("msg:%p, get from snode-monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
}
if (pRpc->msgType & 1U) {
if (code != 0 && terrno != 0) code = terrno;
smSendRsp(pMgmt->pWrapper, pMsg, code);
smSendRsp(pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......@@ -53,7 +55,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in snode unique queue", pMsg);
dTrace("msg:%p, get from snode-unique queue", pMsg);
sndProcessUMsg(pMgmt->pSnode, &pMsg->rpcMsg);
dTrace("msg:%p, is freed", pMsg);
......@@ -65,7 +67,7 @@ static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t num
static void smProcessSharedQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, will be processed in snode shared queue", pMsg);
dTrace("msg:%p, get from snode-shared queue", pMsg);
sndProcessSMsg(pMgmt->pSnode, &pMsg->rpcMsg);
dTrace("msg:%p, is freed", pMsg);
......@@ -88,7 +90,6 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
}
SMultiWorkerCfg cfg = {.max = 1, .name = "snode-unique", .fp = smProcessUniqueQueue, .param = pMgmt};
if (tMultiWorkerInit(pUniqueWorker, &cfg) != 0) {
dError("failed to start snode-unique worker since %s", terrstr());
return -1;
......@@ -193,7 +194,7 @@ int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
}
int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg);
int32_t workerType = smGetSWTypeFromMsg(&pMsg->rpcMsg);
if (workerType == SND_WORKER_TYPE__SHARED) {
return smProcessSharedMsg(pWrapper, pMsg);
} else {
......
......@@ -21,17 +21,17 @@
#include "mndTrans.h"
#include "mndUser.h"
#define TSDB_SNODE_VER_NUMBER 1
#define TSDB_SNODE_RESERVE_SIZE 64
#define SNODE_VER_NUMBER 1
#define SNODE_RESERVE_SIZE 64
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj);
static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw);
static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SNodeMsg *pRsp);
static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq);
static int32_t mndProcessDropSnodeRsp(SNodeMsg *pRsp);
static int32_t mndGetSnodeMeta(SNodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveSnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows);
......@@ -61,11 +61,9 @@ int32_t mndInitSnode(SMnode *pMnode) {
void mndCleanupSnode(SMnode *pMnode) {}
SEpSet mndAcquireEpFromSnode(SMnode *pMnode, const SSnodeObj *pSnode) {
SEpSet epSet;
memcpy(epSet.eps->fqdn, pSnode->pDnode->fqdn, 128);
epSet.eps->port = pSnode->pDnode->port;
epSet.numOfEps = 1;
epSet.inUse = 0;
SEpSet epSet = {.numOfEps = 1, .inUse = 0};
memcpy(epSet.eps[0].fqdn, pSnode->pDnode->fqdn, TSDB_FQDN_LEN);
epSet.eps[0].port = pSnode->pDnode->port;
return epSet;
}
......@@ -85,18 +83,18 @@ static void mndReleaseSnode(SMnode *pMnode, SSnodeObj *pObj) {
static SSdbRaw *mndSnodeActionEncode(SSnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, TSDB_SNODE_VER_NUMBER, sizeof(SSnodeObj) + TSDB_SNODE_RESERVE_SIZE);
if (pRaw == NULL) goto SNODE_ENCODE_OVER;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SNODE, SNODE_VER_NUMBER, sizeof(SSnodeObj) + SNODE_RESERVE_SIZE);
if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id, SNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, SNODE_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, SNODE_ENCODE_OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, SNODE_RESERVE_SIZE, _OVER)
terrno = 0;
SNODE_ENCODE_OVER:
_OVER:
if (terrno != 0) {
mError("snode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw);
......@@ -111,28 +109,28 @@ static SSdbRow *mndSnodeActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SNODE_DECODE_OVER;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;
if (sver != TSDB_SNODE_VER_NUMBER) {
if (sver != SNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto SNODE_DECODE_OVER;
goto _OVER;
}
SSdbRow *pRow = sdbAllocRow(sizeof(SSnodeObj));
if (pRow == NULL) goto SNODE_DECODE_OVER;
if (pRow == NULL) goto _OVER;
SSnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto SNODE_DECODE_OVER;
if (pObj == NULL) goto _OVER;
int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pObj->id, SNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, SNODE_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, SNODE_DECODE_OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_SNODE_RESERVE_SIZE, SNODE_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, SNODE_RESERVE_SIZE, _OVER)
terrno = 0;
SNODE_DECODE_OVER:
_OVER:
if (terrno != 0) {
mError("snode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
taosMemoryFreeClear(pRow);
......@@ -258,20 +256,20 @@ static int32_t mndCreateSnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode,
snodeObj.updateTime = snodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_SNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_SNODE_OVER;
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create snode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto CREATE_SNODE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_SNODE_OVER;
if (mndSetCreateSnodeRedoLogs(pTrans, &snodeObj) != 0) goto _OVER;
if (mndSetCreateSnodeUndoLogs(pTrans, &snodeObj) != 0) goto _OVER;
if (mndSetCreateSnodeCommitLogs(pTrans, &snodeObj) != 0) goto _OVER;
if (mndSetCreateSnodeRedoActions(pTrans, pDnode, &snodeObj) != 0) goto _OVER;
if (mndSetCreateSnodeUndoActions(pTrans, pDnode, &snodeObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
CREATE_SNODE_OVER:
_OVER:
mndTransDrop(pTrans);
return code;
}
......@@ -286,7 +284,7 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_SNODE_OVER;
goto _OVER;
}
mDebug("snode:%d, start to create", createReq.dnodeId);
......@@ -294,31 +292,31 @@ static int32_t mndProcessCreateSnodeReq(SNodeMsg *pReq) {
pObj = mndAcquireSnode(pMnode, createReq.dnodeId);
if (pObj != NULL) {
terrno = TSDB_CODE_MND_SNODE_ALREADY_EXIST;
goto CREATE_SNODE_OVER;
goto _OVER;
} else if (terrno != TSDB_CODE_MND_SNODE_NOT_EXIST) {
goto CREATE_SNODE_OVER;
goto _OVER;
}
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto CREATE_SNODE_OVER;
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_SNODE_OVER;
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
goto CREATE_SNODE_OVER;
goto _OVER;
}
code = mndCreateSnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_SNODE_OVER:
_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to create since %s", createReq.dnodeId, terrstr());
return -1;
......@@ -327,7 +325,6 @@ CREATE_SNODE_OVER:
mndReleaseSnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode);
mndReleaseUser(pMnode, pUser);
return code;
}
......@@ -378,18 +375,18 @@ static int32_t mndDropSnode(SMnode *pMnode, SNodeMsg *pReq, SSnodeObj *pObj) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_SNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_SNODE_OVER;
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop snode:%d", pTrans->id, pObj->id);
if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER;
if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto DROP_SNODE_OVER;
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto DROP_SNODE_OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_SNODE_OVER;
if (mndSetDropSnodeRedoLogs(pTrans, pObj) != 0) goto _OVER;
if (mndSetDropSnodeCommitLogs(pTrans, pObj) != 0) goto _OVER;
if (mndSetDropSnodeRedoActions(pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
DROP_SNODE_OVER:
_OVER:
mndTransDrop(pTrans);
return code;
}
......@@ -403,35 +400,35 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_SNODE_OVER;
goto _OVER;
}
mDebug("snode:%d, start to drop", dropReq.dnodeId);
if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR;
goto DROP_SNODE_OVER;
goto _OVER;
}
pObj = mndAcquireSnode(pMnode, dropReq.dnodeId);
if (pObj == NULL) {
goto DROP_SNODE_OVER;
goto _OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_SNODE_OVER;
goto _OVER;
}
if (mndCheckNodeAuth(pUser)) {
goto DROP_SNODE_OVER;
goto _OVER;
}
code = mndDropSnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_SNODE_OVER:
_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("snode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册