提交 57a722a6 编写于 作者: S Shengliang Guan

TD-10431 rename test file

上级 ff86bc29
...@@ -893,8 +893,18 @@ typedef struct { ...@@ -893,8 +893,18 @@ typedef struct {
} SHeartBeatRsp; } SHeartBeatRsp;
typedef struct { typedef struct {
char queryId[TSDB_KILL_MSG_LEN + 1]; int32_t connId;
} SKillQueryMsg, SKillConnMsg; int32_t streamId;
} SKillStreamMsg;
typedef struct {
int32_t connId;
int32_t queryId;
} SKillQueryMsg;
typedef struct {
int32_t connId;
} SKillConnMsg;
typedef struct { typedef struct {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
......
...@@ -21,7 +21,8 @@ class DndTestProfile : public ::testing::Test { ...@@ -21,7 +21,8 @@ class DndTestProfile : public ::testing::Test {
void TearDown() override {} void TearDown() override {}
static void SetUpTestSuite() { static void SetUpTestSuite() {
pServer = createServer("/tmp/dndTestProfile", "localhost", 9527); pServer = createServer("/tmp/dndTestProfile", "localhost", 7100);
ASSERT(pServer);
pClient = createClient("root", "taosdata"); pClient = createClient("root", "taosdata");
} }
...@@ -38,7 +39,6 @@ SServer* DndTestProfile::pServer; ...@@ -38,7 +39,6 @@ SServer* DndTestProfile::pServer;
SClient* DndTestProfile::pClient; SClient* DndTestProfile::pClient;
TEST_F(DndTestProfile, connectMsg_01) { TEST_F(DndTestProfile, connectMsg_01) {
ASSERT_NE(pServer, nullptr);
ASSERT_NE(pClient, nullptr); ASSERT_NE(pClient, nullptr);
SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg)); SConnectMsg* pReq = (SConnectMsg*)rpcMallocCont(sizeof(SConnectMsg));
...@@ -74,7 +74,6 @@ TEST_F(DndTestProfile, connectMsg_01) { ...@@ -74,7 +74,6 @@ TEST_F(DndTestProfile, connectMsg_01) {
} }
TEST_F(DndTestProfile, heartbeatMsg_01) { TEST_F(DndTestProfile, heartbeatMsg_01) {
ASSERT_NE(pServer, nullptr);
ASSERT_NE(pClient, nullptr); ASSERT_NE(pClient, nullptr);
SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg)); SHeartBeatMsg* pReq = (SHeartBeatMsg*)rpcMallocCont(sizeof(SHeartBeatMsg));
......
...@@ -84,6 +84,12 @@ int32_t mndInitProfile(SMnode *pMnode) { ...@@ -84,6 +84,12 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndGetQueryMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndRetrieveQueries);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_QUERIES, mndCancelGetNextQuery);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndGetStreamMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStreams);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
return 0; return 0;
} }
...@@ -122,15 +128,19 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t ...@@ -122,15 +128,19 @@ static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t
int32_t keepTime = pMnode->shellActivityTimer * 3; int32_t keepTime = pMnode->shellActivityTimer * 3;
SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000);
if (pConn == NULL) {
mDebug("conn:%d, is created, user:%s", connId, user); terrno = TSDB_CODE_OUT_OF_MEMORY;
return pConn; mError("conn:%d, failed to put into cache since %s, user:%s", connId, user, terrstr());
return NULL;
} else {
mDebug("conn:%d, is created, user:%s", connId, user);
return pConn;
}
} }
static void mndFreeConn(SConnObj *pConn) { static void mndFreeConn(SConnObj *pConn) {
tfree(pConn->pQueries); tfree(pConn->pQueries);
tfree(pConn->pStreams); tfree(pConn->pStreams);
tfree(pConn);
mDebug("conn:%d, is destroyed", pConn->connId); mDebug("conn:%d, is destroyed", pConn->connId);
} }
...@@ -224,6 +234,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -224,6 +234,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
if (pRsp == NULL) { if (pRsp == NULL) {
mndReleaseConn(pMnode, pConn);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to login from %s while create rsp since %s", pMsg->user, ip, terrstr()); mError("user:%s, failed to login from %s while create rsp since %s", pMsg->user, ip, terrstr());
return -1; return -1;
...@@ -241,6 +252,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) { ...@@ -241,6 +252,7 @@ static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
pRsp->clusterId = htonl(pMnode->clusterId); pRsp->clusterId = htonl(pMnode->clusterId);
pRsp->connId = htonl(pConn->connId); pRsp->connId = htonl(pConn->connId);
mndGetMnodeEpSet(pMnode, &pRsp->epSet); mndGetMnodeEpSet(pMnode, &pRsp->epSet);
mndReleaseConn(pMnode, pConn);
pMsg->contLen = sizeof(SConnectRsp); pMsg->contLen = sizeof(SConnectRsp);
pMsg->pCont = pRsp; pMsg->pCont = pRsp;
...@@ -354,28 +366,17 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) { ...@@ -354,28 +366,17 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; SKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = htonl(pKill->connId);
int32_t queryId = htonl(pKill->queryId);
mInfo("kill query msg is received, queryId:%s", pKill->queryId); mInfo("kill query msg is received, queryId:%s", pKill->queryId);
const char delim = ':';
char *connIdStr = strtok(pKill->queryId, &delim);
char *queryIdStr = strtok(NULL, &delim);
if (queryIdStr == NULL || connIdStr == NULL) {
mError("failed to kill query, queryId:%s", pKill->queryId);
terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
return -1;
}
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId); mError("connId:%d, failed to kill queryId:%d, conn not exist", connId, queryId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID; terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1; return -1;
} else { } else {
mInfo("connId:%s, queryId:%d is killed by user:%s", connIdStr, queryId, pMsg->user); mInfo("connId:%d, queryId:%d is killed by user:%s", connId, queryId, pMsg->user);
pConn->queryId = queryId; pConn->queryId = queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0; return 0;
...@@ -395,29 +396,18 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) { ...@@ -395,29 +396,18 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) {
} }
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont; SKillStreamMsg *pKill = pMsg->rpcMsg.pCont;
mInfo("kill stream msg is received, streamId:%s", pKill->queryId); int32_t connId = htonl(pKill->connId);
int32_t streamId = htonl(pKill->streamId);
const char delim = ':'; mDebug("kill stream msg is received, streamId:%s", streamId);
char *connIdStr = strtok(pKill->queryId, &delim);
char *streamIdStr = strtok(NULL, &delim);
if (streamIdStr == NULL || connIdStr == NULL) {
mError("failed to kill stream, streamId:%s", pKill->queryId);
terrno = TSDB_CODE_MND_INVALID_STREAM_ID;
return -1;
}
int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId); mError("connId:%d, failed to kill streamId:%d, conn not exist", connId, streamId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID; terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1; return -1;
} else { } else {
mInfo("connId:%s, streamId:%d is killed by user:%s", connIdStr, streamId, pMsg->user); mInfo("connId:%d, streamId:%d is killed by user:%s", connId, streamId, pMsg->user);
pConn->streamId = streamId; pConn->streamId = streamId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -438,14 +428,15 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) { ...@@ -438,14 +428,15 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
SKillConnMsg *pKill = pMsg->rpcMsg.pCont; SKillConnMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = atoi(pKill->queryId); int32_t connId = htonl(pKill->connId);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) { if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId); mError("connId:%s, failed to kill connection, conn not exist", connId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID; terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1; return -1;
} else { } else {
mInfo("connId:%s, is killed by user:%s", pKill->queryId, pMsg->user); mInfo("connId:%s, is killed by user:%s", connId, pMsg->user);
pConn->killed = 1; pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false); taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册