提交 c8cc7ed3 编写于 作者: S Shengliang Guan

serialize connect msg

上级 353feec1
...@@ -234,9 +234,9 @@ typedef struct { ...@@ -234,9 +234,9 @@ typedef struct {
void* pMsg; void* pMsg;
} SSubmitMsgIter; } SSubmitMsgIter;
int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
typedef struct { typedef struct {
...@@ -295,69 +295,39 @@ int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); ...@@ -295,69 +295,39 @@ int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq); int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq); void tFreeSMAltertbReq(SMAltertbReq* pReq);
typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
int64_t startTime;
} SConnectReq;
typedef struct SEpSet { typedef struct SEpSet {
int8_t inUse; int8_t inUse;
int8_t numOfEps; int8_t numOfEps;
SEp eps[TSDB_MAX_REPLICA]; SEp eps[TSDB_MAX_REPLICA];
} SEpSet; } SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) { int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int tlen = 0; int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
tlen += taosEncodeFixedI8(buf, pEp->inUse); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
tlen += taosEncodeFixedI8(buf, pEp->numOfEps); void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->eps[i].port);
tlen += taosEncodeString(buf, pEp->eps[i].fqdn);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { typedef struct {
buf = taosDecodeFixedI8(buf, &pEp->inUse); int32_t pid;
buf = taosDecodeFixedI8(buf, &pEp->numOfEps); char app[TSDB_APP_NAME_LEN];
for (int i = 0; i < TSDB_MAX_REPLICA; i++) { char db[TSDB_DB_NAME_LEN];
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port); int64_t startTime;
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn); } SConnectReq;
}
return buf;
}
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) { int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
int32_t connId; int32_t connId;
int8_t superUser; int8_t superUser;
int8_t align[3];
SEpSet epSet; SEpSet epSet;
char sVersion[128]; char sVersion[128];
} SConnectRsp; } SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN]; char pass[TSDB_PASSWORD_LEN];
......
...@@ -357,40 +357,38 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t ...@@ -357,40 +357,38 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t
return pTscObj; return pTscObj;
} }
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) { static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) { if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pMsgSendInfo->msgType = TDMT_MND_CONNECT; pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->msgInfo.len = sizeof(SConnectReq);
pMsgSendInfo->requestObjRefId = pRequest->self; pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId; pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)]; pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest; pMsgSendInfo->param = pRequest;
SConnectReq *pConnect = calloc(1, sizeof(SConnectReq)); SConnectReq connectReq = {0};
if (pConnect == NULL) { STscObj* pObj = pRequest->pTscObj;
tfree(pMsgSendInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
STscObj *pObj = pRequest->pTscObj;
char* db = getDbOfConnection(pObj); char* db = getDbOfConnection(pObj);
if (db != NULL) { if (db != NULL) {
tstrncpy(pConnect->db, db, sizeof(pConnect->db)); tstrncpy(connectReq.db, db, sizeof(connectReq.db));
} }
tfree(db); tfree(db);
pConnect->pid = htonl(appInfo.pid); connectReq.pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime); connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app)); tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = malloc(contLen);
tSerializeSConnectReq(pReq, contLen, &connectReq);
pMsgSendInfo->msgInfo.pData = pConnect; pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgInfo.pData = pReq;
return pMsgSendInfo; return pMsgSendInfo;
} }
......
...@@ -45,41 +45,35 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -45,41 +45,35 @@ int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code; return code;
} }
STscObj *pTscObj = pRequest->pTscObj; STscObj* pTscObj = pRequest->pTscObj;
SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData; SConnectRsp connectRsp = {0};
pConnect->acctId = htonl(pConnect->acctId); tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
pConnect->connId = htonl(pConnect->connId); assert(connectRsp.epSet.numOfEps > 0);
pConnect->clusterId = htobe64(pConnect->clusterId);
assert(pConnect->epSet.numOfEps > 0); if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) { updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port);
} }
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) { for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
}
for (int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i, tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
pConnect->epSet.eps[i].fqdn, pConnect->epSet.eps[i].port, pTscObj->id); connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
} }
pTscObj->connId = pConnect->connId; pTscObj->connId = connectRsp.connId;
pTscObj->acctId = pConnect->acctId; pTscObj->acctId = connectRsp.acctId;
tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver)); tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
// update the appInstInfo // update the appInstInfo
pTscObj->pAppInfo->clusterId = pConnect->clusterId; pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pTscObj->connType = HEARTBEAT_TYPE_QUERY; pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY); hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData; // pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns); pTscObj->pAppInfo->numOfConns);
free(pMsg->pData); free(pMsg->pData);
......
...@@ -85,6 +85,47 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { ...@@ -85,6 +85,47 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) {
} }
} }
int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
int32_t tDecodeSEpSet(SCoder *pDecoder, SEpSet *pEp) {
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1;
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->eps[i].port);
tlen += taosEncodeString(buf, pEp->eps[i].fqdn);
}
return tlen;
}
void *taosDecodeSEpSet(void *buf, SEpSet *pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
}
return buf;
}
static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) { static int32_t tSerializeSClientHbReq(SCoder *pEncoder, const SClientHbReq *pReq) {
if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1;
...@@ -1859,3 +1900,69 @@ int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp ...@@ -1859,3 +1900,69 @@ int32_t tDeserializeSMCreateTopicRsp(void *buf, int32_t bufLen, SMCreateTopicRsp
tCoderClear(&decoder); tCoderClear(&decoder);
return 0; return 0;
} }
int32_t tSerializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->pid) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->app) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->startTime) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSConnectReq(void *buf, int32_t bufLen, SConnectReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->pid) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->app) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->startTime) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->acctId) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->clusterId) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->connId) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->superUser) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pRsp->epSet) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->sVersion) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->acctId) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->clusterId) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->connId) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->superUser) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pRsp->epSet) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->sVersion) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
...@@ -243,7 +243,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { ...@@ -243,7 +243,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
pEpSet->inUse = pEpSet->numOfEps; pEpSet->inUse = pEpSet->numOfEps;
} }
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port)); addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj); sdbRelease(pSdb, pObj);
} }
} }
......
...@@ -184,15 +184,17 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { ...@@ -184,15 +184,17 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
} }
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;
int32_t code = -1; int32_t code = -1;
SConnectReq connReq = {0};
SConnectReq *pConnReq = pReq->rpcMsg.pCont;
pConnReq->pid = htonl(pConnReq->pid); if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
pConnReq->startTime = htobe64(pConnReq->startTime); terrno = TSDB_CODE_INVALID_MSG;
goto CONN_OVER;
}
SRpcConnInfo info = {0}; SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
...@@ -209,41 +211,42 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { ...@@ -209,41 +211,42 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
goto CONN_OVER; goto CONN_OVER;
} }
if (pConnReq->db[0]) { if (connReq.db[0]) {
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db); snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
pDb = mndAcquireDb(pMnode, pReq->db); pDb = mndAcquireDb(pMnode, pReq->db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB; terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr()); mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
} }
pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER; goto CONN_OVER;
} }
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); SConnectRsp connectRsp = {0};
if (pRsp == NULL) { connectRsp.acctId = pUser->acctId;
terrno = TSDB_CODE_OUT_OF_MEMORY; connectRsp.superUser = pUser->superUser;
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr()); connectRsp.clusterId = pMnode->clusterId;
goto CONN_OVER; connectRsp.connId = pConn->id;
}
pRsp->acctId = htonl(pUser->acctId); snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
pRsp->superUser = pUser->superUser; gitinfo);
pRsp->clusterId = htobe64(pMnode->clusterId); mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
pRsp->connId = htonl(pConn->id);
snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo); int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
mndGetMnodeEpSet(pMnode, &pRsp->epSet); if (contLen < 0) goto CONN_OVER;
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) goto CONN_OVER;
tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
pReq->contLen = sizeof(SConnectRsp); pReq->contLen = contLen;
pReq->pCont = pRsp; pReq->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app); mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app);
code = 0; code = 0;
......
...@@ -28,44 +28,44 @@ Testbase MndTestProfile::test; ...@@ -28,44 +28,44 @@ Testbase MndTestProfile::test;
int32_t MndTestProfile::connId; int32_t MndTestProfile::connId;
TEST_F(MndTestProfile, 01_ConnectMsg) { TEST_F(MndTestProfile, 01_ConnectMsg) {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_profile"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; SConnectRsp connectRsp = {0};
ASSERT_NE(pRsp, nullptr); tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htobe64(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.eps[0].port = htons(pRsp->epSet.eps[0].port);
EXPECT_EQ(pRsp->acctId, 1); EXPECT_EQ(connectRsp.acctId, 1);
EXPECT_GT(pRsp->clusterId, 0); EXPECT_GT(connectRsp.clusterId, 0);
EXPECT_EQ(pRsp->connId, 1); EXPECT_EQ(connectRsp.connId, 1);
EXPECT_EQ(pRsp->superUser, 1); EXPECT_EQ(connectRsp.superUser, 1);
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(connectRsp.epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(connectRsp.epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.eps[0].port, 9031); EXPECT_EQ(connectRsp.epSet.eps[0].port, 9031);
EXPECT_STREQ(pRsp->epSet.eps[0].fqdn, "localhost"); EXPECT_STREQ(connectRsp.epSet.eps[0].fqdn, "localhost");
connId = pRsp->connId; connId = connectRsp.connId;
} }
TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) { TEST_F(MndTestProfile, 02_ConnectMsg_InvalidDB) {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_profile"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "invalid_db");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
...@@ -194,35 +194,33 @@ TEST_F(MndTestProfile, 05_KillConnMsg) { ...@@ -194,35 +194,33 @@ TEST_F(MndTestProfile, 05_KillConnMsg) {
} }
{ {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_profile");
strcpy(connectReq.db, "invalid_db");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_profile"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "");
SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pMsg = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pMsg, nullptr); ASSERT_NE(pMsg, nullptr);
ASSERT_EQ(pMsg->code, 0); ASSERT_EQ(pMsg->code, 0);
SConnectRsp* pRsp = (SConnectRsp*)pMsg->pCont; SConnectRsp connectRsp = {0};
ASSERT_NE(pRsp, nullptr); tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
pRsp->acctId = htonl(pRsp->acctId);
pRsp->clusterId = htobe64(pRsp->clusterId);
pRsp->connId = htonl(pRsp->connId);
pRsp->epSet.port[0] = htons(pRsp->epSet.port[0]);
EXPECT_EQ(pRsp->acctId, 1); EXPECT_EQ(connectRsp.acctId, 1);
EXPECT_GT(pRsp->clusterId, 0); EXPECT_GT(connectRsp.clusterId, 0);
EXPECT_GT(pRsp->connId, connId); EXPECT_GT(connectRsp.connId, connId);
EXPECT_EQ(pRsp->superUser, 1); EXPECT_EQ(connectRsp.superUser, 1);
EXPECT_EQ(pRsp->epSet.inUse, 0); EXPECT_EQ(connectRsp.epSet.inUse, 0);
EXPECT_EQ(pRsp->epSet.numOfEps, 1); EXPECT_EQ(connectRsp.epSet.numOfEps, 1);
EXPECT_EQ(pRsp->epSet.port[0], 9031); EXPECT_EQ(connectRsp.epSet.port[0], 9031);
EXPECT_STREQ(pRsp->epSet.fqdn[0], "localhost"); EXPECT_STREQ(connectRsp.epSet.fqdn[0], "localhost");
connId = pRsp->connId; connId = connectRsp.connId;
} }
#endif #endif
} }
......
...@@ -54,12 +54,14 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { ...@@ -54,12 +54,14 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
} }
TEST_F(MndTestShow, 03_ShowMsg_Conn) { TEST_F(MndTestShow, 03_ShowMsg_Conn) {
int32_t contLen = sizeof(SConnectReq); SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_show");
strcpy(connectReq.db, "");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen); int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
pReq->pid = htonl(1234); void* pReq = rpcMallocCont(contLen);
strcpy(pReq->app, "mnode_test_show"); tSerializeSConnectReq(pReq, contLen, &connectReq);
strcpy(pReq->db, "");
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册