提交 ca10ef44 编写于 作者: S Shengliang Guan

fix invalid db err while process connect req

上级 ca8ed353
...@@ -179,7 +179,12 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { ...@@ -179,7 +179,12 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
} }
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SUserObj *pUser = NULL;
SDbObj *pDb = NULL;
SConnObj *pConn = NULL;
int32_t code = -1;
SConnectReq *pConnReq = pReq->rpcMsg.pCont; SConnectReq *pConnReq = pReq->rpcMsg.pCont;
pConnReq->pid = htonl(pConnReq->pid); pConnReq->pid = htonl(pConnReq->pid);
pConnReq->startTime = htobe64(pConnReq->startTime); pConnReq->startTime = htobe64(pConnReq->startTime);
...@@ -187,54 +192,61 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) { ...@@ -187,54 +192,61 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SRpcConnInfo info = {0}; SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) { if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr()); mError("user:%s, failed to login while get connection info since %s", pReq->user, terrstr());
return -1; goto CONN_OVER;
} }
char ip[30]; char ip[30];
taosIp2String(info.clientIp, ip); taosIp2String(info.clientIp, ip);
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
mError("user:%s, failed to login while acquire user since %s", pReq->user, terrstr());
goto CONN_OVER;
}
if (pConnReq->db[0]) { if (pConnReq->db[0]) {
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pReq->acctId, TS_PATH_DELIMITER, pConnReq->db); snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
SDbObj *pDb = mndAcquireDb(pMnode, pReq->db); pDb = mndAcquireDb(pMnode, pReq->db);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB; terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr()); mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
return -1; goto CONN_OVER;
} }
mndReleaseDb(pMnode, pDb);
} }
SConnObj *pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime); pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
if (pConn == NULL) { if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
return -1; goto CONN_OVER;
} }
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr()); mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
return -1; goto CONN_OVER;
}
SUserObj *pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser != NULL) {
pRsp->acctId = htonl(pUser->acctId);
pRsp->superUser = pUser->superUser;
mndReleaseUser(pMnode, pUser);
} }
pRsp->acctId = htonl(pUser->acctId);
pRsp->superUser = pUser->superUser;
pRsp->clusterId = htobe64(pMnode->clusterId); pRsp->clusterId = htobe64(pMnode->clusterId);
pRsp->connId = htonl(pConn->id); pRsp->connId = htonl(pConn->id);
mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn);
pReq->contLen = sizeof(SConnectRsp); pReq->contLen = sizeof(SConnectRsp);
pReq->pCont = pRsp; pReq->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app); mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
return 0;
code = 0;
CONN_OVER:
mndReleaseUser(pMnode, pUser);
mndReleaseDb(pMnode, pDb);
mndReleaseConn(pMnode, pConn);
return code;
} }
static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
...@@ -258,33 +270,27 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { ...@@ -258,33 +270,27 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
} }
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
char *batchReqStr = pReq->rpcMsg.pCont; char *batchReqStr = pReq->rpcMsg.pCont;
SClientHbBatchReq batchReq = {0}; SClientHbBatchReq batchReq = {0};
tDeserializeSClientHbBatchReq(batchReqStr, &batchReq); tDeserializeSClientHbBatchReq(batchReqStr, &batchReq);
SArray *pArray = batchReq.reqs; SArray *pArray = batchReq.reqs;
int sz = taosArrayGetSize(pArray); int sz = taosArrayGetSize(pArray);
SClientHbBatchRsp batchRsp = {0}; SClientHbBatchRsp batchRsp = {0};
batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp)); batchRsp.rsps = taosArrayInit(0, sizeof(SClientHbRsp));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
SClientHbReq* pHbReq = taosArrayGet(pArray, i); SClientHbReq *pHbReq = taosArrayGet(pArray, i);
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp rsp = { SClientHbRsp rsp = {.status = 0, .connKey = pHbReq->connKey, .bodyLen = 0, .body = NULL};
.status = 0,
.connKey = pHbReq->connKey,
.bodyLen = 0,
.body = NULL
};
taosArrayPush(batchRsp.rsps, &rsp); taosArrayPush(batchRsp.rsps, &rsp);
} }
} }
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void* buf = rpcMallocCont(tlen); void *buf = rpcMallocCont(tlen);
void* bufCopy = buf; void *bufCopy = buf;
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp); tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp);
pReq->contLen = tlen; pReq->contLen = tlen;
pReq->pCont = buf; pReq->pCont = buf;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册