提交 b8bda48a 编写于 作者: S Shengliang Guan

serialize msg

上级 c4efa321
...@@ -15,9 +15,11 @@ ...@@ -15,9 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndDnode.h" #include "mndDnode.h"
#include "mndAuth.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#define TSDB_DNODE_VER_NUMBER 1 #define TSDB_DNODE_VER_NUMBER 1
...@@ -354,7 +356,8 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { ...@@ -354,7 +356,8 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
} }
mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, pMnode->cfg.sver); mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver,
pMnode->cfg.sver);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
...@@ -461,35 +464,54 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq * ...@@ -461,35 +464,54 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pReq, SCreateDnodeReq *
} }
static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq) { static int32_t mndProcessCreateDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SCreateDnodeReq *pCreate = pReq->rpcMsg.pCont; int32_t code = -1;
pCreate->port = htonl(pCreate->port); SUserObj *pUser = NULL;
mDebug("dnode:%s:%d, start to create", pCreate->fqdn, pCreate->port); SDnodeObj *pDnode = NULL;
SCreateDnodeReq createReq = {0};
if (tDeserializeSCreateDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_DNODE_OVER;
}
mDebug("dnode:%s:%d, start to create", createReq.fqdn, createReq.port);
if (pCreate->fqdn[0] == 0 || pCreate->port <= 0 || pCreate->port > UINT16_MAX) { if (createReq.fqdn[0] == 0 || createReq.port <= 0 || createReq.port > UINT16_MAX) {
terrno = TSDB_CODE_MND_INVALID_DNODE_EP; terrno = TSDB_CODE_MND_INVALID_DNODE_EP;
mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); goto CREATE_DNODE_OVER;
return -1;
} }
char ep[TSDB_EP_LEN]; char ep[TSDB_EP_LEN];
snprintf(ep, TSDB_EP_LEN, "%s:%d", pCreate->fqdn, pCreate->port); snprintf(ep, TSDB_EP_LEN, "%s:%d", createReq.fqdn, createReq.port);
SDnodeObj *pDnode = mndAcquireDnodeByEp(pMnode, ep); pDnode = mndAcquireDnodeByEp(pMnode, ep);
if (pDnode != NULL) { if (pDnode != NULL) {
mError("dnode:%d, already exist, %s:%u", pDnode->id, pCreate->fqdn, pCreate->port);
mndReleaseDnode(pMnode, pDnode);
terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_DNODE_ALREADY_EXIST;
return -1; goto CREATE_DNODE_OVER;
} }
int32_t code = mndCreateDnode(pMnode, pReq, pCreate); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_DNODE_OVER;
}
if (code != 0) { if (mndCheckDropNodeAuth(pUser)) {
mError("dnode:%s:%d, failed to create since %s", pCreate->fqdn, pCreate->port, terrstr()); goto CREATE_DNODE_OVER;
}
code = mndCreateDnode(pMnode, pReq, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_DNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%s:%d, failed to create since %s", createReq.fqdn, createReq.port, terrstr());
return -1; return -1;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mndReleaseDnode(pMnode, pDnode);
mndReleaseUser(pMnode, pUser);
return code;
} }
static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) { static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) {
...@@ -519,34 +541,53 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode) ...@@ -519,34 +541,53 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pReq, SDnodeObj *pDnode)
} }
static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) { static int32_t mndProcessDropDnodeReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SDropDnodeReq *pDrop = pReq->rpcMsg.pCont; int32_t code = -1;
pDrop->dnodeId = htonl(pDrop->dnodeId); SUserObj *pUser = NULL;
SDnodeObj *pDnode = NULL;
SDropDnodeReq dropReq = {0};
if (tDeserializeSDropDnodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_DNODE_OVER;
}
mDebug("dnode:%d, start to drop", pDrop->dnodeId); mDebug("dnode:%d, start to drop", dropReq.dnodeId);
if (pDrop->dnodeId <= 0) { if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_MND_INVALID_DNODE_ID; terrno = TSDB_CODE_MND_INVALID_DNODE_ID;
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); goto DROP_DNODE_OVER;
return -1;
} }
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pDrop->dnodeId); pDnode = mndAcquireDnode(pMnode, dropReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); goto DROP_DNODE_OVER;
return -1; }
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_DNODE_OVER;
} }
int32_t code = mndDropDnode(pMnode, pReq, pDnode); if (mndCheckCreateNodeAuth(pUser)) {
if (code != 0) { goto DROP_DNODE_OVER;
mndReleaseDnode(pMnode, pDnode); }
mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr());
code = mndDropDnode(pMnode, pReq, pDnode);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_DNODE_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
return -1; return -1;
} }
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mndReleaseUser(pMnode, pUser);
return code;
} }
static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) { static int32_t mndProcessConfigDnodeReq(SMnodeMsg *pReq) {
......
...@@ -102,11 +102,13 @@ TEST_F(MndTestBnode, 02_Create_Bnode) { ...@@ -102,11 +102,13 @@ TEST_F(MndTestBnode, 02_Create_Bnode) {
TEST_F(MndTestBnode, 03_Drop_Bnode) { TEST_F(MndTestBnode, 03_Drop_Bnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9019;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9019); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -88,11 +88,13 @@ TEST_F(MndTestDnode, 02_ConfigDnode) { ...@@ -88,11 +88,13 @@ TEST_F(MndTestDnode, 02_ConfigDnode) {
TEST_F(MndTestDnode, 03_Create_Dnode) { TEST_F(MndTestDnode, 03_Create_Dnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "");
createReq.port = 9024;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, ""); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9024); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -100,11 +102,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) { ...@@ -100,11 +102,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) {
} }
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = -1;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(-1); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -112,11 +116,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) { ...@@ -112,11 +116,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) {
} }
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 123456;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(123456); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -124,11 +130,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) { ...@@ -124,11 +130,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) {
} }
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9024;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9024); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -136,11 +144,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) { ...@@ -136,11 +144,13 @@ TEST_F(MndTestDnode, 03_Create_Dnode) {
} }
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9024;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9024); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -172,10 +182,12 @@ TEST_F(MndTestDnode, 03_Create_Dnode) { ...@@ -172,10 +182,12 @@ TEST_F(MndTestDnode, 03_Create_Dnode) {
TEST_F(MndTestDnode, 04_Drop_Dnode) { TEST_F(MndTestDnode, 04_Drop_Dnode) {
{ {
int32_t contLen = sizeof(SDropDnodeReq); SDropDnodeReq dropReq = {0};
dropReq.dnodeId = -3;
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(-3); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -183,10 +195,12 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) { ...@@ -183,10 +195,12 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) {
} }
{ {
int32_t contLen = sizeof(SDropDnodeReq); SDropDnodeReq dropReq = {0};
dropReq.dnodeId = 5;
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(5); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -194,10 +208,12 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) { ...@@ -194,10 +208,12 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) {
} }
{ {
int32_t contLen = sizeof(SDropDnodeReq); SDropDnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -205,10 +221,12 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) { ...@@ -205,10 +221,12 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) {
} }
{ {
int32_t contLen = sizeof(SDropDnodeReq); SDropDnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SDropDnodeReq* pReq = (SDropDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
tSerializeSDropDnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_DROP_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -235,11 +253,13 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) { ...@@ -235,11 +253,13 @@ TEST_F(MndTestDnode, 04_Drop_Dnode) {
TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) { TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9025;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9025); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -247,11 +267,13 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) { ...@@ -247,11 +267,13 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
} }
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9026;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9026); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -259,11 +281,13 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) { ...@@ -259,11 +281,13 @@ TEST_F(MndTestDnode, 05_Create_Drop_Restart_Dnode) {
} }
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9027;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9027); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -87,11 +87,13 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) { ...@@ -87,11 +87,13 @@ TEST_F(MndTestMnode, 03_Create_Mnode_Invalid_Id) {
TEST_F(MndTestMnode, 04_Create_Mnode) { TEST_F(MndTestMnode, 04_Create_Mnode) {
{ {
// create dnode // create dnode
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9029;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9029); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -102,11 +102,13 @@ TEST_F(MndTestQnode, 02_Create_Qnode) { ...@@ -102,11 +102,13 @@ TEST_F(MndTestQnode, 02_Create_Qnode) {
TEST_F(MndTestQnode, 03_Drop_Qnode) { TEST_F(MndTestQnode, 03_Drop_Qnode) {
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9015;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9015); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -102,11 +102,13 @@ TEST_F(MndTestSnode, 02_Create_Snode) { ...@@ -102,11 +102,13 @@ TEST_F(MndTestSnode, 02_Create_Snode) {
TEST_F(MndTestSnode, 03_Drop_Snode) { TEST_F(MndTestSnode, 03_Drop_Snode) {
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9017;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9017); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -100,10 +100,12 @@ TEST_F(MndTestTrans, 01_Create_User_Crash) { ...@@ -100,10 +100,12 @@ TEST_F(MndTestTrans, 01_Create_User_Crash) {
TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) { TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) {
{ {
int32_t contLen = sizeof(SMCreateQnodeReq); SMCreateQnodeReq createReq = {0};
createReq.dnodeId = 1;
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -117,10 +119,12 @@ TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) { ...@@ -117,10 +119,12 @@ TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) {
KillThenRestartServer(); KillThenRestartServer();
{ {
int32_t contLen = sizeof(SMCreateQnodeReq); SMCreateQnodeReq createReq = {0};
createReq.dnodeId = 1;
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -135,11 +139,13 @@ TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) { ...@@ -135,11 +139,13 @@ TEST_F(MndTestTrans, 02_Create_Qnode1_Crash) {
TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
{ {
int32_t contLen = sizeof(SCreateDnodeReq); SCreateDnodeReq createReq = {0};
strcpy(createReq.fqdn, "localhost");
createReq.port = 9020;
SCreateDnodeReq* pReq = (SCreateDnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
strcpy(pReq->fqdn, "localhost"); void* pReq = rpcMallocCont(contLen);
pReq->port = htonl(9020); tSerializeSCreateDnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_DNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -152,10 +158,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ...@@ -152,10 +158,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
} }
{ {
int32_t contLen = sizeof(SMCreateQnodeReq); SMCreateQnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
server2.Stop(); server2.Stop();
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
...@@ -172,10 +180,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) { ...@@ -172,10 +180,12 @@ TEST_F(MndTestTrans, 03_Create_Qnode2_Crash) {
int32_t retryMax = 20; int32_t retryMax = 20;
for (retry = 0; retry < retryMax; retry++) { for (retry = 0; retry < retryMax; retry++) {
int32_t contLen = sizeof(SMCreateQnodeReq); SMCreateQnodeReq createReq = {0};
createReq.dnodeId = 2;
SMCreateQnodeReq* pReq = (SMCreateQnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CREATE_QNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -12,7 +12,7 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf* ...@@ -12,7 +12,7 @@ SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pParseCtx, SMsgBuf*
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf); SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf);
char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); char* buildCreateStbReq(SCreateTableSql* pCreateTableSql, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf); char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx, SMsgBuf* pMsgBuf);
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); char* buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf); char* buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf);
#endif // TDENGINE_ASTTOMSG_H #endif // TDENGINE_ASTTOMSG_H
#include "parserInt.h"
#include "astGenerator.h" #include "astGenerator.h"
#include "parserInt.h"
#include "parserUtil.h" #include "parserUtil.h"
char* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) { char* buildUserManipulationMsg(SSqlInfo* pInfo, int32_t* outputLen, int64_t id, char* msgBuf, int32_t msgLen) {
...@@ -96,7 +96,7 @@ char* buildDropUserMsg(SSqlInfo* pInfo, int32_t* msgLen, int64_t id, char* msgBu ...@@ -96,7 +96,7 @@ char* buildDropUserMsg(SSqlInfo* pInfo, int32_t* msgLen, int64_t id, char* msgBu
return pReq; return pReq;
} }
SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf) { SShowReq* buildShowMsg(SShowInfo* pShowInfo, SParseContext* pCtx, SMsgBuf* pMsgBuf) {
SShowReq* pShowMsg = calloc(1, sizeof(SShowReq)); SShowReq* pShowMsg = calloc(1, sizeof(SShowReq));
if (pShowMsg == NULL) { if (pShowMsg == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -155,27 +155,27 @@ static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb, ...@@ -155,27 +155,27 @@ static int32_t setKeepOption(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb,
if (pKeep != NULL) { if (pKeep != NULL) {
size_t s = taosArrayGetSize(pKeep); size_t s = taosArrayGetSize(pKeep);
#ifdef _STORAGE #ifdef _STORAGE
if (s >= 4 ||s <= 0) { if (s >= 4 || s <= 0) {
#else #else
if (s != 1) { if (s != 1) {
#endif #endif
return buildInvalidOperationMsg(pMsgBuf, msg1); return buildInvalidOperationMsg(pMsgBuf, msg1);
} }
// tListI* p0 = taosArrayGet(pKeep, 0); // tListI* p0 = taosArrayGet(pKeep, 0);
// tVariantListItem* p1 = (s > 1) ? taosArrayGet(pKeep, 1) : p0; // tVariantListItem* p1 = (s > 1) ? taosArrayGet(pKeep, 1) : p0;
// tVariantListItem* p2 = (s > 2) ? taosArrayGet(pKeep, 2) : p1; // tVariantListItem* p2 = (s > 2) ? taosArrayGet(pKeep, 2) : p1;
// //
// if ((int32_t)p0->pVar.i64 <= 0 || (int32_t)p1->pVar.i64 <= 0 || (int32_t)p2->pVar.i64 <= 0) { // if ((int32_t)p0->pVar.i64 <= 0 || (int32_t)p1->pVar.i64 <= 0 || (int32_t)p2->pVar.i64 <= 0) {
// return buildInvalidOperationMsg(pMsgBuf, msg2); // return buildInvalidOperationMsg(pMsgBuf, msg2);
// } // }
// if (!(((int32_t)p0->pVar.i64 <= (int32_t)p1->pVar.i64) && ((int32_t)p1->pVar.i64 <= (int32_t)p2->pVar.i64))) { // if (!(((int32_t)p0->pVar.i64 <= (int32_t)p1->pVar.i64) && ((int32_t)p1->pVar.i64 <= (int32_t)p2->pVar.i64))) {
// return buildInvalidOperationMsg(pMsgBuf, msg3); // return buildInvalidOperationMsg(pMsgBuf, msg3);
// } // }
// //
// pMsg->daysToKeep0 = htonl((int32_t)p0->pVar.i64); // pMsg->daysToKeep0 = htonl((int32_t)p0->pVar.i64);
// pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64); // pMsg->daysToKeep1 = htonl((int32_t)p1->pVar.i64);
// pMsg->daysToKeep2 = htonl((int32_t)p2->pVar.i64); // pMsg->daysToKeep2 = htonl((int32_t)p2->pVar.i64);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -186,7 +186,7 @@ static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreate ...@@ -186,7 +186,7 @@ static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreate
pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default pMsg->precision = TSDB_TIME_PRECISION_MILLI; // millisecond by default
SToken* pToken = (SToken*) &pCreateDbInfo->precision; SToken* pToken = (SToken*)&pCreateDbInfo->precision;
if (pToken->n > 0) { if (pToken->n > 0) {
pToken->n = strdequote(pToken->z); pToken->n = strdequote(pToken->z);
...@@ -210,20 +210,20 @@ static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreate ...@@ -210,20 +210,20 @@ static int32_t setTimePrecision(SCreateDbReq* pMsg, const SCreateDbInfo* pCreate
static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) { static void doSetDbOptions(SCreateDbReq* pMsg, const SCreateDbInfo* pCreateDb) {
pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize); pMsg->cacheBlockSize = htonl(pCreateDb->cacheBlockSize);
pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks); pMsg->totalBlocks = htonl(pCreateDb->numOfBlocks);
pMsg->daysPerFile = htonl(pCreateDb->daysPerFile); pMsg->daysPerFile = htonl(pCreateDb->daysPerFile);
pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime); pMsg->commitTime = htonl((int32_t)pCreateDb->commitTime);
pMsg->minRows = htonl(pCreateDb->minRowsPerBlock); pMsg->minRows = htonl(pCreateDb->minRowsPerBlock);
pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock); pMsg->maxRows = htonl(pCreateDb->maxRowsPerBlock);
pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod); pMsg->fsyncPeriod = htonl(pCreateDb->fsyncPeriod);
pMsg->compression = (int8_t) pCreateDb->compressionLevel; pMsg->compression = (int8_t)pCreateDb->compressionLevel;
pMsg->walLevel = (char)pCreateDb->walLevel; pMsg->walLevel = (char)pCreateDb->walLevel;
pMsg->replications = pCreateDb->replica; pMsg->replications = pCreateDb->replica;
pMsg->quorum = pCreateDb->quorum; pMsg->quorum = pCreateDb->quorum;
pMsg->ignoreExist = pCreateDb->ignoreExists; pMsg->ignoreExist = pCreateDb->ignoreExists;
pMsg->update = pCreateDb->update; pMsg->update = pCreateDb->update;
pMsg->cacheLastRow = pCreateDb->cachelast; pMsg->cacheLastRow = pCreateDb->cachelast;
pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups); pMsg->numOfVgroups = htonl(pCreateDb->numOfVgroups);
} }
int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) { int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbSql, SMsgBuf* pMsgBuf) {
...@@ -240,7 +240,7 @@ int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbS ...@@ -240,7 +240,7 @@ int32_t setDbOptions(SCreateDbReq* pCreateDbMsg, const SCreateDbInfo* pCreateDbS
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext *pCtx, SMsgBuf* pMsgBuf) { SCreateDbReq* buildCreateDbMsg(SCreateDbInfo* pCreateDbInfo, SParseContext* pCtx, SMsgBuf* pMsgBuf) {
SCreateDbReq* pCreateMsg = calloc(1, sizeof(SCreateDbReq)); SCreateDbReq* pCreateMsg = calloc(1, sizeof(SCreateDbReq));
if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) { if (setDbOptions(pCreateMsg, pCreateDbInfo, pMsgBuf) != TSDB_CODE_SUCCESS) {
tfree(pCreateMsg); tfree(pCreateMsg);
...@@ -320,7 +320,7 @@ char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx ...@@ -320,7 +320,7 @@ char* buildDropStableReq(SSqlInfo* pInfo, int32_t* len, SParseContext* pParseCtx
return pReq; return pReq;
} }
SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { char* buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
const char* msg1 = "invalid host name (name too long, maximum length 128)"; const char* msg1 = "invalid host name (name too long, maximum length 128)";
const char* msg2 = "dnode name can not be string"; const char* msg2 = "dnode name can not be string";
const char* msg3 = "port should be an integer that is less than 65535 and greater than 0"; const char* msg3 = "port should be an integer that is less than 65535 and greater than 0";
...@@ -352,33 +352,44 @@ SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMs ...@@ -352,33 +352,44 @@ SCreateDnodeReq *buildCreateDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMs
return NULL; return NULL;
} }
SCreateDnodeReq *pCreate = (SCreateDnodeReq *) calloc(1, sizeof(SCreateDnodeReq)); SCreateDnodeReq createReq = {0};
if (pCreate == NULL) {
buildInvalidOperationMsg(pMsgBuf, msg4); strncpy(createReq.fqdn, id->z, id->n);
createReq.port = val;
int32_t tlen = tSerializeSCreateDnodeReq(NULL, 0, &createReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
strncpy(pCreate->fqdn, id->z, id->n); tSerializeSCreateDnodeReq(pReq, tlen, &createReq);
pCreate->port = htonl(val); *len = tlen;
return pReq;
*len = sizeof(SCreateDnodeReq);
return pCreate;
} }
SDropDnodeReq *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) { char* buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf) {
SDropDnodeReq dropReq = {0};
SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0); SToken* pzName = taosArrayGet(pInfo->pMiscInfo->a, 0);
char* end = NULL; char* end = NULL;
SDropDnodeReq * pDrop = (SDropDnodeReq *)calloc(1, sizeof(SDropDnodeReq)); dropReq.dnodeId = strtoll(pzName->z, &end, 10);
pDrop->dnodeId = strtoll(pzName->z, &end, 10);
pDrop->dnodeId = htonl(pDrop->dnodeId);
*len = sizeof(SDropDnodeReq);
if (end - pzName->z != pzName->n) { if (end - pzName->z != pzName->n) {
buildInvalidOperationMsg(pMsgBuf, "invalid dnode id"); buildInvalidOperationMsg(pMsgBuf, "invalid dnode id");
tfree(pDrop);
return NULL; return NULL;
} }
return pDrop; int32_t tlen = tSerializeSDropDnodeReq(NULL, 0, &dropReq);
void* pReq = malloc(tlen);
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tSerializeSDropDnodeReq(pReq, tlen, &dropReq);
*len = tlen;
return pReq;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册