From 3a056eddea7072aae20d666e62be6db4bbfe777c Mon Sep 17 00:00:00 2001 From: slguan Date: Wed, 4 Mar 2020 20:49:23 +0800 Subject: [PATCH] for cluster module --- src/dnode/src/dnodeMgmt.c | 4 +- src/inc/taosmsg.h | 1 - src/mnode/inc/mgmtConn.h | 36 --------- src/mnode/inc/mgmtDnode.h | 3 + src/mnode/inc/mgmtGrant.h | 1 + src/mnode/inc/mgmtMnode.h | 5 +- src/mnode/inc/mgmtProfile.h | 14 ++-- src/mnode/inc/mgmtShell.h | 2 - src/mnode/src/mgmtConn.c | 153 ------------------------------------ src/mnode/src/mgmtDnode.c | 3 + src/mnode/src/mgmtGrant.c | 19 +++-- src/mnode/src/mgmtMnode.c | 55 ++++++------- src/mnode/src/mgmtProfile.c | 139 +++++++++++++++++++++++++++++++- src/mnode/src/mgmtShell.c | 73 ++++++++++++++++- 14 files changed, 264 insertions(+), 244 deletions(-) delete mode 100644 src/mnode/inc/mgmtConn.h delete mode 100644 src/mnode/src/mgmtConn.c diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index d8273738c3..d0cb0a39af 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -30,12 +30,10 @@ void (*dnodeInitMgmtIpFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL; void (*dnodeCleanUpMgmtFp)() = NULL; - -void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; +void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; - static void *tsStatusTimer = NULL; static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void dnodeInitProcessShellMsg(); diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index a05bd20f84..7914c6cc86 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -595,7 +595,6 @@ typedef struct { typedef struct { int32_t code; - int32_t numOfVnodes; SDnodeState dnodeState; SRpcIpSet ipList; SVnodeAccess vnodeAccess[]; diff --git a/src/mnode/inc/mgmtConn.h b/src/mnode/inc/mgmtConn.h deleted file mode 100644 index 62dd5ebb42..0000000000 --- a/src/mnode/inc/mgmtConn.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_MGMT_CONN_H -#define TDENGINE_MGMT_CONN_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "mnode.h" - -int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); -int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn); - -bool mgmtCheckQhandle(uint64_t qhandle); -void mgmtSaveQhandle(void *qhandle); -void mgmtFreeQhandle(void *qhandle); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/mnode/inc/mgmtDnode.h b/src/mnode/inc/mgmtDnode.h index 4cdac1e7af..aafe45be5c 100644 --- a/src/mnode/inc/mgmtDnode.h +++ b/src/mnode/inc/mgmtDnode.h @@ -53,6 +53,9 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); SDnodeObj* mgmtGetDnode(uint32_t ip); +extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip); +extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip); + #ifdef __cplusplus } #endif diff --git a/src/mnode/inc/mgmtGrant.h b/src/mnode/inc/mgmtGrant.h index 1cfc88f94a..01a5068bad 100644 --- a/src/mnode/inc/mgmtGrant.h +++ b/src/mnode/inc/mgmtGrant.h @@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries); int32_t mgmtCheckTimeSeries(uint32_t timeseries); int32_t mgmtCheckUserGrant(); int32_t mgmtCheckDbGrant(); +int32_t mgmtCheckDnodeGrant(); int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn); diff --git a/src/mnode/inc/mgmtMnode.h b/src/mnode/inc/mgmtMnode.h index d012997d13..9132993994 100644 --- a/src/mnode/inc/mgmtMnode.h +++ b/src/mnode/inc/mgmtMnode.h @@ -24,7 +24,10 @@ extern "C" { #include #include "mnode.h" - int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtInitMnodes(); +void mgmtCleanUpMnodes(); + +int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtProfile.h b/src/mnode/inc/mgmtProfile.h index 959f9e65ab..5af38a73b8 100644 --- a/src/mnode/inc/mgmtProfile.h +++ b/src/mnode/inc/mgmtProfile.h @@ -22,20 +22,22 @@ extern "C" { #include "mnode.h" -int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); - -int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +bool mgmtCheckQhandle(uint64_t qhandle); +void mgmtSaveQhandle(void *qhandle); +void mgmtFreeQhandle(void *qhandle); +int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); +int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); +int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); -int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); +int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); +int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtKillQuery(char *qidstr, void *pConn); - int32_t mgmtKillStream(char *qidstr, void *pConn); - int32_t mgmtKillConnection(char *qidstr, void *pConn); enum { diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index f14871b5b2..06b0068652 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -28,10 +28,8 @@ int32_t mgmtInitShell(); void mgmtCleanUpShell(); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); -extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); -extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle); /* * If table not exist, will create it diff --git a/src/mnode/src/mgmtConn.c b/src/mnode/src/mgmtConn.c deleted file mode 100644 index 5d7b8ab27f..0000000000 --- a/src/mnode/src/mgmtConn.c +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "mgmtConn.h" -#include "taosmsg.h" -#include "tschemautil.h" - -typedef struct { - char user[TSDB_TABLE_ID_LEN]; - uint64_t stime; - uint32_t ip; - uint16_t port; -} SConnInfo; - -typedef struct { - int numOfConns; - int index; - SConnInfo connInfo[]; -} SConnShow; - -int mgmtGetConns(SShowObj *pShow, void *pConn) { -// SAcctObj * pAcct = pConn->pAcct; -// SConnShow *pConnShow; -// -// pthread_mutex_lock(&pAcct->mutex); -// -// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); -// pConnShow->index = 0; -// pConnShow->numOfConns = 0; -// -// if (pAcct->acctInfo.numOfConns > 0) { -// pConn = pAcct->pConn; -// SConnInfo *pConnInfo = pConnShow->connInfo; -// -// while (pConn && pConn->pUser) { -// strcpy(pConnInfo->user, pConn->pUser->user); -// pConnInfo->ip = pConn->ip; -// pConnInfo->port = pConn->port; -// pConnInfo->stime = pConn->stime; -// -// pConnShow->numOfConns++; -// pConnInfo++; -// pConn = pConn->next; -// } -// } -// -// pthread_mutex_unlock(&pAcct->mutex); -// -// // sorting based on useconds -// -// pShow->pNode = pConnShow; - - return 0; -} - -int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { - int cols = 0; - - pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; - SSchema *pSchema = tsGetSchema(pMeta); - - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "user"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; - pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "ip:port"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pShow->bytes[cols] = 8; - pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; - strcpy(pSchema[cols].name, "login time"); - pSchema[cols].bytes = htons(pShow->bytes[cols]); - cols++; - - pMeta->numOfColumns = htons(cols); - pShow->numOfColumns = cols; - - pShow->offset[0] = 0; - for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; - - pShow->numOfRows = 1000000; - pShow->pNode = NULL; - pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; - - mgmtGetConns(pShow, pConn); - return 0; -} - -int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) { - int numOfRows = 0; - char *pWrite; - int cols = 0; - - SConnShow *pConnShow = (SConnShow *)pShow->pNode; - - if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; - - while (numOfRows < rows) { - SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; - cols = 0; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - strcpy(pWrite, pNode->user); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - uint32_t ip = pNode->ip; - sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); - cols++; - - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pNode->stime; - cols++; - - numOfRows++; - pConnShow->index++; - } - - if (numOfRows == 0) { - tfree(pConnShow); - } - - pShow->numOfReads += numOfRows; - return numOfRows; -} - -bool mgmtCheckQhandle(uint64_t qhandle) { - return true; -} - -void mgmtSaveQhandle(void *qhandle) { -} - -void mgmtFreeQhandle(void *qhandle) { -} \ No newline at end of file diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index 20043c3800..a28f337f69 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; +int32_t (*mgmtCreateDnodeFp)(uint32_t ip) = NULL; +int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip) = NULL; + static SDnodeObj tsDnodeObj = {0}; diff --git a/src/mnode/src/mgmtGrant.c b/src/mnode/src/mgmtGrant.c index c24fb82aa6..0ea212b86f 100644 --- a/src/mnode/src/mgmtGrant.c +++ b/src/mnode/src/mgmtGrant.c @@ -22,6 +22,7 @@ int32_t (*mgmtCheckUserGrantFp)() = NULL; int32_t (*mgmtCheckDbGrantFp)() = NULL; +int32_t (*mgmtCheckDnodeGrantFp)() = NULL; void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL; @@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL; int32_t mgmtCheckUserGrant() { if (mgmtCheckUserGrantFp) { - return mgmtCheckUserGrantFp(); + return (*mgmtCheckUserGrantFp)(); } else { return 0; } @@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() { int32_t mgmtCheckDbGrant() { if (mgmtCheckDbGrantFp) { - return mgmtCheckDbGrantFp(); + return (*mgmtCheckDbGrantFp)(); + } else { + return 0; + } +} + +int32_t mgmtCheckDnodeGrant() { + if (mgmtCheckDnodeGrantFp) { + return (*mgmtCheckDnodeGrantFp)(); } else { return 0; } @@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() { void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; if (mgmtAddTimeSeriesFp) { - mgmtAddTimeSeriesFp(timeSeriesNum); + (*mgmtAddTimeSeriesFp)(timeSeriesNum); } } void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; if (mgmtRestoreTimeSeriesFp) { - mgmtRestoreTimeSeriesFp(timeSeriesNum); + (*mgmtRestoreTimeSeriesFp)(timeSeriesNum); } } int32_t mgmtCheckTimeSeries(uint32_t timeseries) { if (mgmtCheckTimeSeriesFp) { - return mgmtCheckTimeSeriesFp(timeseries); + return (*mgmtCheckTimeSeriesFp)(timeseries); } else { return 0; } diff --git a/src/mnode/src/mgmtMnode.c b/src/mnode/src/mgmtMnode.c index 1c60312f3e..9cc796ec0f 100644 --- a/src/mnode/src/mgmtMnode.c +++ b/src/mnode/src/mgmtMnode.c @@ -18,12 +18,32 @@ #include "mgmtMnode.h" #include "mgmtUser.h" -void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; int32_t (*mgmtInitMnodesFp)() = NULL; +void (*mgmtCleanUpMnodesFp)() = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL; +void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; -static int32_t mgmtGetMnodesNum(); -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode); +static int32_t mgmtGetMnodesNum() { + if (mgmtGetMnodesNumFp) { + return (*mgmtGetMnodesNumFp)(); + } else { + return 1; + } +} + +static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { + if (mgmtGetNextMnodeFp) { + return (*mgmtGetNextMnodeFp)(pShow, pMnode); + } else { + if (*pMnode == NULL) { + *pMnode = NULL; + } else { + *pMnode = NULL; + } + } + + return *pMnode; +} int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t cols = 0; @@ -88,11 +108,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon char ipstr[20]; while (numOfRows < rows) { - pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); - - -// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode); -// if (pMnode == NULL) break; + pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode); + if (pMnode == NULL) break; cols = 0; @@ -123,25 +140,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon pShow->numOfReads += numOfRows; return numOfRows; } - -static int32_t mgmtGetMnodesNum() { - if (mgmtGetMnodesNumFp) { - return mgmtGetMnodesNumFp(); - } else { - return 1; - } -} - -static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) { - if (mgmtGetNextMnodeFp) { - return mgmtGetNextMnodeFp(pShow, pMnode); - } else { - if (*pMnode == NULL) { - *pMnode = NULL; - } else { - *pMnode = NULL; - } - } - - return *pMnode; -} \ No newline at end of file diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 6e9a6e9c07..d8cc5af06b 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -15,16 +15,27 @@ #define _DEFAULT_SOURCE #include "os.h" - -#include "mnode.h" -#include "mgmtProfile.h" #include "taosmsg.h" #include "tschemautil.h" +#include "mgmtProfile.h" + +typedef struct { + char user[TSDB_TABLE_ID_LEN + 1]; + uint64_t stime; + uint32_t ip; + uint16_t port; +} SConnInfo; + +typedef struct { + int numOfConns; + int index; + SConnInfo connInfo[]; +} SConnShow; typedef struct { uint32_t ip; uint16_t port; - char user[TSDB_TABLE_ID_LEN]; + char user[TSDB_TABLE_ID_LEN+ 1]; } SCDesc; typedef struct { @@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) { return TSDB_CODE_INVALID_CONNECTION; } + +bool mgmtCheckQhandle(uint64_t qhandle) { + return true; +} + +void mgmtSaveQhandle(void *qhandle) { +} + +void mgmtFreeQhandle(void *qhandle) { +} + +int mgmtGetConns(SShowObj *pShow, void *pConn) { + // SAcctObj * pAcct = pConn->pAcct; + // SConnShow *pConnShow; + // + // pthread_mutex_lock(&pAcct->mutex); + // + // pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow)); + // pConnShow->index = 0; + // pConnShow->numOfConns = 0; + // + // if (pAcct->acctInfo.numOfConns > 0) { + // pConn = pAcct->pConn; + // SConnInfo *pConnInfo = pConnShow->connInfo; + // + // while (pConn && pConn->pUser) { + // strcpy(pConnInfo->user, pConn->pUser->user); + // pConnInfo->ip = pConn->ip; + // pConnInfo->port = pConn->port; + // pConnInfo->stime = pConn->stime; + // + // pConnShow->numOfConns++; + // pConnInfo++; + // pConn = pConn->next; + // } + // } + // + // pthread_mutex_unlock(&pAcct->mutex); + // + // // sorting based on useconds + // + // pShow->pNode = pConnShow; + + return 0; +} + +int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { + int32_t cols = 0; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN; + SSchema *pSchema = tsGetSchema(pMeta); + + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "user"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "ip:port"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "login time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + + pShow->numOfRows = 1000000; + pShow->pNode = NULL; + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + + mgmtGetConns(pShow, pConn); + return 0; +} + +int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) { + int32_t numOfRows = 0; + char *pWrite; + int32_t cols = 0; + + SConnShow *pConnShow = (SConnShow *)pShow->pNode; + + if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index; + + while (numOfRows < rows) { + SConnInfo *pNode = pConnShow->connInfo + pConnShow->index; + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + strcpy(pWrite, pNode->user); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + uint32_t ip = pNode->ip; + sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port)); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pNode->stime; + cols++; + + numOfRows++; + pConnShow->index++; + } + + if (numOfRows == 0) { + tfree(pConnShow); + } + + pShow->numOfReads += numOfRows; + return numOfRows; +} diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 2aadd8963c..5c3b540916 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -26,7 +26,6 @@ #include "mgmtAcct.h" #include "mgmtBalance.h" #include "mgmtChildTable.h" -#include "mgmtConn.h" #include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtDnodeInt.h" @@ -1164,10 +1163,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); } -void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; -void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) { if (!mgmtAlterAcctFp) { @@ -1297,6 +1294,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle rpcSendResponse(ahandle, code, NULL, 0); } +static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtCreateDnodeFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("failed to create dnode:%s, redirect this message", pCreate->ip); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER)); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = (*mgmtCreateDnodeFp)(inet_addr(pCreate->ip)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s is created by %s", pCreate->ip, pUser->user); + } else { + mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + +static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) { + if (!mgmtDropDnodeByIpFp) { + rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); + return; + } + + SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont; + if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) { + mError("failed to drop dnode:%s, redirect this message", pDrop->ip); + return; + } + + SUserObj *pUser = mgmtGetUserFromConn(ahandle); + if (pUser == NULL) { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER)); + rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0); + return; + } + + if (strcmp(pUser->user, "root") != 0) { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS)); + rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0); + return; + } + + int32_t code = (*mgmtDropDnodeByIpFp)(inet_addr(pDrop->ip)); + if (code == TSDB_CODE_SUCCESS) { + mLPrint("dnode:%s set to removing state by %s", pDrop->ip, pUser->user); + } else { + mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code)); + } + + rpcSendResponse(ahandle, code, NULL, 0); +} + void mgmtInitProcessShellMsg() { mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; -- GitLab