提交 5857ab5b 编写于 作者: S Shengliang Guan

TD-10431 process mnode profile

上级 6d7388de
......@@ -353,11 +353,9 @@ typedef struct {
} SUpdateTableTagValMsg;
typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_FNAME_LEN];
char appName[TSDB_APPNAME_LEN];
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
} SConnectMsg;
typedef struct SEpSet {
......@@ -368,15 +366,14 @@ typedef struct SEpSet {
} SEpSet;
typedef struct {
char acctId[TSDB_ACCT_ID_LEN];
char serverVersion[TSDB_VERSION_LEN];
char clusterId[TSDB_CLUSTER_ID_LEN];
int8_t writeAuth;
int8_t superAuth;
int8_t reserved1;
int8_t reserved2;
int32_t connId;
SEpSet epSet;
int32_t acctId;
int32_t clusterId;
int32_t connId;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int8_t reserved[5];
SEpSet epSet;
} SConnectRsp;
typedef struct {
......@@ -875,23 +872,23 @@ typedef struct {
} SStreamDesc;
typedef struct {
char clientVer[TSDB_VERSION_LEN];
uint32_t connId;
int32_t pid;
int32_t numOfQueries;
int32_t numOfStreams;
char appName[TSDB_APPNAME_LEN];
char pData[];
int32_t connId;
int32_t pid;
int32_t numOfQueries;
int32_t numOfStreams;
char app[TSDB_APP_NAME_LEN];
char pData[];
} SHeartBeatMsg;
typedef struct {
uint32_t queryId;
uint32_t streamId;
uint32_t totalDnodes;
uint32_t onlineDnodes;
uint32_t connId;
int8_t killConnection;
SEpSet epSet;
int32_t connId;
int32_t queryId;
int32_t streamId;
int32_t totalDnodes;
int32_t onlineDnodes;
int8_t killConnection;
int8_t reserved[3];
SEpSet epSet;
} SHeartBeatRsp;
typedef struct {
......
......@@ -174,7 +174,7 @@ do { \
#define TSDB_MAX_SQL_SHOW_LEN 512
#define TSDB_MAX_ALLOWED_SQL_LEN (1*1024*1024u) // sql length should be less than 1mb
#define TSDB_APPNAME_LEN TSDB_UNI_LEN
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
/**
* In some scenarios uint16_t (0~65535) is used to store the row len.
......
......@@ -41,8 +41,10 @@ char * paGetToken(char *src, char **token, int32_t *tokenLen);
int32_t taosByteArrayToHexStr(char bytes[], int32_t len, char hexstr[]);
int32_t taosHexStrToByteArray(char hexstr[], char bytes[]);
char * taosIpStr(uint32_t ipInt);
char *taosIpStr(uint32_t ipInt);
uint32_t ip2uint(const char *const ip_addr);
void taosIp2String(uint32_t ip, char *str);
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
MD5_CTX context;
......
......@@ -24,6 +24,8 @@ extern "C" {
int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
#ifdef __cplusplus
}
......
......@@ -185,9 +185,11 @@ typedef struct SUserObj {
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int8_t rootAuth;
int8_t superAuth;
int8_t readAuth;
int8_t writeAuth;
int32_t acctId;
SHashObj *prohibitDbHash;
SAcctObj *pAcct;
} SUserObj;
typedef struct {
......@@ -303,6 +305,8 @@ typedef struct SShowObj {
typedef struct SMnodeMsg {
char user[TSDB_USER_LEN];
char db[TSDB_FULL_DB_NAME_LEN];
int32_t acctId;
SMnode *pMnode;
int16_t received;
int16_t successed;
......
......@@ -28,8 +28,8 @@ extern "C" {
typedef int32_t (*MndMsgFp)(SMnode *pMnode, SMnodeMsg *pMsg);
typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef void (*MndCleanupFp)(SMnode *pMnode);
typedef int32_t (*ShowMetaFp)(SMnode *pMnode, STableMetaMsg *pMeta, SShowObj *pShow, void *pConn);
typedef int32_t (*ShowRetrieveFp)(SMnode *pMnode, SShowObj *pShow, char *data, int32_t rows, void *pConn);
typedef int32_t (*ShowMetaFp)(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta);
typedef int32_t (*ShowRetrieveFp)(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
typedef void (*ShowFreeIterFp)(SMnode *pMnode, void *pIter);
typedef struct {
......@@ -46,6 +46,11 @@ typedef struct {
SCacheObj *cache;
} SShowMgmt;
typedef struct {
int32_t connId;
SCacheObj *cache;
} SProfileMgmt;
typedef struct SMnode {
int32_t dnodeId;
int32_t clusterId;
......@@ -58,6 +63,7 @@ typedef struct SMnode {
SDnode *pDnode;
SArray *pSteps;
SShowMgmt showMgmt;
SProfileMgmt profileMgmt;
MndMsgFp msgFp[TSDB_MSG_TYPE_MAX];
SendMsgToDnodeFp sendMsgToDnodeFp;
SendMsgToMnodeFp sendMsgToMnodeFp;
......
......@@ -25,6 +25,7 @@ extern "C" {
int32_t mndInitMnode(SMnode *pMnode);
void mndCleanupMnode(SMnode *pMnode);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
#ifdef __cplusplus
}
......
......@@ -24,9 +24,9 @@ extern "C" {
int32_t mndInitShow(SMnode *pMnode);
void mndCleanupShow(SMnode *pMnode);
void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp);
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp);
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType msgType, ShowFreeIterFp fp);
void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capacity, SShowObj *pShow);
#ifdef __cplusplus
......
......@@ -170,9 +170,9 @@ int32_t mndInitCluster(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndClusterActionUpdate,
.deleteFp = (SdbDeleteFp)mndClusterActionDelete};
// mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta);
// mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters);
// mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster);
// mndAddShowMetaHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeGetClusterMeta);
// mndAddShowRetrieveHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeRetrieveClusters);
// mndAddShowFreeIterHandle(TSDB_MGMT_TABLE_CLUSTER, mnodeCancelGetNextCluster);
return sdbSetTable(pMnode->pSdb, table);
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndShow.h"
// static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
// SHeartBeatRsp *pRsp = (SHeartBeatRsp *)rpcMallocCont(sizeof(SHeartBeatRsp));
// if (pRsp == NULL) {
// return TSDB_CODE_MND_OUT_OF_MEMORY;
// }
// SHeartBeatMsg *pHBMsg = pMsg->rpcMsg.pCont;
// if (taosCheckVersion(pHBMsg->clientVer, version, 3) != TSDB_CODE_SUCCESS) {
// rpcFreeCont(pRsp);
// return TSDB_CODE_TSC_INVALID_VERSION; // todo change the error code
// }
// SRpcConnInfo connInfo = {0};
// rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo);
// int32_t connId = htonl(pHBMsg->connId);
// SConnObj *pConn = mnodeAccquireConn(connId, connInfo.user, connInfo.clientIp, connInfo.clientPort);
// if (pConn == NULL) {
// pHBMsg->pid = htonl(pHBMsg->pid);
// pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pHBMsg->pid, pHBMsg->appName);
// }
// if (pConn == NULL) {
// // do not close existing links, otherwise
// // mError("failed to create connId, close connect");
// // pRsp->killConnection = 1;
// } else {
// pRsp->connId = htonl(pConn->connId);
// mnodeSaveQueryStreamList(pConn, pHBMsg);
// if (pConn->killed != 0) {
// pRsp->killConnection = 1;
// }
// if (pConn->streamId != 0) {
// pRsp->streamId = htonl(pConn->streamId);
// pConn->streamId = 0;
// }
// if (pConn->queryId != 0) {
// pRsp->queryId = htonl(pConn->queryId);
// pConn->queryId = 0;
// }
// }
// int32_t onlineDnodes = 0, totalDnodes = 0;
// mnodeGetOnlineAndTotalDnodesNum(&onlineDnodes, &totalDnodes);
// pRsp->onlineDnodes = htonl(onlineDnodes);
// pRsp->totalDnodes = htonl(totalDnodes);
// mnodeGetMnodeEpSetForShell(&pRsp->epSet, false);
// pMsg->rpcRsp.rsp = pRsp;
// pMsg->rpcRsp.len = sizeof(SHeartBeatRsp);
// mnodeReleaseConn(pConn);
// return TSDB_CODE_SUCCESS;
// }
// static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
// SConnectMsg *pConnectMsg = pMsg->rpcMsg.pCont;
// SConnectRsp *pConnectRsp = NULL;
// int32_t code = TSDB_CODE_SUCCESS;
// SRpcConnInfo connInfo = {0};
// if (rpcGetConnInfo(pMsg->rpcMsg.handle, &connInfo) != 0) {
// mError("thandle:%p is already released while process connect msg", pMsg->rpcMsg.handle);
// code = TSDB_CODE_MND_INVALID_CONNECTION;
// goto connect_over;
// }
// code = taosCheckVersion(pConnectMsg->clientVersion, version, 3);
// if (code != TSDB_CODE_SUCCESS) {
// goto connect_over;
// }
// SUserObj *pUser = pMsg->pUser;
// SAcctObj *pAcct = pUser->pAcct;
// if (pConnectMsg->db[0]) {
// char dbName[TSDB_TABLE_FNAME_LEN * 3] = {0};
// sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
// SDbObj *pDb = mnodeGetDb(dbName);
// if (pDb == NULL) {
// code = TSDB_CODE_MND_INVALID_DB;
// goto connect_over;
// }
// if (pDb->status != TSDB_DB_STATUS_READY) {
// mError("db:%s, status:%d, in dropping", pDb->name, pDb->status);
// code = TSDB_CODE_MND_DB_IN_DROPPING;
// mnodeDecDbRef(pDb);
// goto connect_over;
// }
// mnodeDecDbRef(pDb);
// }
// pConnectRsp = rpcMallocCont(sizeof(SConnectRsp));
// if (pConnectRsp == NULL) {
// code = TSDB_CODE_MND_OUT_OF_MEMORY;
// goto connect_over;
// }
// pConnectMsg->pid = htonl(pConnectMsg->pid);
// SConnObj *pConn = mnodeCreateConn(connInfo.user, connInfo.clientIp, connInfo.clientPort, pConnectMsg->pid, pConnectMsg->appName);
// if (pConn == NULL) {
// code = terrno;
// } else {
// pConnectRsp->connId = htonl(pConn->connId);
// mnodeReleaseConn(pConn);
// }
// sprintf(pConnectRsp->acctId, "%x", pAcct->acctId);
// memcpy(pConnectRsp->serverVersion, version, TSDB_VERSION_LEN);
// pConnectRsp->writeAuth = pUser->writeAuth;
// pConnectRsp->superAuth = pUser->superAuth;
// mnodeGetMnodeEpSetForShell(&pConnectRsp->epSet, false);
// dnodeGetClusterId(pConnectRsp->clusterId);
// connect_over:
// if (code != TSDB_CODE_SUCCESS) {
// if (pConnectRsp) rpcFreeCont(pConnectRsp);
// mLError("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
// } else {
// mLInfo("user:%s login from %s, result:%s", connInfo.user, taosIpStr(connInfo.clientIp), tstrerror(code));
// pMsg->rpcRsp.rsp = pConnectRsp;
// pMsg->rpcRsp.len = sizeof(SConnectRsp);
// }
// return code;
// }
......@@ -14,26 +14,38 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mndInt.h"
#include "mndDb.h"
int32_t mndInitDb(SMnode *pMnode) { return 0; }
void mndCleanupDb(SMnode *pMnode) {}
static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg);
int32_t mndInitDb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_USE_DB, mnodeProcessUseMsg);
return 0;
}
// static int32_t mnodeProcessUseMsg(SMnodeMsg *pMsg) {
// SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont;
void mndCleanupDb(SMnode *pMnode) {}
// int32_t code = TSDB_CODE_SUCCESS;
// if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pUseDbMsg->db);
// if (pMsg->pDb == NULL) {
// return TSDB_CODE_MND_INVALID_DB;
// }
// if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
// mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
// return TSDB_CODE_MND_DB_IN_DROPPING;
// }
SDbObj *mndAcquireDb(SMnode *pMnode, char *db) {
SSdb *pSdb = pMnode->pSdb;
return sdbAcquire(pSdb, SDB_DB, db);
}
// return code;
// }
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pDb);
}
static int32_t mnodeProcessUseMsg(SMnode *pMnode, SMnodeMsg *pMsg) {
SUseDbMsg *pUseDbMsg = pMsg->rpcMsg.pCont;
strncpy(pMsg->db, pUseDbMsg->db, TSDB_FULL_DB_NAME_LEN);
SDbObj *pDb = mndAcquireDb(pMnode, pMsg->db);
if (pDb != NULL) {
mndReleaseDb(pMnode, pDb);
return 0;
} else {
mError("db:%s, failed to process use db msg since %s", pMsg->db, terrstr());
return -1;
}
}
......@@ -135,4 +135,6 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
sdbRelease(pSdb, pMnodeObj);
return true;
}
\ No newline at end of file
}
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {}
\ No newline at end of file
......@@ -92,7 +92,7 @@ static int32_t mndProcessShowMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
pRsp->qhandle = htobe64((uint64_t)pShow);
int32_t code = (*metaFp)(pMnode, &pRsp->tableMeta, pShow, pMnodeMsg->rpcMsg.handle);
int32_t code = (*metaFp)(pMnodeMsg,pShow, &pRsp->tableMeta);
mDebug("show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s", pShow->id, mndShowStr(type),
pShow->numOfRows, pShow->numOfColumns, tstrerror(code));
......@@ -169,7 +169,7 @@ static int32_t mndProcessRetrieveMsg(SMnode *pMnode, SMnodeMsg *pMnodeMsg) {
// if free flag is set, client wants to clean the resources
if ((pRetrieve->free & TSDB_QUERY_TYPE_FREE_RESOURCE) != TSDB_QUERY_TYPE_FREE_RESOURCE) {
rowsRead = (*retrieveFp)(pMnode, pShow, pRsp->data, rowsToRead, pMnodeMsg->rpcMsg.handle);
rowsRead = (*retrieveFp)(pMnodeMsg, pShow, pRsp->data, rowsToRead);
}
mDebug("show:%d, stop retrieve data, rowsRead:%d rowsToRead:%d", pShow->id, rowsRead, rowsToRead);
......@@ -311,17 +311,17 @@ void mnodeVacuumResult(char *data, int32_t numOfCols, int32_t rows, int32_t capa
}
}
void mnodeAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
void mndAddShowMetaHandle(SMnode *pMnode, EShowType showType, ShowMetaFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->metaFps[showType] = fp;
}
void mnodeAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
void mndAddShowRetrieveHandle(SMnode *pMnode, EShowType showType, ShowRetrieveFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->retrieveFps[showType] = fp;
}
void mnodeAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
void mndAddShowFreeIterHandle(SMnode *pMnode, EShowType showType, ShowFreeIterFp fp) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
pMgmt->freeIterFps[showType] = fp;
}
......@@ -30,7 +30,7 @@ static SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_BINARY(pRaw, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_SET_INT64(pRaw, dataPos, pUser->createdTime)
SDB_SET_INT64(pRaw, dataPos, pUser->updateTime)
SDB_SET_INT8(pRaw, dataPos, pUser->rootAuth)
SDB_SET_INT8(pRaw, dataPos, pUser->superAuth)
SDB_SET_DATALEN(pRaw, dataPos);
return pRaw;
......@@ -56,7 +56,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, pRow, dataPos, pUser->acct, TSDB_USER_LEN)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->createdTime)
SDB_GET_INT64(pRaw, pRow, dataPos, &pUser->updateTime)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->rootAuth)
SDB_GET_INT8(pRaw, pRow, dataPos, &pUser->superAuth)
return pRow;
}
......@@ -70,12 +70,14 @@ static int32_t mndUserActionInsert(SSdb *pSdb, SUserObj *pUser) {
return -1;
}
pUser->pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct);
if (pUser->pAcct == NULL) {
SAcctObj *pAcct = sdbAcquire(pSdb, SDB_ACCT, pUser->acct);
if (pAcct == NULL) {
terrno = TSDB_CODE_MND_ACCT_NOT_EXIST;
mError("user:%s, failed to perform insert action since %s", pUser->user, terrstr());
return -1;
}
pUser->acctId = pAcct->acctId;
sdbRelease(pSdb, pAcct);
return 0;
}
......@@ -87,11 +89,6 @@ static int32_t mndUserActionDelete(SSdb *pSdb, SUserObj *pUser) {
pUser->prohibitDbHash = NULL;
}
if (pUser->pAcct != NULL) {
sdbRelease(pSdb, pUser->pAcct);
pUser->pAcct = NULL;
}
return 0;
}
......@@ -102,7 +99,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs
memcpy(pSrcUser->acct, pDstUser->acct, TSDB_USER_LEN);
pSrcUser->createdTime = pDstUser->createdTime;
pSrcUser->updateTime = pDstUser->updateTime;
pSrcUser->rootAuth = pDstUser->rootAuth;
pSrcUser->superAuth = pDstUser->superAuth;
return 0;
}
......@@ -115,7 +112,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
userObj.updateTime = userObj.createdTime;
if (strcmp(user, TSDB_DEFAULT_USER) == 0) {
userObj.rootAuth = 1;
userObj.superAuth = 1;
}
SSdbRaw *pRaw = mndUserActionEncode(&userObj);
......@@ -145,7 +142,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
taosEncryptPass((uint8_t *)pass, strlen(pass), userObj.pass);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.rootAuth = 0;
userObj.superAuth = 0;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
if (pTrans == NULL) return -1;
......
......@@ -410,3 +410,10 @@ char *taosIpStr(uint32_t ipInt) {
return ipStr;
}
void taosIp2String(uint32_t ip, char *str) {
sprintf(str, "%u.%u.%u.%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24));
}
void taosIpPort2String(uint32_t ip, uint16_t port, char *str) {
sprintf(str, "%u.%u.%u.%u:%u", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, (uint8_t)(ip >> 24), port);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册