提交 8a850dd3 编写于 作者: S Shengliang Guan

TD-10431 process mnode profile

上级 5857ab5b
......@@ -81,6 +81,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_ALTER_STABLE, "alter-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DROP_STABLE, "drop-stable" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_QUERY, "kill-query" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SHOW, "show" )
......
......@@ -76,6 +76,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TSDB_MSG_TYPE_DROP_STABLE] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_STABLE_VGROUP] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_QUERY] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_STREAM] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_KILL_CONN] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_HEARTBEAT] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TSDB_MSG_TYPE_SHOW] = dndProcessMnodeReadMsg;
......
......@@ -25,7 +25,7 @@
extern "C" {
#endif
typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg);
typedef int32_t (*MndMsgFp)(SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "mndDb.h"
static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg);
static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg);
int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_USE_DB, mnodeProcessUseMsg);
......@@ -35,10 +35,11 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb);
}
static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont;
static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SUseDbMsg *pUse = pMsg->rpcMsg.pCont;
strncpy(pMsg->db, pUseDbMsg->db, TSDB_FULL_DB_NAME_LEN);
strncpy(pMsg->db, pUse->db, TSDB_FULL_DB_NAME_LEN);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
if (pDb != NULL) {
......
......@@ -226,7 +226,8 @@ static void mndParseStatusMsg(SStatusMsg *pStatus) {
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
}
static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SStatusMsg *pStatus = pMsg->rpcMsg.pCont;
mndParseStatusMsg(pStatus);
......@@ -315,11 +316,11 @@ static int32_t mndProcessStatusMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessConfigDnodeMsg(SMnodeMsg *pMsg) { return 0; }
int32_t mndInitDnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_DNODE,
......
......@@ -103,9 +103,9 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
return sdbWrite(pMnode->pSdb, pRaw);
}
static int32_t mndProcessCreateMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropMnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; }
static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return 0; }
int32_t mndInitMnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_MNODE,
......
......@@ -50,8 +50,11 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *user, uint
static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn);
static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn);
static void mndCancelGetNextConn(SMnode *pMnode, void *pIter);
static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg);
static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg);
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg);
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg);
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg);
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow);
......@@ -74,10 +77,14 @@ int32_t mndInitProfile(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_HEARTBEAT, mndProcessHeartBeatMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONNECT, mndProcessConnectMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_QUERY, mndProcessKillQueryMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONNECT, mndProcessKillStreamMsg);
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_KILL_CONN, mndProcessKillConnectionMsg);
mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn);
return 0;
}
......@@ -183,7 +190,8 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
taosHashCancelIterate(pMgmt->cache->pHashTable, pIter);
}
static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
static int32_t mndProcessConnectMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SConnectMsg *pReq = pMsg->rpcMsg.pCont;
pReq->pid = htonl(pReq->pid);
......@@ -240,7 +248,8 @@ static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont;
pReq->connId = htonl(pReq->connId);
pReq->pid = htonl(pReq->pid);
......@@ -280,6 +289,118 @@ static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
return 0;
}
static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
mInfo("kill query msg is received, queryId:%s", pKill->queryId);
const char delim = ':';
char *connIdStr = strtok(pKill->queryId, &delim);
char *queryIdStr = strtok(NULL, &delim);
if (queryIdStr == NULL || connIdStr == NULL) {
mError("failed to kill query, queryId:%s", pKill->queryId);
terrno = TSDB_CODE_MND_INVALID_QUERY_ID;
return -1;
}
int32_t queryId = (int32_t)strtol(queryIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill queryId:%d, conn not exist", connIdStr, queryId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%s, queryId:%d is killed by user:%s", connIdStr, queryId, pMsg->user);
pConn->queryId = queryId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return 0;
}
}
static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
SKillQueryMsg *pKill = pMsg->rpcMsg.pCont;
mInfo("kill stream msg is received, streamId:%s", pKill->queryId);
const char delim = ':';
char *connIdStr = strtok(pKill->queryId, &delim);
char *streamIdStr = strtok(NULL, &delim);
if (streamIdStr == NULL || connIdStr == NULL) {
mError("failed to kill stream, streamId:%s", pKill->queryId);
terrno = TSDB_CODE_MND_INVALID_STREAM_ID;
return -1;
}
int32_t streamId = (int32_t)strtol(streamIdStr, NULL, 10);
int32_t connId = atoi(connIdStr);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill streamId:%d, conn not exist", connIdStr, streamId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%s, streamId:%d is killed by user:%s", connIdStr, streamId, pMsg->user);
pConn->streamId = streamId;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
}
}
static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user);
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
mndReleaseUser(pMnode, pUser);
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
SKillConnMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = atoi(pKill->queryId);
SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
terrno = TSDB_CODE_MND_INVALID_CONN_ID;
return -1;
} else {
mInfo("connId:%s, is killed by user:%s", pKill->queryId, pMsg->user);
pConn->killed = 1;
taosCacheRelease(pMgmt->cache, (void **)&pConn, false);
return TSDB_CODE_SUCCESS;
}
}
static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) {
SMnode *pMnode = pMsg->pMnode;
SProfileMgmt *pMgmt = &pMnode->profileMgmt;
......@@ -288,7 +409,8 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
mndReleaseUser(pMnode, pUser);
return TSDB_CODE_MND_NO_RIGHTS;
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
......@@ -415,7 +537,8 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
mndReleaseUser(pMnode, pUser);
return TSDB_CODE_MND_NO_RIGHTS;
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
......@@ -519,8 +642,8 @@ static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data,
int32_t numOfRows = 0;
SConnObj *pConnObj = NULL;
int32_t cols = 0;
char * pWrite;
void * pIter;
char *pWrite;
void *pIter;
char str[TSDB_IPv4ADDR_LEN + 6] = {0};
while (numOfRows < rows) {
......@@ -621,7 +744,8 @@ static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj
if (pUser == NULL) return 0;
if (!pUser->superAuth) {
mndReleaseUser(pMnode, pUser);
return TSDB_CODE_MND_NO_RIGHTS;
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
mndReleaseUser(pMnode, pUser);
......
......@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "mndShow.h"
static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg);
static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMsg);
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg);
static int32_t mndProcessRetrieveMsg( SMnodeMsg *pMsg);
static bool mndCheckRetrieveFinished(SShowObj *pShow);
static int32_t mndAcquireShowObj(SMnode *pMnode, SShowObj *pShow);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
......@@ -42,7 +42,8 @@ int32_t mndInitShow(SMnode *pMnode) {
void mndCleanupShow(SMnode *pMnode) {}
static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
SMnode *pMnode = pMnodeMsg->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
SShowMsg *pMsg = pMnodeMsg->rpcMsg.pCont;
int8_t type = pMsg->type;
......@@ -108,7 +109,8 @@ static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
}
}
static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
SMnode *pMnode = pMnodeMsg->pMnode;
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t rowsToRead = 0;
int32_t size = 0;
......
......@@ -180,7 +180,8 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
return 0;
}
static int32_t mndProcessCreateUserMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
if (pCreate->user[0] == 0) {
......
......@@ -386,7 +386,7 @@ static void mndProcessRpcMsg(SMnodeMsg *pMsg) {
goto PROCESS_RPC_END;
}
code = (*fp)(pMnode, pMsg);
code = (*fp)(pMsg);
if (code != 0) {
code = terrno;
mError("msg:%p, app:%p failed to process since %s", pMsg, ahandle, terrstr());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册