提交 bec7a0d5 编写于 作者: S Shengliang Guan

serialize mnode msg

上级 2011914d
...@@ -917,6 +917,9 @@ typedef struct { ...@@ -917,6 +917,9 @@ typedef struct {
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
} SDCreateMnodeReq, SDAlterMnodeReq; } SDCreateMnodeReq, SDAlterMnodeReq;
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
typedef struct { typedef struct {
int32_t dnodeId; int32_t dnodeId;
} SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq, SMCreateSnodeReq, SMDropSnodeReq, } SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq, SMCreateSnodeReq, SMDropSnodeReq,
...@@ -982,9 +985,11 @@ typedef struct { ...@@ -982,9 +985,11 @@ typedef struct {
char ckey[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN];
} SAuthReq, SAuthRsp; } SAuthReq, SAuthRsp;
int32_t tSerializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
typedef struct { typedef struct {
int8_t finished; int8_t finished;
int8_t align[7];
char name[TSDB_STEP_NAME_LEN]; char name[TSDB_STEP_NAME_LEN];
char desc[TSDB_STEP_DESC_LEN]; char desc[TSDB_STEP_DESC_LEN];
} SStartupReq; } SStartupReq;
......
...@@ -2020,6 +2020,20 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { ...@@ -2020,6 +2020,20 @@ int32_t tDeserializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
return 0; return 0;
} }
int32_t tEncodeSReplica(SCoder *pEncoder, SReplica *pReplica) {
if (tEncodeI32(pEncoder, pReplica->id) < 0) return -1;
if (tEncodeU16(pEncoder, pReplica->port) < 0) return -1;
if (tEncodeCStr(pEncoder, pReplica->fqdn) < 0) return -1;
return 0;
}
int32_t tDecodeSReplica(SCoder *pDecoder, SReplica *pReplica) {
if (tDecodeI32(pDecoder, &pReplica->id) < 0) return -1;
if (tDecodeU16(pDecoder, &pReplica->port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pReplica->fqdn) < 0) return -1;
return 0;
}
int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pReq) { int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
...@@ -2050,9 +2064,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR ...@@ -2050,9 +2064,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1; if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
if (tEncodeI32(&encoder, pReplica->id) < 0) return -1; if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
if (tEncodeU16(&encoder, pReplica->port) < 0) return -1;
if (tEncodeCStr(&encoder, pReplica->fqdn) < 0) return -1;
} }
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2091,9 +2103,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq * ...@@ -2091,9 +2103,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1; if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i]; SReplica *pReplica = &pReq->replicas[i];
if (tDecodeI32(&decoder, &pReplica->id) < 0) return -1; if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
if (tDecodeU16(&decoder, &pReplica->port) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReplica->fqdn) < 0) return -1;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
...@@ -2182,4 +2192,72 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq) ...@@ -2182,4 +2192,72 @@ int32_t tDeserializeSKillConnReq(void *buf, int32_t bufLen, SKillConnReq *pReq)
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
} }
\ No newline at end of file
int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
}
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeI8(&encoder, pReq->spi) < 0) return -1;
if (tEncodeI8(&encoder, pReq->encrypt) < 0) return -1;
if (tEncodeBinary(&encoder, pReq->secret, TSDB_PASSWORD_LEN) < 0) return -1;
if (tEncodeBinary(&encoder, pReq->ckey, TSDB_PASSWORD_LEN) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->spi) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->encrypt) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->secret) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->ckey) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
...@@ -424,28 +424,22 @@ static int32_t dndDropMnode(SDnode *pDnode) { ...@@ -424,28 +424,22 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return 0; return 0;
} }
static SDCreateMnodeReq *dndParseCreateMnodeReq(SRpcMsg *pReq) {
SDCreateMnodeReq *pCreate = pReq->pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
for (int32_t i = 0; i < pCreate->replica; ++i) {
pCreate->replicas[i].id = htonl(pCreate->replicas[i].id);
pCreate->replicas[i].port = htons(pCreate->replicas[i].port);
}
return pCreate;
}
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateMnodeReq *pCreate = dndParseCreateMnodeReq(pReq); SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pCreate->replica <= 1 || pCreate->dnodeId != dndGetDnodeId(pDnode)) { if (createReq.replica <= 1 || createReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
} }
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) { if (dndBuildMnodeOptionFromReq(pDnode, &option, &createReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr()); dError("failed to create mnode since %s", terrstr());
return -1; return -1;
...@@ -464,16 +458,20 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -464,16 +458,20 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDAlterMnodeReq *pAlter = dndParseCreateMnodeReq(pReq); SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pAlter->dnodeId != dndGetDnodeId(pDnode)) { if (alterReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr()); dError("failed to alter mnode since %s", terrstr());
return -1; return -1;
} }
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) { if (dndBuildMnodeOptionFromReq(pDnode, &option, &alterReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr()); dError("failed to alter mnode since %s", terrstr());
return -1; return -1;
...@@ -494,10 +492,13 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { ...@@ -494,10 +492,13 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
} }
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) { int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropMnodeReq *pDrop = pReq->pCont; SDDropMnodeReq dropReq = {0};
pDrop->dnodeId = htonl(pDrop->dnodeId); if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pDrop->dnodeId != dndGetDnodeId(pDnode)) { if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION; terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr()); dError("failed to drop mnode since %s", terrstr());
return -1; return -1;
......
...@@ -308,12 +308,15 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char ...@@ -308,12 +308,15 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return -1; return -1;
} }
SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq)); SAuthReq authReq = {0};
tstrncpy(pReq->user, user, TSDB_USER_LEN); tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528}; SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcRsp = {0}; SRpcMsg rpcRsp = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, pReq->spi, pReq->encrypt); dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp); dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
......
...@@ -16,25 +16,27 @@ class DndTestMnode : public ::testing::Test { ...@@ -16,25 +16,27 @@ class DndTestMnode : public ::testing::Test {
static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9114); } static void SetUpTestSuite() { test.Init("/tmp/dnode_test_mnode", 9114); }
static void TearDownTestSuite() { test.Cleanup(); } static void TearDownTestSuite() { test.Cleanup(); }
static Testbase test; static Testbase test;
public: public:
void SetUp() override {} void SetUp() override {}
void TearDown() override {} void TearDown() override {}
}; };
Testbase DndTestMnode::test; Testbase DndTestMnode::test;
TEST_F(DndTestMnode, 01_Create_Mnode) { TEST_F(DndTestMnode, 01_Create_Mnode) {
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq createReq = {0};
createReq.dnodeId = 2;
createReq.replica = 1;
createReq.replicas[0].id = 1;
createReq.replicas[0].port = 9113;
strcpy(createReq.replicas[0].fqdn, "localhost");
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
pReq->replica = 1; tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -42,14 +44,16 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -42,14 +44,16 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq createReq = {0};
createReq.dnodeId = 1;
createReq.replica = 1;
createReq.replicas[0].id = 2;
createReq.replicas[0].port = 9113;
strcpy(createReq.replicas[0].fqdn, "localhost");
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
pReq->replica = 1; tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
pReq->replicas[0].id = htonl(2);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -57,17 +61,19 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -57,17 +61,19 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq createReq = {0};
createReq.dnodeId = 1;
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); createReq.replica = 2;
pReq->dnodeId = htonl(1); createReq.replicas[0].id = 1;
pReq->replica = 2; createReq.replicas[0].port = 9113;
pReq->replicas[0].id = htonl(1); strcpy(createReq.replicas[0].fqdn, "localhost");
pReq->replicas[0].port = htonl(9113); createReq.replicas[1].id = 1;
strcpy(pReq->replicas[0].fqdn, "localhost"); createReq.replicas[1].port = 9114;
pReq->replicas[1].id = htonl(1); strcpy(createReq.replicas[1].fqdn, "localhost");
pReq->replicas[1].port = htonl(9114);
strcpy(pReq->replicas[1].fqdn, "localhost"); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -76,15 +82,17 @@ TEST_F(DndTestMnode, 01_Create_Mnode) { ...@@ -76,15 +82,17 @@ TEST_F(DndTestMnode, 01_Create_Mnode) {
} }
TEST_F(DndTestMnode, 02_Alter_Mnode) { TEST_F(DndTestMnode, 02_Alter_Mnode) {
{ {
int32_t contLen = sizeof(SDAlterMnodeReq); SDAlterMnodeReq alterReq = {0};
alterReq.dnodeId = 2;
alterReq.replica = 1;
alterReq.replicas[0].id = 1;
alterReq.replicas[0].port = 9113;
strcpy(alterReq.replicas[0].fqdn, "localhost");
SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
pReq->replica = 1; tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -92,14 +100,16 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) { ...@@ -92,14 +100,16 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDAlterMnodeReq); SDAlterMnodeReq alterReq = {0};
alterReq.dnodeId = 1;
alterReq.replica = 1;
alterReq.replicas[0].id = 2;
alterReq.replicas[0].port = 9113;
strcpy(alterReq.replicas[0].fqdn, "localhost");
SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
pReq->replica = 1; tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
pReq->replicas[0].id = htonl(2);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -107,14 +117,16 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) { ...@@ -107,14 +117,16 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDAlterMnodeReq); SDAlterMnodeReq alterReq = {0};
alterReq.dnodeId = 1;
alterReq.replica = 1;
alterReq.replicas[0].id = 1;
alterReq.replicas[0].port = 9113;
strcpy(alterReq.replicas[0].fqdn, "localhost");
SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
pReq->replica = 1; tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -124,10 +136,12 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) { ...@@ -124,10 +136,12 @@ TEST_F(DndTestMnode, 02_Alter_Mnode) {
TEST_F(DndTestMnode, 03_Drop_Mnode) { TEST_F(DndTestMnode, 03_Drop_Mnode) {
{ {
int32_t contLen = sizeof(SDDropMnodeReq); SDDropMnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(2); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -135,10 +149,12 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -135,10 +149,12 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDDropMnodeReq); SDDropMnodeReq dropReq = {0};
dropReq.dnodeId = 1;
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -146,10 +162,12 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -146,10 +162,12 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDDropMnodeReq); SDDropMnodeReq dropReq = {0};
dropReq.dnodeId = 1;
SDDropMnodeReq* pReq = (SDDropMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -157,30 +175,33 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) { ...@@ -157,30 +175,33 @@ TEST_F(DndTestMnode, 03_Drop_Mnode) {
} }
{ {
int32_t contLen = sizeof(SDAlterMnodeReq); SDAlterMnodeReq alterReq = {0};
alterReq.dnodeId = 1;
alterReq.replica = 1;
alterReq.replicas[0].id = 1;
alterReq.replicas[0].port = 9113;
strcpy(alterReq.replicas[0].fqdn, "localhost");
SDAlterMnodeReq* pReq = (SDAlterMnodeReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
pReq->dnodeId = htonl(1); void* pReq = rpcMallocCont(contLen);
pReq->replica = 1; tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
pReq->replicas[0].id = htonl(1);
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost");
SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_ALTER_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED); ASSERT_EQ(pRsp->code, TSDB_CODE_DND_MNODE_NOT_DEPLOYED);
} }
{ {
int32_t contLen = sizeof(SDCreateMnodeReq); SDCreateMnodeReq createReq = {0};
createReq.dnodeId = 1;
SDCreateMnodeReq* pReq = (SDCreateMnodeReq*)rpcMallocCont(contLen); createReq.replica = 2;
pReq->dnodeId = htonl(1); createReq.replicas[0].id = 1;
pReq->replica = 2; createReq.replicas[0].port = 9113;
pReq->replicas[0].id = htonl(1); strcpy(createReq.replicas[0].fqdn, "localhost");
pReq->replicas[0].port = htonl(9113);
strcpy(pReq->replicas[0].fqdn, "localhost"); int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_MNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
...@@ -46,15 +46,25 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha ...@@ -46,15 +46,25 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha
} }
static int32_t mndProcessAuthReq(SMnodeMsg *pReq) { static int32_t mndProcessAuthReq(SMnodeMsg *pReq) {
SAuthReq *pAuth = pReq->rpcMsg.pCont; SAuthReq authReq = {0};
if (tDeserializeSAuthReq(pReq->pCont, pReq->contLen, &authReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
SAuthReq authRsp = {0};
memcpy(authRsp.user, authReq.user, TSDB_USER_LEN);
int32_t contLen = sizeof(SAuthRsp); int32_t code =
SAuthRsp *pRsp = rpcMallocCont(contLen); mndRetriveAuth(pReq->pMnode, authRsp.user, &authRsp.spi, &authRsp.encrypt, authRsp.secret, authRsp.ckey);
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->user, authRsp.spi, authRsp.encrypt,
authRsp.user);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authRsp);
void *pRsp = rpcMallocCont(contLen);
tSerializeSAuthReq(pRsp, contLen, &authRsp);
pReq->pCont = pRsp; pReq->pCont = pRsp;
pReq->contLen = contLen; pReq->contLen = contLen;
int32_t code = mndRetriveAuth(pReq->pMnode, pAuth->user, &pRsp->spi, &pRsp->encrypt, pRsp->secret, pRsp->ckey);
mTrace("user:%s, auth req received, spi:%d encrypt:%d ruser:%s", pReq->user, pAuth->spi, pAuth->encrypt, pAuth->user);
return code; return code;
} }
......
...@@ -284,8 +284,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -284,8 +284,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
if (pIter == NULL) break; if (pIter == NULL) break;
SReplica *pReplica = &createReq.replicas[numOfReplicas]; SReplica *pReplica = &createReq.replicas[numOfReplicas];
pReplica->id = htonl(pMObj->id); pReplica->id = pMObj->id;
pReplica->port = htons(pMObj->pDnode->port); pReplica->port = pMObj->pDnode->port;
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++; numOfReplicas++;
...@@ -293,8 +293,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -293,8 +293,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
} }
SReplica *pReplica = &createReq.replicas[numOfReplicas]; SReplica *pReplica = &createReq.replicas[numOfReplicas];
pReplica->id = htonl(pDnode->id); pReplica->id = pDnode->id;
pReplica->port = htons(pDnode->port); pReplica->port = pDnode->port;
memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++; numOfReplicas++;
...@@ -307,18 +307,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -307,18 +307,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0}; STransAction action = {0};
SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq)); createReq.dnodeId = pMObj->id;
if (pReq == NULL) { int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
sdbCancelFetch(pSdb, pIter); void *pReq = malloc(contLen);
sdbRelease(pSdb, pMObj); tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
return -1;
}
memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq));
pReq->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SDAlterMnodeReq); action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_MNODE; action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
...@@ -336,14 +332,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno ...@@ -336,14 +332,14 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
SDCreateMnodeReq *pReq = malloc(sizeof(SDCreateMnodeReq)); createReq.dnodeId = pObj->id;
if (pReq == NULL) return -1; int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &createReq);
memcpy(pReq, &createReq, sizeof(SDAlterMnodeReq)); void *pReq = malloc(contLen);
pReq->dnodeId = htonl(pObj->id); tSerializeSDCreateMnodeReq(pReq, contLen, &createReq);
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SDCreateMnodeReq); action.contLen = contLen;
action.msgType = TDMT_DND_CREATE_MNODE; action.msgType = TDMT_DND_CREATE_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
...@@ -463,8 +459,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -463,8 +459,8 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
if (pMObj->id != pObj->id) { if (pMObj->id != pObj->id) {
SReplica *pReplica = &alterReq.replicas[numOfReplicas]; SReplica *pReplica = &alterReq.replicas[numOfReplicas];
pReplica->id = htonl(pMObj->id); pReplica->id = pMObj->id;
pReplica->port = htons(pMObj->pDnode->port); pReplica->port = pMObj->pDnode->port;
memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); memcpy(pReplica->fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
numOfReplicas++; numOfReplicas++;
} }
...@@ -481,18 +477,14 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -481,18 +477,14 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
if (pMObj->id != pObj->id) { if (pMObj->id != pObj->id) {
STransAction action = {0}; STransAction action = {0};
SDAlterMnodeReq *pReq = malloc(sizeof(SDAlterMnodeReq)); alterReq.dnodeId = pMObj->id;
if (pReq == NULL) { int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
sdbCancelFetch(pSdb, pIter); void *pReq = malloc(contLen);
sdbRelease(pSdb, pMObj); tSerializeSDCreateMnodeReq(pReq, contLen, &alterReq);
return -1;
}
memcpy(pReq, &alterReq, sizeof(SDAlterMnodeReq));
pReq->dnodeId = htonl(pMObj->id);
action.epSet = mndGetDnodeEpset(pMObj->pDnode); action.epSet = mndGetDnodeEpset(pMObj->pDnode);
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SDAlterMnodeReq); action.contLen = contLen;
action.msgType = TDMT_DND_ALTER_MNODE; action.msgType = TDMT_DND_ALTER_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED;
...@@ -511,16 +503,15 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode ...@@ -511,16 +503,15 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
STransAction action = {0}; STransAction action = {0};
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
SDDropMnodeReq *pReq = malloc(sizeof(SDDropMnodeReq)); SDDropMnodeReq dropReq = {0};
if (pReq == NULL) { dropReq.dnodeId = pObj->id;
terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t contLen = tSerializeSMCreateDropMnodeReq(NULL, 0, &dropReq);
return -1; void *pReq = malloc(contLen);
} tSerializeSMCreateDropMnodeReq(pReq, contLen, &dropReq);
pReq->dnodeId = htonl(pObj->id);
action.epSet = mndGetDnodeEpset(pDnode); action.epSet = mndGetDnodeEpset(pDnode);
action.pCont = pReq; action.pCont = pReq;
action.contLen = sizeof(SDDropMnodeReq); action.contLen = contLen;
action.msgType = TDMT_DND_DROP_MNODE; action.msgType = TDMT_DND_DROP_MNODE;
action.acceptableCode = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; action.acceptableCode = TSDB_CODE_DND_MNODE_NOT_DEPLOYED;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册