提交 b19987ac 编写于 作者: S Shengliang Guan

serialize mnode msg

上级 b8bda48a
......@@ -541,13 +541,13 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode)
}
static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SDnodeObj *pDnode = NULL;
SDropDnodeReq dropReq = {0};
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SDnodeObj *pDnode = NULL;
SMDropMnodeReq dropReq = {0};
if (tDeserializeSDropDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
if (tDeserializeSMCreateDropMnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_DNODE_OVER;
}
......@@ -591,14 +591,18 @@ DROP_DNODE_OVER:
}
static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SMCfgDnodeReq *pCfg = pReq->rpcMsg.pCont;
pCfg->dnodeId = htonl(pCfg->dnodeId);
SMnode *pMnode = pReq->pMnode;
SMCfgDnodeReq cfgReq = {0};
if (tDeserializeSMCfgDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &cfgReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCfg->dnodeId);
SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId);
if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("dnode:%d, failed to config since %s ", pCfg->dnodeId, terrstr());
mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr());
return -1;
}
......@@ -606,15 +610,15 @@ static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) {
mndReleaseDnode(pMnode, pDnode);
SDCfgDnodeReq *pCfgDnode = rpcMallocCont(sizeof(SDCfgDnodeReq));
pCfgDnode->dnodeId = htonl(pCfg->dnodeId);
memcpy(pCfgDnode->config, pCfg->config, TSDB_DNODE_CONFIG_LEN);
pCfgDnode->dnodeId = htonl(cfgReq.dnodeId);
memcpy(pCfgDnode->config, cfgReq.config, TSDB_DNODE_CONFIG_LEN);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE,
.pCont = pCfgDnode,
.contLen = sizeof(SDCfgDnodeReq),
.ahandle = pReq->rpcMsg.ahandle};
mInfo("dnode:%d, app:%p config:%s req send to dnode", pCfg->dnodeId, rpcMsg.ahandle, pCfg->config);
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.ahandle, cfgReq.config);
mndSendReqToDnode(pMnode, &epSet, &rpcMsg);
return 0;
......
......@@ -15,9 +15,11 @@
#define _DEFAULT_SOURCE
#include "mndMnode.h"
#include "mndAuth.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#include "mndUser.h"
#define TSDB_MNODE_VER_NUMBER 1
#define TSDB_MNODE_RESERVE_SIZE 64
......@@ -379,40 +381,58 @@ CREATE_MNODE_OVER:
}
static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SMCreateMnodeReq *pCreate = pReq->rpcMsg.pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SMnodeObj *pObj = NULL;
SDnodeObj *pDnode = NULL;
SUserObj *pUser = NULL;
SMCreateMnodeReq createReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_MNODE_OVER;
}
mDebug("mnode:%d, start to create", pCreate->dnodeId);
mDebug("mnode:%d, start to create", createReq.dnodeId);
SMnodeObj *pObj = mndAcquireMnode(pMnode, pCreate->dnodeId);
pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
if (pObj != NULL) {
mndReleaseMnode(pMnode, pObj);
mError("mnode:%d, mnode already exist", pObj->id);
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
return -1;
goto CREATE_MNODE_OVER;
} else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
mError("qnode:%d, failed to create mnode since %s", pCreate->dnodeId, terrstr());
return -1;
goto CREATE_MNODE_OVER;
}
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId);
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
if (pDnode == NULL) {
mError("mnode:%d, dnode not exist", pCreate->dnodeId);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
return -1;
goto CREATE_MNODE_OVER;
}
int32_t code = mndCreateMnode(pMnode, pReq, pDnode, pCreate);
mndReleaseDnode(pMnode, pDnode);
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_MNODE_OVER;
}
if (code != 0) {
mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr());
if (mndCheckDropNodeAuth(pUser)) {
goto CREATE_MNODE_OVER;
}
code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_MNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
return -1;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
mndReleaseMnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode);
mndReleaseUser(pMnode, pUser);
return code;
}
static int32_t mndSetDropMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
......@@ -534,32 +554,51 @@ DROP_MNODE_OVER:
}
static int32_t mndProcessDropMnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SMDropMnodeReq *pDrop = pReq->rpcMsg.pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SMnodeObj *pObj = NULL;
SMDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_MNODE_OVER;
}
mDebug("mnode:%d, start to drop", pDrop->dnodeId);
mDebug("mnode:%d, start to drop", dropReq.dnodeId);
if (pDrop->dnodeId <= 0) {
if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR;
mError("mnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
return -1;
goto DROP_MNODE_OVER;
}
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDrop->dnodeId);
pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pObj == NULL) {
mError("mnode:%d, not exist", pDrop->dnodeId);
return -1;
goto DROP_MNODE_OVER;
}
int32_t code = mndDropMnode(pMnode, pReq, pObj);
if (code != 0) {
mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr());
return -1;
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_MNODE_OVER;
}
if (mndCheckCreateNodeAuth(pUser)) {
goto DROP_MNODE_OVER;
}
sdbRelease(pMnode->pSdb, pObj);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
code = mndDropMnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_MNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
}
mndReleaseMnode(pMnode, pObj);
mndReleaseUser(pMnode, pUser);
return code;
}
static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pRsp) {
......
......@@ -75,11 +75,14 @@ TEST_F(MndTestDnode, 01_ShowDnode) {
}
TEST_F(MndTestDnode, 02_ConfigDnode) {
int32_t contLen = sizeof(SMCfgDnodeReq);
SMCfgDnodeReq* pReq = (SMCfgDnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
strcpy(pReq->config, "ddebugflag 131");
SMCfgDnodeReq cfgReq = {0};
cfgReq.dnodeId = 1;
strcpy(cfgReq.config, "ddebugflag");
strcpy(cfgReq.value, "131");
int32_t contLen = tSerializeSMCfgDnodeReq(NULL, 0, &cfgReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCfgDnodeReq(pReq, contLen, &cfgReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONFIG_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......
......@@ -60,10 +60,12 @@ TEST_F(MndTestMnode, 01_ShowDnode) {
TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 1;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -73,10 +75,12 @@ TEST_F(MndTestMnode, 02_Create_Mnode_Invalid_Id) {
TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) {
{
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -107,10 +111,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
{
// create mnode
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -134,10 +140,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
{
// drop mnode
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -156,10 +164,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
{
// drop mnode
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -170,10 +180,12 @@ TEST_F(MndTestMnode, 04_Create_Mnode) {
TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
server2.Stop();
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
......@@ -183,10 +195,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) {
{
// continue send message, mnode is creating
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -195,10 +209,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) {
{
// continue send message, mnode is creating
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -214,10 +230,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) {
int32_t retryMax = 20;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -232,10 +250,12 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Rollback) {
TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
{
// send message first, then dnode2 crash, result is returned, and rollback is started
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
server2.Stop();
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
......@@ -245,10 +265,12 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
{
// continue send message, mnode is dropping
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -257,10 +279,12 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
{
// continue send message, mnode is dropping
int32_t contLen = sizeof(SMDropMnodeReq);
SMDropMnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SMDropMnodeReq* pReq = (SMDropMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -276,10 +300,12 @@ TEST_F(MndTestMnode, 04_Drop_Mnode_Rollback) {
int32_t retryMax = 20;
for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateMnodeReq);
SMCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateMnodeReq* pReq = (SMCreateMnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册