From 5857ab5b28a6fa6a21749b3bc2d2b54d81688962 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 3 Dec 2021 14:36:41 +0800 Subject: [PATCH] TD-10431 process mnode profile --- include/common/taosmsg.h | 51 +- include/util/tdef.h | 2 +- include/util/tutil.h | 4 +- source/dnode/mnode/impl/inc/mndDb.h | 2 + source/dnode/mnode/impl/inc/mndDef.h | 8 +- source/dnode/mnode/impl/inc/mndInt.h | 10 +- source/dnode/mnode/impl/inc/mndMnode.h | 1 + source/dnode/mnode/impl/inc/mndShow.h | 6 +- source/dnode/mnode/impl/src/mndCluster.c | 6 +- source/dnode/mnode/impl/src/mndConnect.c | 155 ----- source/dnode/mnode/impl/src/mndDb.c | 48 +- source/dnode/mnode/impl/src/mndMnode.c | 4 +- source/dnode/mnode/impl/src/mndProfile.c | 761 ++++++++++++++++++++++- source/dnode/mnode/impl/src/mndShow.c | 10 +- source/dnode/mnode/impl/src/mndUser.c | 21 +- source/util/src/tutil.c | 7 + 16 files changed, 862 insertions(+), 234 deletions(-) delete mode 100644 source/dnode/mnode/impl/src/mndConnect.c diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 826934c4a8..e29accd249 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -353,11 +353,9 @@ typedef struct { } SUpdateTableTagValMsg; typedef struct { - char clientVersion[TSDB_VERSION_LEN]; - char msgVersion[TSDB_VERSION_LEN]; - char db[TSDB_TABLE_FNAME_LEN]; - char appName[TSDB_APPNAME_LEN]; int32_t pid; + char app[TSDB_APP_NAME_LEN]; + char db[TSDB_DB_NAME_LEN]; } SConnectMsg; typedef struct SEpSet { @@ -368,15 +366,14 @@ typedef struct SEpSet { } SEpSet; typedef struct { - char acctId[TSDB_ACCT_ID_LEN]; - char serverVersion[TSDB_VERSION_LEN]; - char clusterId[TSDB_CLUSTER_ID_LEN]; - int8_t writeAuth; - int8_t superAuth; - int8_t reserved1; - int8_t reserved2; - int32_t connId; - SEpSet epSet; + int32_t acctId; + int32_t clusterId; + int32_t connId; + int8_t superAuth; + int8_t readAuth; + int8_t writeAuth; + int8_t reserved[5]; + SEpSet epSet; } SConnectRsp; typedef struct { @@ -875,23 +872,23 @@ typedef struct { } SStreamDesc; typedef struct { - char clientVer[TSDB_VERSION_LEN]; - uint32_t connId; - int32_t pid; - int32_t numOfQueries; - int32_t numOfStreams; - char appName[TSDB_APPNAME_LEN]; - char pData[]; + int32_t connId; + int32_t pid; + int32_t numOfQueries; + int32_t numOfStreams; + char app[TSDB_APP_NAME_LEN]; + char pData[]; } SHeartBeatMsg; typedef struct { - uint32_t queryId; - uint32_t streamId; - uint32_t totalDnodes; - uint32_t onlineDnodes; - uint32_t connId; - int8_t killConnection; - SEpSet epSet; + int32_t connId; + int32_t queryId; + int32_t streamId; + int32_t totalDnodes; + int32_t onlineDnodes; + int8_t killConnection; + int8_t reserved[3]; + SEpSet epSet; } SHeartBeatRsp; typedef struct { diff --git a/include/util/tdef.h b/include/util/tdef.h index 76df8887a0..897f51f5c1 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -174,7 +174,7 @@ do { \ #define TSDB_MAX_SQL_SHOW_LEN 512 #define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb -#define TSDB_APPNAME_LEN TSDB_UNI_LEN +#define TSDB_APP_NAME_LEN TSDB_UNI_LEN /** * In some scenarios uint16_t (0~65535) is used to store the row len. diff --git a/include/util/tutil.h b/include/util/tutil.h index 8dbcb7e8d5..573dee9339 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -41,8 +41,10 @@ char * paGetToken(char *src, char **token, int32_t *tokenLen); int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]); int32_t taosHexStrToByteArray(char hexstr[], char bytes[]); -char * taosIpStr(uint32_t ipInt); +char *taosIpStr(uint32_t ipInt); uint32_t ip2uint(const char *const ip_addr); +void taosIp2String(uint32_t ip, char *str); +void taosIpPort2String(uint32_t ip, uint16_t port, char *str); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { MD5_CTX context; diff --git a/source/dnode/mnode/impl/inc/mndDb.h b/source/dnode/mnode/impl/inc/mndDb.h index acccb62603..91f502be7d 100644 --- a/source/dnode/mnode/impl/inc/mndDb.h +++ b/source/dnode/mnode/impl/inc/mndDb.h @@ -24,6 +24,8 @@ extern "C" { int32_t mndInitDb(SMnode *pMnode); void mndCleanupDb(SMnode *pMnode); +SDbObj *mndAcquireDb(SMnode *pMnode, char *db); +void mndReleaseDb(SMnode *pMnode, SDbObj *pDb); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index dc27656dbf..aaf86c15b6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -185,9 +185,11 @@ typedef struct SUserObj { char acct[TSDB_USER_LEN]; int64_t createdTime; int64_t updateTime; - int8_t rootAuth; + int8_t superAuth; + int8_t readAuth; + int8_t writeAuth; + int32_t acctId; SHashObj *prohibitDbHash; - SAcctObj *pAcct; } SUserObj; typedef struct { @@ -303,6 +305,8 @@ typedef struct SShowObj { typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; + char db[TSDB_FULL_DB_NAME_LEN]; + int32_t acctId; SMnode *pMnode; int16_t received; int16_t successed; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index e5643847a5..0f4dd0633b 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -28,8 +28,8 @@ extern "C" { typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg); typedef int32_t (*MndInitFp)(SMnode *pMnode); typedef void (*MndCleanupFp)(SMnode *pMnode); -typedef int32_t (*ShowMetaFp)(SMnode *pMnode, STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); -typedef int32_t (*ShowRetrieveFp)(SMnode *pMnode, SShowObj *pShow, char *data, int32_t rows, void *pConn); +typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter); typedef struct { @@ -46,6 +46,11 @@ typedef struct { SCacheObj *cache; } SShowMgmt; +typedef struct { + int32_t connId; + SCacheObj *cache; +} SProfileMgmt; + typedef struct SMnode { int32_t dnodeId; int32_t clusterId; @@ -58,6 +63,7 @@ typedef struct SMnode { SDnode *pDnode; SArray *pSteps; SShowMgmt showMgmt; + SProfileMgmt profileMgmt; MndMsgFp msgFp[TSDB_MSG_TYPE_MAX]; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index 53f7b733f2..64bbb13b60 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -25,6 +25,7 @@ extern "C" { int32_t mndInitMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); +void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndShow.h b/source/dnode/mnode/impl/inc/mndShow.h index 31c0b69c85..e7a3fcd45f 100644 --- a/source/dnode/mnode/impl/inc/mndShow.h +++ b/source/dnode/mnode/impl/inc/mndShow.h @@ -24,9 +24,9 @@ extern "C" { int32_t mndInitShow(SMnode *pMnode); void mndCleanupShow(SMnode *pMnode); -void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp); -void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp); -void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp); +void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp); +void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp); +void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp); void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow); #ifdef __cplusplus diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index dbe5dd4f89..6013ac0c31 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -170,9 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndClusterActionUpdate, .deleteFp = (SdbDeleteFp)mndClusterActionDelete}; - // mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta); - // mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters); - // mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster); + // mndAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta); + // mndAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters); + // mndAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster); return sdbSetTable(pMnode->pSdb, table); } diff --git a/source/dnode/mnode/impl/src/mndConnect.c b/source/dnode/mnode/impl/src/mndConnect.c deleted file mode 100644 index 8e62eacfab..0000000000 --- a/source/dnode/mnode/impl/src/mndConnect.c +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "mndShow.h" - - - -// static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) { -// SHeartBeatRsp *pRsp = (SHeartBeatRsp *)rpcMallocCont(sizeof(SHeartBeatRsp)); -// if (pRsp == NULL) { -// return TSDB_CODE_MND_OUT_OF_MEMORY; -// } - -// SHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont; -// if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) { -// rpcFreeCont(pRsp); -// return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code -// } - -// SRpcConnInfo connInfo = {0}; -// rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo); - -// int32_t connId = htonl(pHBMsg->connId); -// SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort); -// if (pConn == NULL) { -// pHBMsg->pid = htonl(pHBMsg->pid); -// pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName); -// } - -// if (pConn == NULL) { -// // do not close existing links, otherwise -// // mError("failed to create connId, close connect"); -// // pRsp->killConnection = 1; -// } else { -// pRsp->connId = htonl(pConn->connId); -// mnodeSaveQueryStreamList(pConn, pHBMsg); - -// if (pConn->killed != 0) { -// pRsp->killConnection = 1; -// } - -// if (pConn->streamId != 0) { -// pRsp->streamId = htonl(pConn->streamId); -// pConn->streamId = 0; -// } - -// if (pConn->queryId != 0) { -// pRsp->queryId = htonl(pConn->queryId); -// pConn->queryId = 0; -// } -// } - -// int32_t onlineDnodes = 0, totalDnodes = 0; -// mnodeGetOnlineAndTotalDnodesNum(&onlineDnodes, &totalDnodes); - -// pRsp->onlineDnodes = htonl(onlineDnodes); -// pRsp->totalDnodes = htonl(totalDnodes); -// mnodeGetMnodeEpSetForShell(&pRsp->epSet, false); - -// pMsg->rpcRsp.rsp = pRsp; -// pMsg->rpcRsp.len = sizeof(SHeartBeatRsp); - -// mnodeReleaseConn(pConn); -// return TSDB_CODE_SUCCESS; -// } - -// static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) { -// SConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont; -// SConnectRsp *pConnectRsp = NULL; -// int32_t code = TSDB_CODE_SUCCESS; - -// SRpcConnInfo connInfo = {0}; -// if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) { -// mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle); -// code = TSDB_CODE_MND_INVALID_CONNECTION; -// goto connect_over; -// } - -// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3); -// if (code != TSDB_CODE_SUCCESS) { -// goto connect_over; -// } - -// SUserObj *pUser = pMsg->pUser; -// SAcctObj *pAcct = pUser->pAcct; - -// if (pConnectMsg->db[0]) { -// char dbName[TSDB_TABLE_FNAME_LEN * 3] = {0}; -// sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db); -// SDbObj *pDb = mnodeGetDb(dbName); -// if (pDb == NULL) { -// code = TSDB_CODE_MND_INVALID_DB; -// goto connect_over; -// } - -// if (pDb->status != TSDB_DB_STATUS_READY) { -// mError("db:%s, status:%d, in dropping", pDb->name, pDb->status); -// code = TSDB_CODE_MND_DB_IN_DROPPING; -// mnodeDecDbRef(pDb); -// goto connect_over; -// } -// mnodeDecDbRef(pDb); -// } - -// pConnectRsp = rpcMallocCont(sizeof(SConnectRsp)); -// if (pConnectRsp == NULL) { -// code = TSDB_CODE_MND_OUT_OF_MEMORY; -// goto connect_over; -// } - -// pConnectMsg->pid = htonl(pConnectMsg->pid); -// SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName); -// if (pConn == NULL) { -// code = terrno; -// } else { -// pConnectRsp->connId = htonl(pConn->connId); -// mnodeReleaseConn(pConn); -// } - -// sprintf(pConnectRsp->acctId, "%x", pAcct->acctId); -// memcpy(pConnectRsp->serverVersion, version, TSDB_VERSION_LEN); -// pConnectRsp->writeAuth = pUser->writeAuth; -// pConnectRsp->superAuth = pUser->superAuth; - -// mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false); - -// dnodeGetClusterId(pConnectRsp->clusterId); - -// connect_over: -// if (code != TSDB_CODE_SUCCESS) { -// if (pConnectRsp) rpcFreeCont(pConnectRsp); -// mLError("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); -// } else { -// mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code)); -// pMsg->rpcRsp.rsp = pConnectRsp; -// pMsg->rpcRsp.len = sizeof(SConnectRsp); -// } - -// return code; -// } - - diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 0ad144d2df..d990e926bb 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -14,26 +14,38 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndDb.h" -int32_t mndInitDb(SMnode *pMnode) { return 0; } -void mndCleanupDb(SMnode *pMnode) {} +static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg); +int32_t mndInitDb(SMnode *pMnode) { + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_USE_DB, mnodeProcessUseMsg); + return 0; +} -// static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) { -// SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont; +void mndCleanupDb(SMnode *pMnode) {} -// int32_t code = TSDB_CODE_SUCCESS; -// if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db); -// if (pMsg->pDb == NULL) { -// return TSDB_CODE_MND_INVALID_DB; -// } - -// if (pMsg->pDb->status != TSDB_DB_STATUS_READY) { -// mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status); -// return TSDB_CODE_MND_DB_IN_DROPPING; -// } +SDbObj *mndAcquireDb(SMnode *pMnode, char *db) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_DB, db); +} -// return code; -// } +void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pDb); +} + +static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont; + + strncpy(pMsg->db, pUseDbMsg->db, TSDB_FULL_DB_NAME_LEN); + + SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + if (pDb != NULL) { + mndReleaseDb(pMnode, pDb); + return 0; + } else { + mError("db:%s, failed to process use db msg since %s", pMsg->db, terrstr()); + return -1; + } +} diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index d2ace31a36..c1c17e4983 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -135,4 +135,6 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) { sdbRelease(pSdb, pMnodeObj); return true; -} \ No newline at end of file +} + +void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 845e50210a..ef596f8ee1 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -14,8 +14,761 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndProfile.h" +#include "mndDb.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndUser.h" -int32_t mndInitProfile(SMnode *pMnode) { return 0; } -void mndCleanupProfile(SMnode *pMnode) {} \ No newline at end of file +#define QUERY_ID_SIZE 20 +#define QUERY_OBJ_ID_SIZE 18 +#define SUBQUERY_INFO_SIZE 6 +#define QUERY_STREAM_SAVE_SIZE 20 + +typedef struct { + char user[TSDB_USER_LEN]; + char app[TSDB_APP_NAME_LEN]; // app name that invokes taosc + int32_t pid; // pid of app that invokes taosc + int32_t connId; + int8_t killed; + int8_t align; + uint16_t port; + uint32_t ip; + int64_t stime; + int64_t lastAccess; + int32_t queryId; + int32_t streamId; + int32_t numOfQueries; + int32_t numOfStreams; + SStreamDesc *pStreams; + SQueryDesc *pQueries; +} SConnObj; + +static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app); +static void mndFreeConn(SConnObj *pConn); +static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *user, uint32_t ip, uint16_t port); +static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn); +static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn); +static void mndCancelGetNextConn(SMnode *pMnode, void *pIter); +static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg); +static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg); +static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow); +static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter); +static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow); +static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMnodeMsg *pMsg); +static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); + +int32_t mndInitProfile(SMnode *pMnode) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + int32_t connCheckTime = pMnode->shellActivityTimer * 2; + pMgmt->cache = taosCacheInit(TSDB_DATA_TYPE_INT, connCheckTime, true, (__cache_free_fn_t)mndFreeConn, "conn"); + if (pMgmt->cache == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("failed to alloc profile cache since %s", terrstr()); + return -1; + } + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_HEARTBEAT, mndProcessHeartBeatMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONNECT, mndProcessConnectMsg); + + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndGetConnsMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndRetrieveConns); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_CONNS, mndCancelGetNextConn); + return 0; +} + +void mndCleanupProfile(SMnode *pMnode) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + if (pMgmt->cache != NULL) { + taosCacheCleanup(pMgmt->cache); + pMgmt->cache = NULL; + } +} + +static SConnObj *mndCreateConn(SMnode *pMnode, char *user, uint32_t ip, uint16_t port, int32_t pid, const char *app) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + int32_t connId = atomic_add_fetch_32(&pMgmt->connId, 1); + if (connId == 0) atomic_add_fetch_32(&pMgmt->connId, 1); + + SConnObj connObj = {.pid = pid, + .connId = connId, + .killed = 0, + .port = port, + .ip = ip, + .stime = taosGetTimestampMs(), + .lastAccess = 0, + .queryId = 0, + .streamId = 0, + .numOfQueries = 0, + .numOfStreams = 0, + .pStreams = NULL, + .pQueries = NULL}; + + connObj.lastAccess = connObj.stime; + tstrncpy(connObj.user, user, TSDB_USER_LEN); + tstrncpy(connObj.app, app, TSDB_APP_NAME_LEN); + + int32_t keepTime = pMnode->shellActivityTimer * 3; + SConnObj *pConn = taosCachePut(pMgmt->cache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), keepTime * 1000); + + mDebug("conn:%d, is created, user:%s", connId, user); + return pConn; +} + +static void mndFreeConn(SConnObj *pConn) { + tfree(pConn->pQueries); + tfree(pConn->pStreams); + mDebug("conn:%d, is destroyed", pConn->connId); +} + +static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *newUser, uint32_t newIp, uint16_t newPort) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SConnObj *pConn = taosCacheAcquireByKey(pMgmt->cache, &connId, sizeof(int32_t)); + if (pConn == NULL) { + mDebug("conn:%d, already destroyed, user:%s", connId, newUser); + return NULL; + } + + if (pConn->ip != newIp || pConn->port != newPort /* || strcmp(pConn->user, newUser) != 0 */) { + char oldIpStr[30]; + char newIpStr[30]; + taosIp2String(pConn->ip, oldIpStr); + taosIp2String(newIp, newIpStr); + mDebug("conn:%d, incoming conn user:%s ip:%s:%u, not match exist user:%s ip:%s:%u", connId, newUser, newIpStr, + newPort, pConn->user, oldIpStr, pConn->port); + + if (pMgmt->connId < connId) pMgmt->connId = connId + 1; + taosCacheRelease(pMgmt->cache, (void **)&pConn, false); + return NULL; + } + + int32_t keepTime = pMnode->shellActivityTimer * 3; + pConn->lastAccess = keepTime * 1000 + (uint64_t)taosGetTimestampMs(); + return pConn; +} + +static void mndReleaseConn(SMnode *pMnode, SConnObj *pConn) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + if (pConn == NULL) return; + taosCacheRelease(pMgmt->cache, (void **)&pConn, false); +} + +static void *mndGetNextConn(SMnode *pMnode, void *pIter, SConnObj **pConn) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + *pConn = NULL; + + pIter = taosHashIterate(pMgmt->cache->pHashTable, pIter); + if (pIter == NULL) return NULL; + + SCacheDataNode **pNode = pIter; + if (pNode == NULL || *pNode == NULL) { + taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); + return NULL; + } + + *pConn = (SConnObj *)((*pNode)->data); + return pIter; +} + +static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); +} + +static int32_t mndProcessConnectMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SConnectMsg *pReq = pMsg->rpcMsg.pCont; + pReq->pid = htonl(pReq->pid); + + SRpcConnInfo info = {0}; + if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) { + mError("user:%s, failed to login while get connection info since %s", pMsg->user, terrstr()); + return -1; + } + + char ip[30]; + taosIp2String(info.clientIp, ip); + + if (pReq->db[0]) { + snprintf(pMsg->db, TSDB_FULL_DB_NAME_LEN, "%d%s%s", pMsg->acctId, TS_PATH_DELIMITER, pReq->db); + SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_INVALID_DB; + mError("user:%s, failed to login from %s while use db:%s since %s", pMsg->user, ip, pReq->db, terrstr()); + return -1; + } + mndReleaseDb(pMnode, pDb); + } + + SConnObj *pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app); + if (pConn == NULL) { + mError("user:%s, failed to login from %s while create connection since %s", pMsg->user, ip, terrstr()); + return -1; + } + + SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp)); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("user:%s, failed to login from %s while create rsp since %s", pMsg->user, ip, terrstr()); + return -1; + } + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser != NULL) { + pRsp->acctId = htonl(pUser->acctId); + pRsp->superAuth = pUser->superAuth; + pRsp->readAuth = pUser->readAuth; + pRsp->writeAuth = pUser->writeAuth; + mndReleaseUser(pMnode, pUser); + } + + pRsp->acctId = htonl(pUser->acctId); + pRsp->clusterId = htonl(pMnode->clusterId); + pRsp->connId = htonl(pConn->connId); + mndGetMnodeEpSet(pMnode, &pRsp->epSet); + + pMsg->contLen = sizeof(SConnectRsp); + pMsg->pCont = pRsp; + mDebug("user:%s, login from %s, conn:%d", info.user, ip, pConn->connId); + return 0; +} + +static int32_t mndProcessHeartBeatMsg(SMnode *pMnode, SMnodeMsg *pMsg) { + SHeartBeatMsg *pReq = pMsg->rpcMsg.pCont; + pReq->connId = htonl(pReq->connId); + pReq->pid = htonl(pReq->pid); + + SRpcConnInfo info = {0}; + if (rpcGetConnInfo(pMsg->rpcMsg.handle, &info) != 0) { + mError("user:%s, connId:%d failed to process hb since %s", pMsg->user, pReq->connId, terrstr()); + return -1; + } + + SConnObj *pConn = mndAcquireConn(pMnode, pReq->connId, info.user, info.clientIp, info.clientPort); + if (pConn == NULL) { + pConn = mndCreateConn(pMnode, info.user, info.clientIp, info.clientPort, pReq->pid, pReq->app); + if (pConn == NULL) { + mError("user:%s, conn:%d is freed and failed to create new conn since %s", pMsg->user, pReq->connId, terrstr()); + return -1; + } else { + mDebug("user:%s, conn:%d is freed and create a new conn:%d", pMsg->user, pReq->connId, pConn->connId); + } + } + + SHeartBeatRsp *pRsp = rpcMallocCont(sizeof(SHeartBeatRsp)); + if (pRsp == NULL) { + mndReleaseConn(pMnode, pConn); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("user:%s, conn:%d failed to process hb while create rsp since %s", pMsg->user, pReq->connId, terrstr()); + return -1; + } + + pRsp->connId = htonl(pConn->connId); + pRsp->killConnection = pConn->killed; + mndGetMnodeEpSet(pMnode, &pRsp->epSet); + mndReleaseConn(pMnode, pConn); + + pMsg->contLen = sizeof(SConnectRsp); + pMsg->pCont = pRsp; + return 0; +} + +static int32_t mndGetConnsMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser == NULL) return 0; + if (!pUser->superAuth) { + mndReleaseUser(pMnode, pUser); + return TSDB_CODE_MND_NO_RIGHTS; + } + mndReleaseUser(pMnode, pUser); + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "connId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + // app name + pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "program"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + // app pid + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "pid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "login_time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "last_access"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = taosHashGetSize(pMgmt->cache->pHashTable); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + return 0; +} + +static int32_t mndRetrieveConns(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + int32_t numOfRows = 0; + SConnObj *pConnObj = NULL; + int32_t cols = 0; + char *pWrite; + char ipStr[TSDB_IPv4ADDR_LEN + 6]; + + while (numOfRows < rows) { + pShow->pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj); + if (pConnObj == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pConnObj->connId; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); + cols++; + + // app name + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->app, pShow->bytes[cols]); + cols++; + + // app pid + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = pConnObj->pid; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + taosIpPort2String(pConnObj->ip, pConnObj->port, ipStr); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pConnObj->stime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + if (pConnObj->lastAccess < pConnObj->stime) pConnObj->lastAccess = pConnObj->stime; + *(int64_t *)pWrite = pConnObj->lastAccess; + cols++; + + numOfRows++; + } + + pShow->numOfReads += numOfRows; + + return numOfRows; +} + +static int32_t mndGetQueryMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow) { + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser == NULL) return 0; + if (!pUser->superAuth) { + mndReleaseUser(pMnode, pUser); + return TSDB_CODE_MND_NO_RIGHTS; + } + mndReleaseUser(pMnode, pUser); + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "query_id"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 24; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "qid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created_time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql_obj_id"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "pid"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ep"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 1; + pSchema[cols].type = TSDB_DATA_TYPE_BOOL; + strcpy(pSchema[cols].name, "stable_query"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "sub_queries"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sub_query_info"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = 1000000; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + return 0; +} + +static int32_t mndRetrieveQueries(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + int32_t numOfRows = 0; + SConnObj *pConnObj = NULL; + int32_t cols = 0; + char * pWrite; + void * pIter; + char str[TSDB_IPv4ADDR_LEN + 6] = {0}; + + while (numOfRows < rows) { + pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj); + if (pConnObj == NULL) { + pShow->pIter = pIter; + break; + } + + if (numOfRows + pConnObj->numOfQueries >= rows) { + mndCancelGetNextConn(pMnode, pIter); + break; + } + + pShow->pIter = pIter; + for (int32_t i = 0; i < pConnObj->numOfQueries; ++i) { + SQueryDesc *pDesc = pConnObj->pQueries + i; + cols = 0; + + snprintf(str, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->queryId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(str, tListLen(str), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); + cols++; + + char handleBuf[24] = {0}; + snprintf(handleBuf, tListLen(handleBuf), "%" PRIu64, htobe64(pDesc->qId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, handleBuf, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->stime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->useconds); + cols++; + + snprintf(str, tListLen(str), "0x%" PRIx64, htobe64(pDesc->sqlObjId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, str, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = htonl(pDesc->pid); + cols++; + + char epBuf[TSDB_EP_LEN + 1] = {0}; + snprintf(epBuf, tListLen(epBuf), "%s:%u", pDesc->fqdn, pConnObj->port); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, epBuf, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(bool *)pWrite = pDesc->stableQuery; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = htonl(pDesc->numOfSub); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->subSqlInfo, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]); + cols++; + + numOfRows++; + } + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextQuery(SMnode *pMnode, void *pIter) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); +} + +static int32_t mndGetStreamMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta, SShowObj *pShow) { + SMnode *pMnode = pMsg->pMnode; + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + + SUserObj *pUser = mndAcquireUser(pMnode, pMsg->user); + if (pUser == NULL) return 0; + if (!pUser->superAuth) { + mndReleaseUser(pMnode, pUser); + return TSDB_CODE_MND_NO_RIGHTS; + } + mndReleaseUser(pMnode, pUser); + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = QUERY_ID_SIZE + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "streamId"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "dest table"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "created time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "exec time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_BIGINT; + strcpy(pSchema[cols].name, "time(us)"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "sql"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "cycles"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = 1000000; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + return 0; +} + +static int32_t mndRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + int32_t numOfRows = 0; + SConnObj *pConnObj = NULL; + int32_t cols = 0; + char *pWrite; + void *pIter; + char ipStr[TSDB_IPv4ADDR_LEN + 6]; + + while (numOfRows < rows) { + pIter = mndGetNextConn(pMnode, pShow->pIter, &pConnObj); + if (pConnObj == NULL) { + pShow->pIter = pIter; + break; + } + + if (numOfRows + pConnObj->numOfStreams >= rows) { + mndCancelGetNextConn(pMnode, pIter); + break; + } + + pShow->pIter = pIter; + for (int32_t i = 0; i < pConnObj->numOfStreams; ++i) { + SStreamDesc *pDesc = pConnObj->pStreams + i; + cols = 0; + + snprintf(ipStr, QUERY_ID_SIZE + 1, "%u:%u", pConnObj->connId, htonl(pDesc->streamId)); + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pConnObj->user, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->dstTable, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + snprintf(ipStr, sizeof(ipStr), "%s:%u", taosIpStr(pConnObj->ip), pConnObj->port); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, ipStr, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->ctime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->stime); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = htobe64(pDesc->useconds); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDesc->sql, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int32_t *)pWrite = (int32_t)htobe64(pDesc->num); + cols++; + + numOfRows++; + } + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { + SProfileMgmt *pMgmt = &pMnode->profileMgmt; + taosHashCancelIterate(pMgmt->cache->pHashTable, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index af6a203006..790f5c1737 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -92,7 +92,7 @@ static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { pRsp->qhandle = htobe64((uint64_t)pShow); - int32_t code = (*metaFp)(pMnode, &pRsp->tableMeta, pShow, pMnodeMsg->rpcMsg.handle); + int32_t code = (*metaFp)(pMnodeMsg,pShow, &pRsp->tableMeta); mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type), pShow->numOfRows, pShow->numOfColumns, tstrerror(code)); @@ -169,7 +169,7 @@ static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) { // if free flag is set, client wants to clean the resources if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) { - rowsRead = (*retrieveFp)(pMnode, pShow, pRsp->data, rowsToRead, pMnodeMsg->rpcMsg.handle); + rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead); } mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead); @@ -311,17 +311,17 @@ void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capa } } -void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) { +void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) { SShowMgmt *pMgmt = &pMnode->showMgmt; pMgmt->metaFps[showType] = fp; } -void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) { +void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) { SShowMgmt *pMgmt = &pMnode->showMgmt; pMgmt->retrieveFps[showType] = fp; } -void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) { +void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) { SShowMgmt *pMgmt = &pMnode->showMgmt; pMgmt->freeIterFps[showType] = fp; } diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index abbe41a60d..78869bd416 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -30,7 +30,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN) SDB_SET_INT64(pRaw, dataPos, pUser->createdTime) SDB_SET_INT64(pRaw, dataPos, pUser->updateTime) - SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth) + SDB_SET_INT8(pRaw, dataPos, pUser->superAuth) SDB_SET_DATALEN(pRaw, dataPos); return pRaw; @@ -56,7 +56,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime) SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime) - SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->rootAuth) + SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth) return pRow; } @@ -70,12 +70,14 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) { return -1; } - pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct); - if (pUser->pAcct == NULL) { + SAcctObj *pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct); + if (pAcct == NULL) { terrno = TSDB_CODE_MND_ACCT_NOT_EXIST; mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr()); return -1; } + pUser->acctId = pAcct->acctId; + sdbRelease(pSdb, pAcct); return 0; } @@ -87,11 +89,6 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) { pUser->prohibitDbHash = NULL; } - if (pUser->pAcct != NULL) { - sdbRelease(pSdb, pUser->pAcct); - pUser->pAcct = NULL; - } - return 0; } @@ -102,7 +99,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN); pSrcUser->createdTime = pDstUser->createdTime; pSrcUser->updateTime = pDstUser->updateTime; - pSrcUser->rootAuth = pDstUser->rootAuth; + pSrcUser->superAuth = pDstUser->superAuth; return 0; } @@ -115,7 +112,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char userObj.updateTime = userObj.createdTime; if (strcmp(user, TSDB_DEFAULT_USER) == 0) { - userObj.rootAuth = 1; + userObj.superAuth = 1; } SSdbRaw *pRaw = mndUserActionEncode(&userObj); @@ -145,7 +142,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass); userObj.createdTime = taosGetTimestampMs(); userObj.updateTime = userObj.createdTime; - userObj.rootAuth = 0; + userObj.superAuth = 0; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); if (pTrans == NULL) return -1; diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index ee524e4448..b9d2da6939 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -410,3 +410,10 @@ char *taosIpStr(uint32_t ipInt) { return ipStr; } +void taosIp2String(uint32_t ip, char *str) { + sprintf(str, "%u.%u.%u.%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24)); +} + +void taosIpPort2String(uint32_t ip, uint16_t port, char *str) { + sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port); +} \ No newline at end of file -- GitLab