From c8cc7ed3a69f4752b8c2d7db421400bf5382c31d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 16 Feb 2022 11:45:44 +0800 Subject: [PATCH] serialize connect msg --- include/common/tmsg.h | 66 +++-------- source/client/src/clientImpl.c | 38 +++---- source/client/src/clientMsgHandler.c | 34 +++--- source/common/src/tmsg.c | 107 ++++++++++++++++++ source/dnode/mnode/impl/src/mndMnode.c | 2 +- source/dnode/mnode/impl/src/mndProfile.c | 57 +++++----- .../dnode/mnode/impl/test/profile/profile.cpp | 88 +++++++------- source/dnode/mnode/impl/test/show/show.cpp | 12 +- 8 files changed, 238 insertions(+), 166 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8e0586cd34..59f8d38345 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -234,9 +234,9 @@ typedef struct { void* pMsg; } SSubmitMsgIter; -int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); -int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); -int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); +int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); typedef struct { @@ -295,69 +295,39 @@ int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); void tFreeSMAltertbReq(SMAltertbReq* pReq); -typedef struct { - int32_t pid; - char app[TSDB_APP_NAME_LEN]; - char db[TSDB_DB_NAME_LEN]; - int64_t startTime; -} SConnectReq; - typedef struct SEpSet { int8_t inUse; int8_t numOfEps; SEp eps[TSDB_MAX_REPLICA]; } SEpSet; -static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { - int tlen = 0; - tlen += taosEncodeFixedI8(buf, pEp->inUse); - tlen += taosEncodeFixedI8(buf, pEp->numOfEps); - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - tlen += taosEncodeFixedU16(buf, pEp->eps[i].port); - tlen += taosEncodeString(buf, pEp->eps[i].fqdn); - } - return tlen; -} +int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp); +int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); +int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); +void* taosDecodeSEpSet(void* buf, SEpSet* pEp); -static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { - buf = taosDecodeFixedI8(buf, &pEp->inUse); - buf = taosDecodeFixedI8(buf, &pEp->numOfEps); - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); - buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); - } - return buf; -} -static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) { - if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; - if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; - if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; - } - return 0; -} +typedef struct { + int32_t pid; + char app[TSDB_APP_NAME_LEN]; + char db[TSDB_DB_NAME_LEN]; + int64_t startTime; +} SConnectReq; -static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) { - if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; - if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; - for (int i = 0; i < TSDB_MAX_REPLICA; i++) { - if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; - } - return 0; -} +int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); +int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq); typedef struct { int32_t acctId; int64_t clusterId; int32_t connId; int8_t superUser; - int8_t align[3]; SEpSet epSet; char sVersion[128]; } SConnectRsp; +int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); +int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); + typedef struct { char user[TSDB_USER_LEN]; char pass[TSDB_PASSWORD_LEN]; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index dfe7b12ce4..b8efa8213d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -357,40 +357,38 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t return pTscObj; } -static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { - SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); +static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) { + SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); if (pMsgSendInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; return NULL; } - pMsgSendInfo->msgType = TDMT_MND_CONNECT; - pMsgSendInfo->msgInfo.len = sizeof(SConnectReq); + pMsgSendInfo->msgType = TDMT_MND_CONNECT; pMsgSendInfo->requestObjRefId = pRequest->self; - pMsgSendInfo->requestId = pRequest->requestId; - pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; - pMsgSendInfo->param = pRequest; + pMsgSendInfo->requestId = pRequest->requestId; + pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; + pMsgSendInfo->param = pRequest; - SConnectReq *pConnect = calloc(1, sizeof(SConnectReq)); - if (pConnect == NULL) { - tfree(pMsgSendInfo); - terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - return NULL; - } - - STscObj *pObj = pRequest->pTscObj; + SConnectReq connectReq = {0}; + STscObj* pObj = pRequest->pTscObj; char* db = getDbOfConnection(pObj); if (db != NULL) { - tstrncpy(pConnect->db, db, sizeof(pConnect->db)); + tstrncpy(connectReq.db, db, sizeof(connectReq.db)); } tfree(db); - pConnect->pid = htonl(appInfo.pid); - pConnect->startTime = htobe64(appInfo.startTime); - tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); + connectReq.pid = htonl(appInfo.pid); + connectReq.startTime = htobe64(appInfo.startTime); + tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app)); + + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = malloc(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); - pMsgSendInfo->msgInfo.pData = pConnect; + pMsgSendInfo->msgInfo.len = contLen; + pMsgSendInfo->msgInfo.pData = pReq; return pMsgSendInfo; } diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 79ec08e14d..48c9920c0c 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -45,41 +45,35 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { return code; } - STscObj *pTscObj = pRequest->pTscObj; + STscObj* pTscObj = pRequest->pTscObj; - SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; - pConnect->acctId = htonl(pConnect->acctId); - pConnect->connId = htonl(pConnect->connId); - pConnect->clusterId = htobe64(pConnect->clusterId); + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp); + assert(connectRsp.epSet.numOfEps > 0); - assert(pConnect->epSet.numOfEps > 0); - for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { - pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port); + if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) { + updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet); } - if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) { - updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet); - } - - for (int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { + for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) { tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, - pConnect->epSet.eps[i].fqdn, pConnect->epSet.eps[i].port, pTscObj->id); + connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id); } - pTscObj->connId = pConnect->connId; - pTscObj->acctId = pConnect->acctId; - tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver)); + pTscObj->connId = connectRsp.connId; + pTscObj->acctId = connectRsp.acctId; + tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver)); // update the appInstInfo - pTscObj->pAppInfo->clusterId = pConnect->clusterId; + pTscObj->pAppInfo->clusterId = connectRsp.clusterId; atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); pTscObj->connType = HEARTBEAT_TYPE_QUERY; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); + hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY); // pRequest->body.resInfo.pRspMsg = pMsg->pData; - tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, + tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId, pTscObj->pAppInfo->numOfConns); free(pMsg->pData); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index a331dd0929..e763c0c85f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -85,6 +85,47 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { } } +int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { + if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; + if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; + if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + +int32_t tDecodeSEpSet(SCoder *pDecoder, SEpSet *pEp) { + if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; + if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; + } + return 0; +} + +int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI8(buf, pEp->inUse); + tlen += taosEncodeFixedI8(buf, pEp->numOfEps); + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + tlen += taosEncodeFixedU16(buf, pEp->eps[i].port); + tlen += taosEncodeString(buf, pEp->eps[i].fqdn); + } + return tlen; +} + +void *taosDecodeSEpSet(void *buf, SEpSet *pEp) { + buf = taosDecodeFixedI8(buf, &pEp->inUse); + buf = taosDecodeFixedI8(buf, &pEp->numOfEps); + for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { + buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); + buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); + } + return buf; +} + static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; @@ -1859,3 +1900,69 @@ int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp tCoderClear(&decoder); return 0; } + +int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->pid) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->app) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; + if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; + if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} + +int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { + SCoder encoder = {0}; + tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1; + if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1; + if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1; + if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1; + if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1; + if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tCoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { + SCoder decoder = {0}; + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1; + if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1; + if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1; + if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1; + if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1; + if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1; + tEndDecode(&decoder); + + tCoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 2e8664b039..1ba9f77b45 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -243,7 +243,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { pEpSet->inUse = pEpSet->numOfEps; } - addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port)); + addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port); sdbRelease(pSdb, pObj); } } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 5e6909d01a..566dcff380 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -184,15 +184,17 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { } static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SUserObj *pUser = NULL; - SDbObj *pDb = NULL; - SConnObj *pConn = NULL; - int32_t code = -1; - - SConnectReq *pConnReq = pReq->rpcMsg.pCont; - pConnReq->pid = htonl(pConnReq->pid); - pConnReq->startTime = htobe64(pConnReq->startTime); + SMnode *pMnode = pReq->pMnode; + SUserObj *pUser = NULL; + SDbObj *pDb = NULL; + SConnObj *pConn = NULL; + int32_t code = -1; + SConnectReq connReq = {0}; + + if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto CONN_OVER; + } SRpcConnInfo info = {0}; if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { @@ -209,41 +211,42 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { goto CONN_OVER; } - if (pConnReq->db[0]) { - snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db); + if (connReq.db[0]) { + snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db); pDb = mndAcquireDb(pMnode, pReq->db); if (pDb == NULL) { terrno = TSDB_CODE_MND_INVALID_DB; - mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr()); + mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr()); goto CONN_OVER; } } - pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); + pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime); if (pConn == NULL) { mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); goto CONN_OVER; } - SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr()); - goto CONN_OVER; - } + SConnectRsp connectRsp = {0}; + connectRsp.acctId = pUser->acctId; + connectRsp.superUser = pUser->superUser; + connectRsp.clusterId = pMnode->clusterId; + connectRsp.connId = pConn->id; - pRsp->acctId = htonl(pUser->acctId); - pRsp->superUser = pUser->superUser; - pRsp->clusterId = htobe64(pMnode->clusterId); - pRsp->connId = htonl(pConn->id); + snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, + gitinfo); + mndGetMnodeEpSet(pMnode, &connectRsp.epSet); - snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); - mndGetMnodeEpSet(pMnode, &pRsp->epSet); + int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp); + if (contLen < 0) goto CONN_OVER; + void *pRsp = rpcMallocCont(contLen); + if (pRsp == NULL) goto CONN_OVER; + tSerializeSConnectRsp(pRsp, contLen, &connectRsp); - pReq->contLen = sizeof(SConnectRsp); + pReq->contLen = contLen; pReq->pCont = pRsp; - mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app); + mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app); code = 0; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 5b7bcd47b8..4f4197cd64 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -28,44 +28,44 @@ Testbase MndTestProfile::test; int32_t MndTestProfile::connId; TEST_F(MndTestProfile, 01_ConnectMsg) { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_profile"); + strcpy(connectReq.db, ""); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_profile"); - strcpy(pReq->db, ""); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; - ASSERT_NE(pRsp, nullptr); - pRsp->acctId = htonl(pRsp->acctId); - pRsp->clusterId = htobe64(pRsp->clusterId); - pRsp->connId = htonl(pRsp->connId); - pRsp->epSet.eps[0].port = htons(pRsp->epSet.eps[0].port); + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - EXPECT_EQ(pRsp->acctId, 1); - EXPECT_GT(pRsp->clusterId, 0); - EXPECT_EQ(pRsp->connId, 1); - EXPECT_EQ(pRsp->superUser, 1); + EXPECT_EQ(connectRsp.acctId, 1); + EXPECT_GT(connectRsp.clusterId, 0); + EXPECT_EQ(connectRsp.connId, 1); + EXPECT_EQ(connectRsp.superUser, 1); - EXPECT_EQ(pRsp->epSet.inUse, 0); - EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.eps[0].port, 9031); - EXPECT_STREQ(pRsp->epSet.eps[0].fqdn, "localhost"); + EXPECT_EQ(connectRsp.epSet.inUse, 0); + EXPECT_EQ(connectRsp.epSet.numOfEps, 1); + EXPECT_EQ(connectRsp.epSet.eps[0].port, 9031); + EXPECT_STREQ(connectRsp.epSet.eps[0].fqdn, "localhost"); - connId = pRsp->connId; + connId = connectRsp.connId; } TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_profile"); + strcpy(connectReq.db, "invalid_db"); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_profile"); - strcpy(pReq->db, "invalid_db"); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pRsp, nullptr); @@ -194,35 +194,33 @@ TEST_F(MndTestProfile, 05_KillConnMsg) { } { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_profile"); + strcpy(connectReq.db, "invalid_db"); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_profile"); - strcpy(pReq->db, ""); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); - SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; - ASSERT_NE(pRsp, nullptr); - pRsp->acctId = htonl(pRsp->acctId); - pRsp->clusterId = htobe64(pRsp->clusterId); - pRsp->connId = htonl(pRsp->connId); - pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]); + SConnectRsp connectRsp = {0}; + tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp); - EXPECT_EQ(pRsp->acctId, 1); - EXPECT_GT(pRsp->clusterId, 0); - EXPECT_GT(pRsp->connId, connId); - EXPECT_EQ(pRsp->superUser, 1); + EXPECT_EQ(connectRsp.acctId, 1); + EXPECT_GT(connectRsp.clusterId, 0); + EXPECT_GT(connectRsp.connId, connId); + EXPECT_EQ(connectRsp.superUser, 1); - EXPECT_EQ(pRsp->epSet.inUse, 0); - EXPECT_EQ(pRsp->epSet.numOfEps, 1); - EXPECT_EQ(pRsp->epSet.port[0], 9031); - EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); + EXPECT_EQ(connectRsp.epSet.inUse, 0); + EXPECT_EQ(connectRsp.epSet.numOfEps, 1); + EXPECT_EQ(connectRsp.epSet.port[0], 9031); + EXPECT_STREQ(connectRsp.epSet.fqdn[0], "localhost"); - connId = pRsp->connId; + connId = connectRsp.connId; } #endif } diff --git a/source/dnode/mnode/impl/test/show/show.cpp b/source/dnode/mnode/impl/test/show/show.cpp index e7e17d65c6..a57d99a257 100644 --- a/source/dnode/mnode/impl/test/show/show.cpp +++ b/source/dnode/mnode/impl/test/show/show.cpp @@ -54,12 +54,14 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { } TEST_F(MndTestShow, 03_ShowMsg_Conn) { - int32_t contLen = sizeof(SConnectReq); + SConnectReq connectReq = {0}; + connectReq.pid = 1234; + strcpy(connectReq.app, "mnode_test_show"); + strcpy(connectReq.db, ""); - SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); - pReq->pid = htonl(1234); - strcpy(pReq->app, "mnode_test_show"); - strcpy(pReq->db, ""); + int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq); + void* pReq = rpcMallocCont(contLen); + tSerializeSConnectReq(pReq, contLen, &connectReq); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); ASSERT_NE(pRsp, nullptr); -- GitLab