diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index e14788be4738f100e3a1288dfd55582daed475bb..93186252a7a15095d43e558e5d07b1c12dea8d99 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -893,8 +893,18 @@ typedef struct { } SHeartBeatRsp; typedef struct { - char queryId[TSDB_KILL_MSG_LEN + 1]; -} SKillQueryMsg, SKillConnMsg; + int32_t connId; + int32_t streamId; +} SKillStreamMsg; + +typedef struct { + int32_t connId; + int32_t queryId; +} SKillQueryMsg; + +typedef struct { + int32_t connId; +} SKillConnMsg; typedef struct { char user[TSDB_USER_LEN]; diff --git a/source/dnode/mgmt/impl/test/profile/profile.cpp b/source/dnode/mgmt/impl/test/profile/profile.cpp index 9bbb648e66b0b28dedd95eb34bcaf35b11b5fa9c..ebdaa85ac749f16455bab61dd17eee21ed5cb099 100644 --- a/source/dnode/mgmt/impl/test/profile/profile.cpp +++ b/source/dnode/mgmt/impl/test/profile/profile.cpp @@ -21,7 +21,8 @@ class DndTestProfile : public ::testing::Test { void TearDown() override {} static void SetUpTestSuite() { - pServer = createServer("/tmp/dndTestProfile", "localhost", 9527); + pServer = createServer("/tmp/dndTestProfile", "localhost", 7100); + ASSERT(pServer); pClient = createClient("root", "taosdata"); } @@ -38,7 +39,6 @@ SServer* DndTestProfile::pServer; SClient* DndTestProfile::pClient; TEST_F(DndTestProfile, connectMsg_01) { - ASSERT_NE(pServer, nullptr); ASSERT_NE(pClient, nullptr); SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg)); @@ -74,7 +74,6 @@ TEST_F(DndTestProfile, connectMsg_01) { } TEST_F(DndTestProfile, heartbeatMsg_01) { - ASSERT_NE(pServer, nullptr); ASSERT_NE(pClient, nullptr); SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg)); diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index bdd0ab500ae141b088a7eaa172f1af8f165ab12e..ac606b2ff67b2f62461764c8b3f4abbd72033547 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -84,6 +84,12 @@ int32_t mndInitProfile(SMnode *pMnode) { mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery); + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndGetStreamMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStreams); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); return 0; } @@ -122,15 +128,19 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t int32_t keepTime = pMnode->shellActivityTimer * 3; SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); - - mDebug("conn:%d, is created, user:%s", connId, user); - return pConn; + if (pConn == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr()); + return NULL; + } else { + mDebug("conn:%d, is created, user:%s", connId, user); + return pConn; + } } static void mndFreeConn(SConnObj *pConn) { tfree(pConn->pQueries); tfree(pConn->pStreams); - tfree(pConn); mDebug("conn:%d, is destroyed", pConn->connId); } @@ -224,6 +234,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); if (pRsp == NULL) { + mndReleaseConn(pMnode, pConn); terrno = TSDB_CODE_OUT_OF_MEMORY; mError("user:%s, failed to login from %s while create rsp since %s", pMsg->user, ip, terrstr()); return -1; @@ -241,6 +252,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { pRsp->clusterId = htonl(pMnode->clusterId); pRsp->connId = htonl(pConn->connId); mndGetMnodeEpSet(pMnode, &pRsp->epSet); + mndReleaseConn(pMnode, pConn); pMsg->contLen = sizeof(SConnectRsp); pMsg->pCont = pRsp; @@ -354,28 +366,17 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) { mndReleaseUser(pMnode, pUser); SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; + int32_t connId = htonl(pKill->connId); + int32_t queryId = htonl(pKill->queryId); mInfo("kill query msg is received, queryId:%s", pKill->queryId); - const char delim = ':'; - char *connIdStr = strtok(pKill->queryId, &delim); - char *queryIdStr = strtok(NULL, &delim); - - if (queryIdStr == NULL || connIdStr == NULL) { - mError("failed to kill query, queryId:%s", pKill->queryId); - terrno = TSDB_CODE_MND_INVALID_QUERY_ID; - return -1; - } - - int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10); - - int32_t connId = atoi(connIdStr); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { - mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); + mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId); terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%s, queryId:%d is killed by user:%s", connIdStr, queryId, pMsg->user); + mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pMsg->user); pConn->queryId = queryId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return 0; @@ -395,29 +396,18 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) { } mndReleaseUser(pMnode, pUser); - SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; - mInfo("kill stream msg is received, streamId:%s", pKill->queryId); - - const char delim = ':'; - char *connIdStr = strtok(pKill->queryId, &delim); - char *streamIdStr = strtok(NULL, &delim); - - if (streamIdStr == NULL || connIdStr == NULL) { - mError("failed to kill stream, streamId:%s", pKill->queryId); - terrno = TSDB_CODE_MND_INVALID_STREAM_ID; - return -1; - } - - int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10); - int32_t connId = atoi(connIdStr); + SKillStreamMsg *pKill = pMsg->rpcMsg.pCont; + int32_t connId = htonl(pKill->connId); + int32_t streamId = htonl(pKill->streamId); + mDebug("kill stream msg is received, streamId:%s", streamId); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { - mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); + mError("connId:%d, failed to kill streamId:%d, conn not exist", connId, streamId); terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%s, streamId:%d is killed by user:%s", connIdStr, streamId, pMsg->user); + mInfo("connId:%d, streamId:%d is killed by user:%s", connId, streamId, pMsg->user); pConn->streamId = streamId; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return TSDB_CODE_SUCCESS; @@ -438,14 +428,15 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) { mndReleaseUser(pMnode, pUser); SKillConnMsg *pKill = pMsg->rpcMsg.pCont; - int32_t connId = atoi(pKill->queryId); - SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + int32_t connId = htonl(pKill->connId); + + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); if (pConn == NULL) { - mError("connId:%s, failed to kill, conn not exist", pKill->queryId); + mError("connId:%s, failed to kill connection, conn not exist", connId); terrno = TSDB_CODE_MND_INVALID_CONN_ID; return -1; } else { - mInfo("connId:%s, is killed by user:%s", pKill->queryId, pMsg->user); + mInfo("connId:%s, is killed by user:%s", connId, pMsg->user); pConn->killed = 1; taosCacheRelease(pMgmt->cache, (void **)&pConn, false); return TSDB_CODE_SUCCESS;