未验证 提交 0cb93b06 编写于 作者: H hzcheng 提交者: GitHub

Merge pull request #1319 from taosdata/refact/slguan

Refact/slguan
...@@ -4,11 +4,11 @@ PROJECT(TDengine) ...@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY(os) ADD_SUBDIRECTORY(os)
ADD_SUBDIRECTORY(util) ADD_SUBDIRECTORY(util)
ADD_SUBDIRECTORY(rpc) ADD_SUBDIRECTORY(rpc)
# ADD_SUBDIRECTORY(client) ADD_SUBDIRECTORY(client)
# ADD_SUBDIRECTORY(kit) ADD_SUBDIRECTORY(kit)
# ADD_SUBDIRECTORY(plugins) ADD_SUBDIRECTORY(plugins)
# ADD_SUBDIRECTORY(sdb) ADD_SUBDIRECTORY(sdb)
# ADD_SUBDIRECTORY(mnode) ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY(vnode) ADD_SUBDIRECTORY(vnode)
# ADD_SUBDIRECTORY(dnode) ADD_SUBDIRECTORY(dnode)
#ADD_SUBDIRECTORY(connector/jdbc) #ADD_SUBDIRECTORY(connector/jdbc)
...@@ -26,15 +26,13 @@ extern "C" { ...@@ -26,15 +26,13 @@ extern "C" {
int32_t dnodeInitMgmt(); int32_t dnodeInitMgmt();
void dnodeInitMgmtIp(); void dnodeInitMgmtIp();
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen); void dnodeSendMsgToMnode(int8_t msgType, void *pCont, int32_t contLen);
void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen); void dnodeSendRspToMnode(void *pConn, int8_t msgType, int32_t code, void *pCont, int32_t contLen);
void dnodeSendVnodeCfgMsg(int32_t vnode); void dnodeSendVnodeCfgMsg(int32_t vnode);
void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid); void dnodeSendTableCfgMsg(int32_t vnode, int32_t sid);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -30,12 +30,10 @@ ...@@ -30,12 +30,10 @@
void (*dnodeInitMgmtIpFp)() = NULL; void (*dnodeInitMgmtIpFp)() = NULL;
int32_t (*dnodeInitMgmtFp)() = NULL; int32_t (*dnodeInitMgmtFp)() = NULL;
void (*dnodeCleanUpMgmtFp)() = NULL; void (*dnodeCleanUpMgmtFp)() = NULL;
void (*dnodeProcessStatusRspFp)(void *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
void (*dnodeProcessStatusRspFp)(int8_t *pCont, int32_t contLen, int8_t msgType, void *pConn) = NULL;
void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL; void (*dnodeSendMsgToMnodeFp)(int8_t msgType, void *pCont, int32_t contLen) = NULL;
void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL; void (*dnodeSendRspToMnodeFp)(void *handle, int32_t code, void *pCont, int contLen) = NULL;
static void *tsStatusTimer = NULL; static void *tsStatusTimer = NULL;
static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn); static void (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contLen, int8_t msgType, void *pConn);
static void dnodeInitProcessShellMsg(); static void dnodeInitProcessShellMsg();
...@@ -173,13 +171,13 @@ void dnodeCleanUpMgmt() { ...@@ -173,13 +171,13 @@ void dnodeCleanUpMgmt() {
} }
} }
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) { void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) { if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
dError("invalid msg type:%d", msgType); dError("invalid msg type:%d", msgType);
return; return;
} }
dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[msgType], pConn); dTrace("msg:%d:%s is received from mgmt, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) { if (msgType == TSDB_MSG_TYPE_STATUS_RSP && dnodeProcessStatusRspFp != NULL) {
dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn); dnodeProcessStatusRspFp(pCont, contLen, msgType, pConn);
......
...@@ -68,38 +68,6 @@ void dnodeCleanUpModules() { ...@@ -68,38 +68,6 @@ void dnodeCleanUpModules() {
} }
} }
void dnodeProcessModuleStatus(uint32_t status) {
if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_RUNING) {
return;
}
int news = status;
int olds = tsModuleStatus;
for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
int newStatus = news & (1 << moduleType);
int oldStatus = olds & (1 << moduleType);
if (oldStatus > 0) {
if (newStatus == 0) {
if (tsModule[moduleType].stopFp) {
dPrint("module:%s is stopped on this node", tsModule[moduleType].name);
(*tsModule[moduleType].stopFp)();
}
}
} else if (oldStatus == 0) {
if (newStatus > 0) {
if (tsModule[moduleType].startFp) {
dPrint("module:%s is started on this node", tsModule[moduleType].name);
(*tsModule[moduleType].startFp)();
}
}
} else {
}
}
tsModuleStatus = status;
}
int32_t dnodeInitModules() { int32_t dnodeInitModules() {
for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) { for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].initFp) { if (tsModule[mod].num != 0 && tsModule[mod].initFp) {
......
...@@ -33,12 +33,14 @@ ...@@ -33,12 +33,14 @@
#include "dnodeVnodeMgmt.h" #include "dnodeVnodeMgmt.h"
#ifdef CLUSTER #ifdef CLUSTER
//#include "acct.h" #include "account.h"
//#include "admin.h" #include "admin.h"
//#include "cluster.h" #include "balance.h"
//#include "grant.h" #include "cluster.h"
//#include "replica.h" #include "grant.h"
//#include "storage.h" #include "mpeer.h"
#include "storage.h"
#include "vpeer.h"
#endif #endif
static pthread_mutex_t tsDnodeMutex; static pthread_mutex_t tsDnodeMutex;
...@@ -89,8 +91,6 @@ void dnodeCleanUpSystem() { ...@@ -89,8 +91,6 @@ void dnodeCleanUpSystem() {
dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED);
} }
dnodeCleanupShell(); dnodeCleanupShell();
dnodeCleanUpModules(); dnodeCleanUpModules();
dnodeCleanupVnodes(); dnodeCleanupVnodes();
...@@ -112,7 +112,13 @@ void dnodeCheckDataDirOpenned(const char *dir) { ...@@ -112,7 +112,13 @@ void dnodeCheckDataDirOpenned(const char *dir) {
void dnodeInitPlugins() { void dnodeInitPlugins() {
#ifdef CLUSTER #ifdef CLUSTER
acctInit(); // acctInit();
// adminInit();
// balanceInit();
// clusterInit();
// grantInit();
// mpeerInit();
// storageInit();
#endif #endif
} }
......
...@@ -49,8 +49,6 @@ extern int32_t (*dnodeCheckSystem)(); ...@@ -49,8 +49,6 @@ extern int32_t (*dnodeCheckSystem)();
extern void *tsDnodeMgmtQhandle; extern void *tsDnodeMgmtQhandle;
void dnodeCheckDataDirOpenned(const char* dir); void dnodeCheckDataDirOpenned(const char* dir);
void dnodeProcessMsgFromMgmt(int8_t msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
// dnodeModule // dnodeModule
extern void (*dnodeStartModules)(); extern void (*dnodeStartModules)();
......
...@@ -264,6 +264,7 @@ void mgmtCleanUpSystem(); ...@@ -264,6 +264,7 @@ void mgmtCleanUpSystem();
void mgmtStopSystem(); void mgmtStopSystem();
void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code); void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
void dnodeProcessMsgFromMgmt(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,18 +25,10 @@ extern "C" { ...@@ -25,18 +25,10 @@ extern "C" {
extern uint16_t tsMgmtMgmtPort; extern uint16_t tsMgmtMgmtPort;
extern uint16_t tsMgmtSyncPort; extern uint16_t tsMgmtSyncPort;
extern int sdbMaxNodes;
extern int tsMgmtPeerHBTimer; // seconds extern int tsMgmtPeerHBTimer; // seconds
extern char sdbZone[];
extern char sdbMasterIp[];
extern char sdbPrivateIp[];
extern char * sdbStatusStr[]; extern char * sdbStatusStr[];
extern char * sdbRoleStr[]; extern char * sdbRoleStr[];
extern void * mnodeSdb;
extern int sdbExtConns;
extern int sdbMaster; extern int sdbMaster;
extern uint32_t sdbPublicIp;
extern uint32_t sdbMasterStartTime;
extern SRpcIpSet *pSdbIpList; extern SRpcIpSet *pSdbIpList;
extern SRpcIpSet *pSdbPublicIpList; extern SRpcIpSet *pSdbPublicIpList;
...@@ -89,14 +81,9 @@ typedef struct { ...@@ -89,14 +81,9 @@ typedef struct {
// internal // internal
int syncFd; int syncFd;
void *hbTimer; void *hbTimer;
void *thandle;
void *pSync; void *pSync;
} SSdbPeer; } SSdbPeer;
SSdbPeer *sdbAddPeer(uint32_t ip, uint32_t publicIp, char role);
void sdbUpdateIpList();
extern SSdbPeer *sdbPeer[]; extern SSdbPeer *sdbPeer[];
#define sdbInited (sdbPeer[0]) #define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status) #define sdbStatus (sdbPeer[0]->status)
...@@ -130,8 +117,6 @@ int sdbInitPeers(char *directory); ...@@ -130,8 +117,6 @@ int sdbInitPeers(char *directory);
void sdbCleanUpPeers(); void sdbCleanUpPeers();
int sdbCfgNode(char *cont);
int64_t sdbGetVersion(); int64_t sdbGetVersion();
int32_t sdbGetRunStatus(); int32_t sdbGetRunStatus();
......
...@@ -595,23 +595,11 @@ typedef struct { ...@@ -595,23 +595,11 @@ typedef struct {
typedef struct { typedef struct {
int32_t code; int32_t code;
int32_t numOfVnodes;
SDnodeState dnodeState; SDnodeState dnodeState;
SRpcIpSet ipList; SRpcIpSet ipList;
SVnodeAccess vnodeAccess[]; SVnodeAccess vnodeAccess[];
} SStatusRsp; } SStatusRsp;
// internal message
typedef struct {
uint32_t destId;
uint32_t destIp;
char tableId[TSDB_UNI_LEN + 1];
char empty[3];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} SIntMsg;
typedef struct { typedef struct {
char spi; char spi;
char encrypt; char encrypt;
......
...@@ -14,7 +14,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -14,7 +14,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread) TARGET_LINK_LIBRARIES(mnode trpc tutil sdb pthread)
IF (TD_CLUSTER) IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(mnode acct) TARGET_LINK_LIBRARIES(mnode)
ENDIF () ENDIF ()
ENDIF () ENDIF ()
......
...@@ -22,7 +22,6 @@ extern "C" { ...@@ -22,7 +22,6 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
void mgmtStartBalanceTimer(int64_t mseconds);
int32_t mgmtInitBalance(); int32_t mgmtInitBalance();
void mgmtCleanupBalance(); void mgmtCleanupBalance();
int32_t mgmtAllocVnodes(SVgObj *pVgroup); int32_t mgmtAllocVnodes(SVgObj *pVgroup);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MGMT_CONN_H
#define TDENGINE_MGMT_CONN_H
#ifdef __cplusplus
extern "C" {
#endif
#include "mnode.h"
int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn);
bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
void mgmtFreeQhandle(void *qhandle);
#ifdef __cplusplus
}
#endif
#endif
...@@ -50,9 +50,16 @@ int32_t mgmtGetDnodesNum(); ...@@ -50,9 +50,16 @@ int32_t mgmtGetDnodesNum();
int32_t mgmtUpdateDnode(SDnodeObj *pDnode); int32_t mgmtUpdateDnode(SDnodeObj *pDnode);
void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode); void* mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode);
bool mgmtCheckConfigShow(SGlobalConfig *cfg); bool mgmtCheckConfigShow(SGlobalConfig *cfg);
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode);
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode);
void mgmtSetDnodeUnRemove(SDnodeObj *pDnode); void mgmtSetDnodeUnRemove(SDnodeObj *pDnode);
SDnodeObj* mgmtGetDnode(uint32_t ip); SDnodeObj* mgmtGetDnode(uint32_t ip);
extern int32_t (*mgmtCreateDnodeFp)(uint32_t ip);
extern int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip);
void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries); ...@@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries);
int32_t mgmtCheckTimeSeries(uint32_t timeseries); int32_t mgmtCheckTimeSeries(uint32_t timeseries);
int32_t mgmtCheckUserGrant(); int32_t mgmtCheckUserGrant();
int32_t mgmtCheckDbGrant(); int32_t mgmtCheckDbGrant();
int32_t mgmtCheckDnodeGrant();
int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtGetGrantsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveGrants(SShowObj *pShow, char *data, int32_t rows, void *pConn);
......
...@@ -24,7 +24,10 @@ extern "C" { ...@@ -24,7 +24,10 @@ extern "C" {
#include <stdbool.h> #include <stdbool.h>
#include "mnode.h" #include "mnode.h"
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp);
int32_t mgmtRemoveMnode(uint32_t privateIp);
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pConn);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,20 +22,22 @@ extern "C" { ...@@ -22,20 +22,22 @@ extern "C" {
#include "mnode.h" #include "mnode.h"
int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); bool mgmtCheckQhandle(uint64_t qhandle);
void mgmtSaveQhandle(void *qhandle);
int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn); void mgmtFreeQhandle(void *qhandle);
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg);
int32_t mgmtGetQueryMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveQueries(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtGetStreamMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn); int32_t mgmtRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtSaveQueryStreamList(SHeartBeatMsg *pHBMsg); int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn);
int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn);
int32_t mgmtKillQuery(char *qidstr, void *pConn); int32_t mgmtKillQuery(char *qidstr, void *pConn);
int32_t mgmtKillStream(char *qidstr, void *pConn); int32_t mgmtKillStream(char *qidstr, void *pConn);
int32_t mgmtKillConnection(char *qidstr, void *pConn); int32_t mgmtKillConnection(char *qidstr, void *pConn);
enum { enum {
......
...@@ -28,10 +28,8 @@ int32_t mgmtInitShell(); ...@@ -28,10 +28,8 @@ int32_t mgmtInitShell();
void mgmtCleanUpShell(); void mgmtCleanUpShell();
extern int32_t (*mgmtCheckRedirectMsg)(void *pConn); extern int32_t (*mgmtCheckRedirectMsg)(void *pConn);
extern void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle); extern void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
extern void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle);
/* /*
* If table not exist, will create it * If table not exist, will create it
......
...@@ -77,11 +77,3 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) { ...@@ -77,11 +77,3 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
return 0; return 0;
} }
} }
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
if (mgmtGetVnodeStatusFp) {
return (*mgmtGetVnodeStatusFp)(pVgroup, pVnode);
} else {
return "master";
}
}
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mgmtConn.h"
#include "taosmsg.h"
#include "tschemautil.h"
typedef struct {
char user[TSDB_TABLE_ID_LEN];
uint64_t stime;
uint32_t ip;
uint16_t port;
} SConnInfo;
typedef struct {
int numOfConns;
int index;
SConnInfo connInfo[];
} SConnShow;
int mgmtGetConns(SShowObj *pShow, void *pConn) {
// SAcctObj * pAcct = pConn->pAcct;
// SConnShow *pConnShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
// pConnShow->index = 0;
// pConnShow->numOfConns = 0;
//
// if (pAcct->acctInfo.numOfConns > 0) {
// pConn = pAcct->pConn;
// SConnInfo *pConnInfo = pConnShow->connInfo;
//
// while (pConn && pConn->pUser) {
// strcpy(pConnInfo->user, pConn->pUser->user);
// pConnInfo->ip = pConn->ip;
// pConnInfo->port = pConn->port;
// pConnInfo->stime = pConn->stime;
//
// pConnShow->numOfConns++;
// pConnInfo++;
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pConnShow;
return 0;
}
int mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int cols = 0;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
SSchema *pSchema = tsGetSchema(pMeta);
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "login time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtGetConns(pShow, pConn);
return 0;
}
int mgmtRetrieveConns(SShowObj *pShow, char *data, int rows, void *pConn) {
int numOfRows = 0;
char *pWrite;
int cols = 0;
SConnShow *pConnShow = (SConnShow *)pShow->pNode;
if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index;
while (numOfRows < rows) {
SConnInfo *pNode = pConnShow->connInfo + pConnShow->index;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pNode->user);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
uint32_t ip = pNode->ip;
sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pNode->stime;
cols++;
numOfRows++;
pConnShow->index++;
}
if (numOfRows == 0) {
tfree(pConnShow);
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
bool mgmtCheckQhandle(uint64_t qhandle) {
return true;
}
void mgmtSaveQhandle(void *qhandle) {
}
void mgmtFreeQhandle(void *qhandle) {
}
\ No newline at end of file
...@@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) { ...@@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
// //
// SVgObj *pVgroup = pDb->pHead; // SVgObj *pVgroup = pDb->pHead;
// while (pVgroup != NULL) { // while (pVgroup != NULL) {
// mgmtUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0); // balanceUpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
// if (oldReplicaNum < pDb->cfg.replications) { // if (oldReplicaNum < pDb->cfg.replications) {
// if (!mgmtAddVnode(pVgroup, NULL, NULL)) { // if (!balanceAddVnode(pVgroup, NULL, NULL)) {
// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId); // mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId);
// code = TSDB_CODE_NO_ENOUGH_DNODES; // code = TSDB_CODE_NO_ENOUGH_DNODES;
// } // }
......
...@@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL; ...@@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL;
int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL; int32_t (*mgmtGetScoresMetaFp)(STableMeta *pMeta, SShowObj *pShow, void *pConn) = NULL;
int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL; int32_t (*mgmtRetrieveScoresFp)(SShowObj *pShow, char *data, int32_t rows, void *pConn) = NULL;
void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL; void (*mgmtSetDnodeUnRemoveFp)(SDnodeObj *pDnode) = NULL;
int32_t (*mgmtCreateDnodeFp)(uint32_t ip) = NULL;
int32_t (*mgmtDropDnodeByIpFp)(uint32_t ip) = NULL;
static SDnodeObj tsDnodeObj = {0}; static SDnodeObj tsDnodeObj = {0};
...@@ -606,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) { ...@@ -606,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtGetScoresMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
if (mgmtGetScoresMetaFp) { if (mgmtGetScoresMetaFp) {
SUserObj *pUser = mgmtGetUserFromConn(pConn);
if (pUser == NULL) return 0;
if (strcmp(pUser->user, "root") != 0) return TSDB_CODE_NO_RIGHTS;
return mgmtGetScoresMetaFp(pMeta, pShow, pConn); return mgmtGetScoresMetaFp(pMeta, pShow, pConn);
} else { } else {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
...@@ -633,3 +639,17 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg) { ...@@ -633,3 +639,17 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg) {
return false; return false;
return true; return true;
} }
/**
* check if a dnode in remove state
**/
bool mgmtCheckDnodeInRemoveState(SDnodeObj *pDnode) {
return pDnode->lbStatus == TSDB_DN_LB_STATUS_OFFLINE_REMOVING || pDnode->lbStatus == TSDB_DN_LB_STATE_SHELL_REMOVING;
}
/**
* check if a dnode in offline state
**/
bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) {
return pDnode->status == TSDB_DN_STATUS_OFFLINE;
}
...@@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) { ...@@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score); mTrace("dnode:%s, custom score set from:%d to:%d", taosIpStr(pDnode->privateIp), pDnode->customScore, score);
pDnode->customScore = score; pDnode->customScore = score;
mgmtUpdateDnode(pDnode); mgmtUpdateDnode(pDnode);
mgmtStartBalanceTimer(15); //mgmtStartBalanceTimer(15);
} }
return TSDB_CODE_INVALID_SQL; return TSDB_CODE_INVALID_SQL;
} else if (strncasecmp(option, "bandwidth", 9) == 0) { } else if (strncasecmp(option, "bandwidth", 9) == 0) {
......
...@@ -22,6 +22,7 @@ ...@@ -22,6 +22,7 @@
int32_t (*mgmtCheckUserGrantFp)() = NULL; int32_t (*mgmtCheckUserGrantFp)() = NULL;
int32_t (*mgmtCheckDbGrantFp)() = NULL; int32_t (*mgmtCheckDbGrantFp)() = NULL;
int32_t (*mgmtCheckDnodeGrantFp)() = NULL;
void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; void (*mgmtAddTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL; void (*mgmtRestoreTimeSeriesFp)(uint32_t timeSeriesNum) = NULL;
int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL; int32_t (*mgmtCheckTimeSeriesFp)(uint32_t timeseries) = NULL;
...@@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL; ...@@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL;
int32_t mgmtCheckUserGrant() { int32_t mgmtCheckUserGrant() {
if (mgmtCheckUserGrantFp) { if (mgmtCheckUserGrantFp) {
return mgmtCheckUserGrantFp(); return (*mgmtCheckUserGrantFp)();
} else { } else {
return 0; return 0;
} }
...@@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() { ...@@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() {
int32_t mgmtCheckDbGrant() { int32_t mgmtCheckDbGrant() {
if (mgmtCheckDbGrantFp) { if (mgmtCheckDbGrantFp) {
return mgmtCheckDbGrantFp(); return (*mgmtCheckDbGrantFp)();
} else {
return 0;
}
}
int32_t mgmtCheckDnodeGrant() {
if (mgmtCheckDnodeGrantFp) {
return (*mgmtCheckDnodeGrantFp)();
} else { } else {
return 0; return 0;
} }
...@@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() { ...@@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() {
void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { void mgmtAddTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries += timeSeriesNum; pAcct->acctInfo.numOfTimeSeries += timeSeriesNum;
if (mgmtAddTimeSeriesFp) { if (mgmtAddTimeSeriesFp) {
mgmtAddTimeSeriesFp(timeSeriesNum); (*mgmtAddTimeSeriesFp)(timeSeriesNum);
} }
} }
void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) { void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeSeriesNum) {
pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum; pAcct->acctInfo.numOfTimeSeries -= timeSeriesNum;
if (mgmtRestoreTimeSeriesFp) { if (mgmtRestoreTimeSeriesFp) {
mgmtRestoreTimeSeriesFp(timeSeriesNum); (*mgmtRestoreTimeSeriesFp)(timeSeriesNum);
} }
} }
int32_t mgmtCheckTimeSeries(uint32_t timeseries) { int32_t mgmtCheckTimeSeries(uint32_t timeseries) {
if (mgmtCheckTimeSeriesFp) { if (mgmtCheckTimeSeriesFp) {
return mgmtCheckTimeSeriesFp(timeseries); return (*mgmtCheckTimeSeriesFp)(timeseries);
} else { } else {
return 0; return 0;
} }
......
...@@ -18,12 +18,48 @@ ...@@ -18,12 +18,48 @@
#include "mgmtMnode.h" #include "mgmtMnode.h"
#include "mgmtUser.h" #include "mgmtUser.h"
void *(*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL; int32_t (*mgmtAddMnodeFp)(uint32_t privateIp, uint32_t publicIp) = NULL;
int32_t (*mgmtInitMnodesFp)() = NULL; int32_t (*mgmtRemoveMnodeFp)(uint32_t privateIp) = NULL;
int32_t (*mgmtGetMnodesNumFp)() = NULL; int32_t (*mgmtGetMnodesNumFp)() = NULL;
void * (*mgmtGetNextMnodeFp)(SShowObj *pShow, SSdbPeer **pMnode) = NULL;
static int32_t mgmtGetMnodesNum(); int32_t mgmtAddMnode(uint32_t privateIp, uint32_t publicIp) {
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode); if (mgmtAddMnodeFp) {
return (*mgmtAddMnodeFp)(privateIp, publicIp);
} else {
return 0;
}
}
int32_t mgmtRemoveMnode(uint32_t privateIp) {
if (mgmtRemoveMnodeFp) {
return (*mgmtRemoveMnodeFp)(privateIp);
} else {
return 0;
}
}
static int32_t mgmtGetMnodesNum() {
if (mgmtGetMnodesNumFp) {
return (*mgmtGetMnodesNumFp)();
} else {
return 1;
}
}
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
if (mgmtGetNextMnodeFp) {
return (*mgmtGetNextMnodeFp)(pShow, pMnode);
} else {
if (*pMnode == NULL) {
*pMnode = NULL;
} else {
*pMnode = NULL;
}
}
return *pMnode;
}
int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { int32_t mgmtGetMnodeMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
...@@ -88,11 +124,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon ...@@ -88,11 +124,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
char ipstr[20]; char ipstr[20];
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pNode = mgmtGetNextMnode(pShow, (SDnodeObj **)&pMnode); pShow->pNode = mgmtGetNextMnode(pShow, (SSdbPeer **)&pMnode);
if (pMnode == NULL) break;
// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode);
// if (pMnode == NULL) break;
cols = 0; cols = 0;
...@@ -123,25 +156,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon ...@@ -123,25 +156,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
pShow->numOfReads += numOfRows; pShow->numOfReads += numOfRows;
return numOfRows; return numOfRows;
} }
static int32_t mgmtGetMnodesNum() {
if (mgmtGetMnodesNumFp) {
return mgmtGetMnodesNumFp();
} else {
return 1;
}
}
static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
if (mgmtGetNextMnodeFp) {
return mgmtGetNextMnodeFp(pShow, pMnode);
} else {
if (*pMnode == NULL) {
*pMnode = NULL;
} else {
*pMnode = NULL;
}
}
return *pMnode;
}
\ No newline at end of file
...@@ -15,16 +15,27 @@ ...@@ -15,16 +15,27 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "mnode.h"
#include "mgmtProfile.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tschemautil.h" #include "tschemautil.h"
#include "mgmtProfile.h"
typedef struct {
char user[TSDB_TABLE_ID_LEN + 1];
uint64_t stime;
uint32_t ip;
uint16_t port;
} SConnInfo;
typedef struct {
int numOfConns;
int index;
SConnInfo connInfo[];
} SConnShow;
typedef struct { typedef struct {
uint32_t ip; uint32_t ip;
uint16_t port; uint16_t port;
char user[TSDB_TABLE_ID_LEN]; char user[TSDB_TABLE_ID_LEN+ 1];
} SCDesc; } SCDesc;
typedef struct { typedef struct {
...@@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) { ...@@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) {
return TSDB_CODE_INVALID_CONNECTION; return TSDB_CODE_INVALID_CONNECTION;
} }
bool mgmtCheckQhandle(uint64_t qhandle) {
return true;
}
void mgmtSaveQhandle(void *qhandle) {
}
void mgmtFreeQhandle(void *qhandle) {
}
int mgmtGetConns(SShowObj *pShow, void *pConn) {
// SAcctObj * pAcct = pConn->pAcct;
// SConnShow *pConnShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
// pConnShow->index = 0;
// pConnShow->numOfConns = 0;
//
// if (pAcct->acctInfo.numOfConns > 0) {
// pConn = pAcct->pConn;
// SConnInfo *pConnInfo = pConnShow->connInfo;
//
// while (pConn && pConn->pUser) {
// strcpy(pConnInfo->user, pConn->pUser->user);
// pConnInfo->ip = pConn->ip;
// pConnInfo->port = pConn->port;
// pConnInfo->stime = pConn->stime;
//
// pConnShow->numOfConns++;
// pConnInfo++;
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pConnShow;
return 0;
}
int32_t mgmtGetConnsMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN;
SSchema *pSchema = tsGetSchema(pMeta);
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "login time");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
pShow->numOfRows = 1000000;
pShow->pNode = NULL;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
mgmtGetConns(pShow, pConn);
return 0;
}
int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
char *pWrite;
int32_t cols = 0;
SConnShow *pConnShow = (SConnShow *)pShow->pNode;
if (rows > pConnShow->numOfConns - pConnShow->index) rows = pConnShow->numOfConns - pConnShow->index;
while (numOfRows < rows) {
SConnInfo *pNode = pConnShow->connInfo + pConnShow->index;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, pNode->user);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
uint32_t ip = pNode->ip;
sprintf(pWrite, "%d.%d.%d.%d:%hu", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24, htons(pNode->port));
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int64_t *)pWrite = pNode->stime;
cols++;
numOfRows++;
pConnShow->index++;
}
if (numOfRows == 0) {
tfree(pConnShow);
}
pShow->numOfReads += numOfRows;
return numOfRows;
}
...@@ -26,7 +26,6 @@ ...@@ -26,7 +26,6 @@
#include "mgmtAcct.h" #include "mgmtAcct.h"
#include "mgmtBalance.h" #include "mgmtBalance.h"
#include "mgmtChildTable.h" #include "mgmtChildTable.h"
#include "mgmtConn.h"
#include "mgmtDb.h" #include "mgmtDb.h"
#include "mgmtDnode.h" #include "mgmtDnode.h"
#include "mgmtDnodeInt.h" #include "mgmtDnodeInt.h"
...@@ -52,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL ...@@ -52,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL
static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle); static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle);
static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey); static int mgmtRetriveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey);
uint32_t mgmtAccessSquence = 0;
void *tsShellConnServer = NULL; void *tsShellConnServer = NULL;
void mgmtProcessTranRequest(SSchedMsg *sched) { void mgmtProcessTranRequest(SSchedMsg *sched) {
...@@ -1164,10 +1162,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle) ...@@ -1164,10 +1162,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle)
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0); rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
} }
void (*mgmtProcessCreateDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessCfgMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg; void (*mgmtProcessDropMnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
void (*mgmtProcessDropDnodeMsg)(void *pCont, int32_t contLen, void *ahandle) = mgmtProcessUnSupportMsg;
static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) { static void mgmtProcessAlterAcctMsg(void *pCont, int32_t contLen, void *ahandle) {
if (!mgmtAlterAcctFp) { if (!mgmtAlterAcctFp) {
...@@ -1297,6 +1293,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle ...@@ -1297,6 +1293,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle
rpcSendResponse(ahandle, code, NULL, 0); rpcSendResponse(ahandle, code, NULL, 0);
} }
static void mgmtProcessCreateDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
if (!mgmtCreateDnodeFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
return;
}
SCreateDnodeMsg *pCreate = (SCreateDnodeMsg *)pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
mError("failed to create dnode:%s, redirect this message", pCreate->ip);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_INVALID_USER));
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(TSDB_CODE_NO_RIGHTS));
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
return;
}
int32_t code = (*mgmtCreateDnodeFp)(inet_addr(pCreate->ip));
if (code == TSDB_CODE_SUCCESS) {
mLPrint("dnode:%s is created by %s", pCreate->ip, pUser->user);
} else {
mError("failed to create dnode:%s, reason:%s", pCreate->ip, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
}
static void mgmtProcessDropDnodeMsg(void *pCont, int32_t contLen, void *ahandle) {
if (!mgmtDropDnodeByIpFp) {
rpcSendResponse(ahandle, TSDB_CODE_OPS_NOT_SUPPORT, NULL, 0);
return;
}
SDropDnodeMsg *pDrop = (SDropDnodeMsg *)pCont;
if (mgmtCheckRedirectMsg(ahandle) != TSDB_CODE_SUCCESS) {
mError("failed to drop dnode:%s, redirect this message", pDrop->ip);
return;
}
SUserObj *pUser = mgmtGetUserFromConn(ahandle);
if (pUser == NULL) {
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_INVALID_USER));
rpcSendResponse(ahandle, TSDB_CODE_INVALID_USER, NULL, 0);
return;
}
if (strcmp(pUser->user, "root") != 0) {
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(TSDB_CODE_NO_RIGHTS));
rpcSendResponse(ahandle, TSDB_CODE_NO_RIGHTS, NULL, 0);
return;
}
int32_t code = (*mgmtDropDnodeByIpFp)(inet_addr(pDrop->ip));
if (code == TSDB_CODE_SUCCESS) {
mLPrint("dnode:%s set to removing state by %s", pDrop->ip, pUser->user);
} else {
mError("failed to drop dnode:%s, reason:%s", pDrop->ip, tstrerror(code));
}
rpcSendResponse(ahandle, code, NULL, 0);
}
void mgmtInitProcessShellMsg() { void mgmtInitProcessShellMsg() {
mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_CONNECT] = mgmtProcessConnectMsg;
mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg; mgmtProcessShellMsg[TSDB_MSG_TYPE_HEARTBEAT] = mgmtProcessHeartBeatMsg;
......
...@@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() { ...@@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() {
tsetModuleStatus(TSDB_MOD_MGMT); tsetModuleStatus(TSDB_MOD_MGMT);
// strcpy(sdbMasterIp, mgmtIpStr[0]);
// strcpy(sdbPrivateIp, tsPrivateIp);
// sdbPublicIp = inet_addr(tsPublicIp);
return 0; return 0;
} }
......
...@@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) { ...@@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
return 0; return 0;
} }
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
SDnodeObj *pDnode = mgmtGetDnode(pVnode->ip);
if (pDnode == NULL) {
mError("dnode:%s, vgroup:%d, vnode:%d dnode not exist", taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode);
return "null";
}
if (pDnode->status == TSDB_DN_STATUS_OFFLINE) {
return "offline";
}
SVnodeLoad *vload = pDnode->vload + pVnode->vnode;
if (vload->vgId != pVgroup->vgId || vload->vnode != pVnode->vnode) {
mError("dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d",
taosIpStr(pVnode->ip), pVgroup->vgId, pVnode->vnode, vload->vgId, vload->vnode);
return "null";
}
return (char*)taosGetVnodeStatusStr(vload->status);
}
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) { int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0; int32_t numOfRows = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
......
...@@ -13,6 +13,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -13,6 +13,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES(http taos_static z) TARGET_LINK_LIBRARIES(http taos_static z)
IF (TD_CLUSTER) IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(http http_admin) TARGET_LINK_LIBRARIES(http)
ENDIF () ENDIF ()
ENDIF () ENDIF ()
...@@ -34,13 +34,21 @@ ...@@ -34,13 +34,21 @@
#include "tgHandle.h" #include "tgHandle.h"
#include "tlog.h" #include "tlog.h"
#ifdef CLUSTER
void adminInitHandle(HttpServer* pServer); void (*adminInitHandleFp)(HttpServer* pServer) = NULL;
void opInitHandle(HttpServer* pServer); void (*opInitHandleFp)(HttpServer* pServer) = NULL;
#else
void adminInitHandle(HttpServer* pServer) {} void adminInitHandle(HttpServer* pServer) {
void opInitHandle(HttpServer* pServer) {} if (adminInitHandleFp) {
#endif (*adminInitHandleFp)(pServer);
}
}
void opInitHandle(HttpServer* pServer) {
if (opInitHandleFp) {
(*opInitHandleFp)(pServer);
}
}
static HttpServer *httpServer = NULL; static HttpServer *httpServer = NULL;
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable); void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
......
...@@ -11,6 +11,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM)) ...@@ -11,6 +11,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_LIBRARY(sdb ${SRC}) ADD_LIBRARY(sdb ${SRC})
TARGET_LINK_LIBRARIES(sdb trpc) TARGET_LINK_LIBRARIES(sdb trpc)
IF (TD_CLUSTER) IF (TD_CLUSTER)
TARGET_LINK_LIBRARIES(sdb mreplica) TARGET_LINK_LIBRARIES(sdb)
ENDIF() ENDIF()
ENDIF () ENDIF ()
...@@ -48,6 +48,21 @@ ...@@ -48,6 +48,21 @@
#define sdbPrint(...) \ #define sdbPrint(...) \
{ tprintf("MND-SDB ", 255, __VA_ARGS__); } { tprintf("MND-SDB ", 255, __VA_ARGS__); }
#define mpeerError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \
}
#define mpeerWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
}
#define mpeerTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
}
#define mpeerPrint(...) \
{ tprintf("MND-MPEER ", 255, __VA_ARGS__); }
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__) #define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__) #define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__) #define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
...@@ -69,12 +84,7 @@ typedef struct { ...@@ -69,12 +84,7 @@ typedef struct {
char *row; char *row;
} SSdbUpdate; } SSdbUpdate;
typedef struct { typedef struct _SSdbTable {
char numOfTables;
uint64_t version[];
} SSdbSync;
typedef struct {
SSdbHeader header; SSdbHeader header;
int maxRows; int maxRows;
int dbId; int dbId;
...@@ -109,23 +119,6 @@ typedef struct { ...@@ -109,23 +119,6 @@ typedef struct {
char data[]; char data[];
} SRowHead; } SRowHead;
typedef struct {
char * buffer;
char * offset;
int trans;
int bufferSize;
pthread_mutex_t qmutex;
} STranQueue;
typedef struct {
char status;
char role;
char numOfMnodes;
uint64_t dbVersion;
uint32_t numOfDnodes;
uint32_t publicIp;
} SMnodeStatus;
typedef struct { typedef struct {
uint8_t dbId; uint8_t dbId;
char type; char type;
...@@ -140,7 +133,7 @@ extern int sdbNumOfTables; ...@@ -140,7 +133,7 @@ extern int sdbNumOfTables;
extern int64_t sdbVersion; extern int64_t sdbVersion;
int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen); int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen);
int sdbRetrieveRows(int fd, SSdbTable *pTable, uint64_t version); int mpeerRetrieveRows(int fd, SSdbTable *pTable, uint64_t version);
void sdbResetTable(SSdbTable *pTable); void sdbResetTable(SSdbTable *pTable);
extern const int16_t sdbFileVersion; extern const int16_t sdbFileVersion;
......
...@@ -23,7 +23,6 @@ ...@@ -23,7 +23,6 @@
extern char version[]; extern char version[];
const int16_t sdbFileVersion = 0; const int16_t sdbFileVersion = 0;
int sdbExtConns = 0;
SRpcIpSet *pSdbIpList = NULL; SRpcIpSet *pSdbIpList = NULL;
SRpcIpSet *pSdbPublicIpList = NULL; SRpcIpSet *pSdbPublicIpList = NULL;
SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self SSdbPeer * sdbPeer[SDB_MAX_PEERS]; // first slot for self
......
...@@ -12,32 +12,47 @@ ...@@ -12,32 +12,47 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE
#include "sdbint.h" #include "sdbint.h"
char* sdbStatusStr[] = {"offline", "unsynced", "syncing", "serving", "null"}; int32_t (*mpeerInitMnodesFp)(char *directory) = NULL;
void (*mpeerCleanUpMnodesFp)() = NULL;
char* sdbRoleStr[] = {"unauthed", "undecided", "master", "slave", "null"}; int32_t (*mpeerForwardRequestFp)(SSdbTable *pTable, char type, void *cont, int32_t contLen) = NULL;
#ifndef CLUSTER char *sdbStatusStr[] = {
"offline",
/* "unsynced",
* Lite Version sync request is always successful "syncing",
*/ "serving",
int sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int dataLen) { "null"
return 0; };
char *sdbRoleStr[] = {
"unauthed",
"undecided",
"master",
"slave",
"null"
};
int32_t sdbForwardDbReqToPeer(SSdbTable *pTable, char type, char *data, int32_t dataLen) {
if (mpeerForwardRequestFp) {
return mpeerForwardRequestFp(pTable, type, data, dataLen);
} else {
return 0;
}
} }
/* int32_t sdbInitPeers(char *directory) {
* Lite Version does not need to initialize peers if (mpeerInitMnodesFp) {
*/ return (*mpeerInitMnodesFp)(directory);
int sdbInitPeers(char *directory) { } else {
return 0; return 0;
}
} }
/* void sdbCleanUpPeers() {
* Lite Version does not need to cleanup peers if (mpeerCleanUpMnodesFp) {
*/ (*mpeerCleanUpMnodesFp)();
void sdbCleanUpPeers(){} }
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册