提交 19b1d449 编写于 作者: S slguan

rearrange code directory

上级 0f202d9f
......@@ -23,7 +23,6 @@ extern "C" {
int32_t dnodeInitMClient();
void dnodeCleanupMClient();
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
uint32_t dnodeGetMnodeMasteIp();
void * dnodeGetMpeerInfos();
int32_t dnodeGetDnodeId();
......
......@@ -21,9 +21,9 @@ extern "C" {
#endif
int32_t dnodeInitModules();
void dnodeCleanUpModules();
void dnodeStartModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus);
void dnodeStartModules();
void dnodeCleanUpModules();
void dnodeProcessModuleStatus(uint32_t moduleStatus);
#ifdef __cplusplus
}
......
......@@ -17,7 +17,6 @@
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tmodule.h"
#include "trpc.h"
#include "tutil.h"
#include "dnode.h"
......
......@@ -15,55 +15,74 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tlog.h"
#include "tmodule.h"
#include "tglobalcfg.h"
#include "mnode.h"
#include "http.h"
#include "monitor.h"
#include "dnodeModule.h"
#include "dnode.h"
static void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_MGMT].num = tsNumOfMPeers;
tsModule[TSDB_MOD_MGMT].curNum = 0;
tsModule[TSDB_MOD_MGMT].equalVnodeNum = tsMgmtEqualVnodeNum;
typedef struct {
bool enable;
char * name;
int32_t (*initFp)();
int32_t (*startFp)();
void (*cleanUpFp)();
void (*stopFp)();
} SModule;
static SModule tsModule[TSDB_MOD_MAX] = {0};
static uint32_t tsModuleStatus = 0;
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
tsModule[TSDB_MOD_HTTP].num = (tsEnableHttpModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_HTTP].curNum = 0;
tsModule[TSDB_MOD_HTTP].equalVnodeNum = 0;
static void dnodeSetModuleStatus(int32_t module) {
tsModuleStatus |= (1 << module);
}
static void dnodeUnSetModuleStatus(int32_t module) {
tsModuleStatus &= ~(1 << module);
}
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
tsModule[TSDB_MOD_MONITOR].num = (tsEnableMonitorModule == 1) ? -1 : 0;
tsModule[TSDB_MOD_MONITOR].curNum = 0;
tsModule[TSDB_MOD_MONITOR].equalVnodeNum = 0;
static void dnodeAllocModules() {
tsModule[TSDB_MOD_MGMT].name = false;
tsModule[TSDB_MOD_MGMT].name = "mgmt";
tsModule[TSDB_MOD_MGMT].initFp = mgmtInitSystem;
tsModule[TSDB_MOD_MGMT].cleanUpFp = mgmtCleanUpSystem;
tsModule[TSDB_MOD_MGMT].startFp = mgmtStartSystem;
tsModule[TSDB_MOD_MGMT].stopFp = mgmtStopSystem;
tsModule[TSDB_MOD_HTTP].enable = (tsEnableHttpModule == 1);
tsModule[TSDB_MOD_HTTP].name = "http";
tsModule[TSDB_MOD_HTTP].initFp = httpInitSystem;
tsModule[TSDB_MOD_HTTP].cleanUpFp = httpCleanUpSystem;
tsModule[TSDB_MOD_HTTP].startFp = httpStartSystem;
tsModule[TSDB_MOD_HTTP].stopFp = httpStopSystem;
if (tsEnableHttpModule) {
dnodeSetModuleStatus(TSDB_MOD_HTTP);
}
tsModule[TSDB_MOD_MONITOR].enable = (tsEnableMonitorModule == 1);
tsModule[TSDB_MOD_MONITOR].name = "monitor";
tsModule[TSDB_MOD_MONITOR].initFp = monitorInitSystem;
tsModule[TSDB_MOD_MONITOR].cleanUpFp = monitorCleanUpSystem;
tsModule[TSDB_MOD_MONITOR].startFp = monitorStartSystem;
tsModule[TSDB_MOD_MONITOR].stopFp = monitorStopSystem;
if (tsEnableMonitorModule) {
dnodeSetModuleStatus(TSDB_MOD_MONITOR);
}
}
void dnodeCleanUpModules() {
for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].stopFp) {
(*tsModule[mod].stopFp)();
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].stopFp) {
(*tsModule[module].stopFp)();
}
if (tsModule[mod].num != 0 && tsModule[mod].cleanUpFp) {
(*tsModule[mod].cleanUpFp)();
if (tsModule[module].cleanUpFp) {
(*tsModule[module].cleanUpFp)();
}
}
if (tsModule[TSDB_MOD_MGMT].num != 0 && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
if (tsModule[TSDB_MOD_MGMT].enable && tsModule[TSDB_MOD_MGMT].cleanUpFp) {
(*tsModule[TSDB_MOD_MGMT].cleanUpFp)();
}
}
......@@ -71,57 +90,41 @@ void dnodeCleanUpModules() {
int32_t dnodeInitModules() {
dnodeAllocModules();
for (int mod = 0; mod < TSDB_MOD_MAX; ++mod) {
if (tsModule[mod].num != 0 && tsModule[mod].initFp) {
if ((*tsModule[mod].initFp)() != 0) {
dError("failed to init modules");
for (int32_t module = 0; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].initFp) {
if ((*tsModule[module].initFp)() != 0) {
dError("failed to init module:%s", tsModule[module].name);
return -1;
}
}
}
return TSDB_CODE_SUCCESS;
return 0;
}
void dnodeStartModules() {
// for (int mod = 1; mod < TSDB_MOD_MAX; ++mod) {
// if (tsModule[mod].num != 0 && tsModule[mod].startFp) {
// if ((*tsModule[mod].startFp)() != 0) {
// dError("failed to start module:%d", mod);
// }
// }
// }
for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) {
if (tsModule[module].enable && tsModule[module].startFp) {
if ((*tsModule[module].startFp)() != 0) {
dError("failed to start module:%s", tsModule[module].name);
}
}
}
}
void dnodeProcessModuleStatus(uint32_t moduleStatus) {
if (moduleStatus == tsModuleStatus) return;
dPrint("module status is received, old:%d, new:%d", tsModuleStatus, moduleStatus);
int news = moduleStatus;
int olds = tsModuleStatus;
for (int moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
int newStatus = news & (1 << moduleType);
int oldStatus = olds & (1 << moduleType);
if (oldStatus > 0) {
if (newStatus == 0) {
if (tsModule[moduleType].stopFp) {
dPrint("module:%s is stopped on this node", tsModule[moduleType].name);
(*tsModule[moduleType].stopFp)();
}
}
} else if (oldStatus == 0) {
if (newStatus > 0) {
if (tsModule[moduleType].startFp) {
dPrint("module:%s is started on this node", tsModule[moduleType].name);
(*tsModule[moduleType].startFp)();
}
}
} else {
}
bool enableMgmtModule = moduleStatus & (1 << TSDB_MOD_MGMT);
if (!tsModule[TSDB_MOD_MGMT].enable && enableMgmtModule) {
dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus);
tsModule[TSDB_MOD_MGMT].enable = true;
dnodeSetModuleStatus(TSDB_MOD_MGMT);
(*tsModule[TSDB_MOD_MGMT].stopFp)();
}
tsModuleStatus = moduleStatus;
if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) {
dPrint("module status is received, stop mgmt module", tsModuleStatus, moduleStatus);
tsModule[TSDB_MOD_MGMT].enable = false;
dnodeUnSetModuleStatus(TSDB_MOD_MGMT);
(*tsModule[TSDB_MOD_MGMT].stopFp)();
}
}
......@@ -20,9 +20,6 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
typedef struct {
int32_t queryReqNum;
int32_t submitReqNum;
......@@ -45,6 +42,7 @@ void dnodeFreeRqueue(void *rqueue);
void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
bool dnodeIsFirstDeploy();
uint32_t dnodeGetMnodeMasteIp();
#ifdef __cplusplus
}
......
......@@ -20,245 +20,12 @@
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tglobalcfg.h"
#include "thash.h"
#include "tidpool.h"
#include "tlog.h"
#include "tmempool.h"
#include "trpc.h"
#include "taosdef.h"
#include "tskiplist.h"
#include "tsocket.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
struct _vg_obj;
struct _db_obj;
struct _acct_obj;
struct _user_obj;
struct _mnode_obj;
typedef struct _dnode_obj {
int32_t dnodeId;
uint32_t privateIp;
uint32_t publicIp;
uint16_t mnodeShellPort;
uint16_t mnodeDnodePort;
uint16_t dnodeShellPort;
uint16_t dnodeMnodePort;
uint16_t syncPort;
uint32_t moduleStatus;
int64_t createdTime;
uint32_t lastAccess;
int32_t openVnodes;
int32_t totalVnodes; // from dnode status msg, config information
uint16_t numOfCores; // from dnode status msg
int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t status; // set in balance function
int32_t customScore; // config by user
char dnodeName[TSDB_NODE_NAME_LEN + 1];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
SVnodeLoad vload[TSDB_MAX_VNODES];
uint32_t lastReboot; // time stamp for last reboot
float score; // calc in balance function
float diskAvailable; // from dnode status msg
int16_t diskAvgUsage; // calc from sys.disk
int16_t cpuAvgUsage; // calc from sys.cpu
int16_t memoryAvgUsage; // calc from sys.mem
int16_t bandwidthUsage; // calc from sys.band
} SDnodeObj;
typedef struct _mnode_obj {
int32_t mnodeId;
int64_t createdTime;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
int8_t role;
SDnodeObj *pDnode;
} SMnodeObj;
typedef struct {
int32_t dnodeId;
uint32_t privateIp;
uint32_t publicIp;
} SVnodeGid;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
} STableObj;
typedef struct SSuperTableObj {
STableObj info;
uint64_t uid;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
int32_t numOfTables;
int16_t nextColId;
SSchema * schema;
int32_t vgLen;
int32_t * vgList;
} SSuperTableObj;
typedef struct {
STableObj info;
uint64_t uid;
int64_t createdTime;
int32_t sversion; //used by normal table
int32_t numOfColumns; //used by normal table
int32_t sid;
int32_t vgId;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int32_t sqlLen;
int8_t reserved[1];
int8_t updateEnd[1];
int16_t nextColId; //used by normal table
int32_t refCount;
char* sql; //used by normal table
SSchema* schema; //used by normal table
SSuperTableObj *superTable;
} SChildTableObj;
typedef struct _vg_obj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN + 1];
int64_t createdTime;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
int32_t numOfVnodes;
int32_t lbDnodeId;
int32_t lbTime;
int8_t status;
int8_t inUse;
int8_t reserved[13];
int8_t updateEnd[1];
int32_t refCount;
struct _vg_obj *prev, *next;
struct _db_obj *pDb;
int32_t numOfTables;
void * idPool;
SChildTableObj ** tableList;
} SVgObj;
typedef struct _db_obj {
char name[TSDB_DB_NAME_LEN + 1];
int8_t status;
int64_t createdTime;
SDbCfg cfg;
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
SVgObj *pHead;
SVgObj *pTail;
struct _acct_obj *pAcct;
} SDbObj;
typedef struct _user_obj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
char acct[TSDB_USER_LEN + 1];
int64_t createdTime;
int8_t superAuth;
int8_t writeAuth;
int8_t reserved[13];
int8_t updateEnd[1];
int32_t refCount;
struct _acct_obj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
} SUserObj;
typedef struct {
int32_t numOfUsers;
int32_t numOfDbs;
int32_t numOfTimeSeries;
int32_t numOfPointsPerSecond;
int32_t numOfConns;
int32_t numOfQueries;
int32_t numOfStreams;
int64_t totalStorage; // Total storage wrtten from this account
int64_t compStorage; // Compressed storage on disk
int64_t queryTime;
int64_t totalPoints;
int64_t inblound;
int64_t outbound;
int64_t sKey;
int8_t accessState; // Checked by mgmt heartbeat message
} SAcctInfo;
typedef struct _acct_obj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg;
int32_t acctId;
int64_t createdTime;
int8_t status;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
SAcctInfo acctInfo;
pthread_mutex_t mutex;
} SAcctObj;
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN + 1];
void * pNode;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t numOfReads;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS];
void * signature;
uint16_t payloadLen;
char payload[];
} SShowObj;
typedef struct {
uint8_t msgType;
int8_t usePublicIp;
int8_t received;
int8_t successed;
int8_t expected;
int8_t retry;
int8_t maxRetry;
int32_t contLen;
int32_t code;
void *ahandle;
void *thandle;
void *pCont;
SAcctObj *pAcct;
SDnodeObj*pDnode;
SUserObj *pUser;
SDbObj *pDb;
SVgObj *pVgroup;
STableObj *pTable;
} SQueuedMsg;
int32_t mgmtInitSystem();
int32_t mgmtStartSystem();
void mgmtCleanUpSystem();
void mgmtStopSystem();
extern char version[];
extern void *tsMgmtTmr;
extern char tsMnodeDir[];
#ifdef __cplusplus
......
......@@ -319,11 +319,11 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_NORMAL_TABLES 1000
#define TSDB_MAX_CHILD_TABLES 100000
enum {
typedef enum {
TSDB_PRECISION_MILLI,
TSDB_PRECISION_MICRO,
TSDB_PRECISION_NANO
};
} EPrecisionType;
typedef enum {
TSDB_SUPER_TABLE = 0, // super table
......@@ -331,7 +331,14 @@ typedef enum {
TSDB_NORMAL_TABLE = 2, // ordinary table
TSDB_STREAM_TABLE = 3, // table created from stream computing
TSDB_TABLE_MAX = 4
} TSDB_TABLE_TYPE;
} ETableType;
typedef enum {
TSDB_MOD_MGMT,
TSDB_MOD_HTTP,
TSDB_MOD_MONITOR,
TSDB_MOD_MAX
} EModuleType;
#ifdef __cplusplus
}
......
......@@ -20,16 +20,16 @@
extern "C" {
#endif
struct _vg_obj;
struct _dnode_obj;
struct SVgObj;
struct SDnodeObj;
int32_t replicaInit();
void replicaCleanUp();
void replicaNotify();
void replicaReset();
int32_t replicaAllocVnodes(struct _vg_obj *pVgroup);
int32_t replicaAllocVnodes(struct SVgObj *pVgroup);
int32_t replicaForwardReqToPeer(void *pHead);
int32_t replicaDropDnode(struct _dnode_obj *pDnode);
int32_t replicaDropDnode(struct SDnodeObj *pDnode);
#ifdef __cplusplus
}
......
......@@ -22,20 +22,16 @@ extern "C" {
#include "tacct.h"
struct _acct_obj;
struct _user_obj;
struct _db_obj;
int32_t mgmtInitAccts();
void mgmtCleanUpAccts();
void *mgmtGetAcct(char *acctName);
void mgmtIncAcctRef(struct _acct_obj *pAcct);
void mgmtDecAcctRef(struct _acct_obj *pAcct);
void mgmtIncAcctRef(SAcctObj *pAcct);
void mgmtDecAcctRef(SAcctObj *pAcct);
void mgmtAddDbToAcct(struct _acct_obj *pAcct, struct _db_obj *pDb);
void mgmtDropDbFromAcct(struct _acct_obj *pAcct, struct _db_obj *pDb);
void mgmtAddUserToAcct(struct _acct_obj *pAcct, struct _user_obj *pUser);
void mgmtDropUserFromAcct(struct _acct_obj *pAcct, struct _user_obj *pUser);
void mgmtAddDbToAcct(SAcctObj *pAcct, SDbObj *pDb);
void mgmtDropDbFromAcct(SAcctObj *pAcct, SDbObj *pDb);
void mgmtAddUserToAcct(SAcctObj *pAcct, SUserObj *pUser);
void mgmtDropUserFromAcct(SAcctObj *pAcct, SUserObj *pUser);
#ifdef __cplusplus
}
......
......@@ -20,7 +20,7 @@
extern "C" {
#endif
#include "mnode.h"
#include "mgmtDef.h"
enum _TSDB_DB_STATUS {
TSDB_DB_STATUS_READY,
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MGMT_DEF_H
#define TDENGINE_MGMT_DEF_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
#include "taosmsg.h"
struct SVgObj;
struct SDbObj;
struct SAcctObj;
struct SUserObj;
struct SMnodeObj;
typedef struct SDnodeObj {
int32_t dnodeId;
uint32_t privateIp;
uint32_t publicIp;
uint16_t mnodeShellPort;
uint16_t mnodeDnodePort;
uint16_t dnodeShellPort;
uint16_t dnodeMnodePort;
uint16_t syncPort;
int64_t createdTime;
uint32_t lastAccess;
int32_t openVnodes;
int32_t totalVnodes; // from dnode status msg, config information
int32_t customScore; // config by user
uint16_t numOfCores; // from dnode status msg
int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t status; // set in balance function
int8_t isMgmt;
char dnodeName[TSDB_NODE_NAME_LEN + 1];
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
SVnodeLoad vload[TSDB_MAX_VNODES];
uint32_t moduleStatus;
uint32_t lastReboot; // time stamp for last reboot
float score; // calc in balance function
float diskAvailable; // from dnode status msg
int16_t diskAvgUsage; // calc from sys.disk
int16_t cpuAvgUsage; // calc from sys.cpu
int16_t memoryAvgUsage; // calc from sys.mem
int16_t bandwidthUsage; // calc from sys.band
} SDnodeObj;
typedef struct SMnodeObj {
int32_t mnodeId;
int64_t createdTime;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
int8_t role;
SDnodeObj *pDnode;
} SMnodeObj;
typedef struct {
int32_t dnodeId;
uint32_t privateIp;
uint32_t publicIp;
} SVnodeGid;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type;
} STableObj;
typedef struct SSuperTableObj {
STableObj info;
uint64_t uid;
int64_t createdTime;
int32_t sversion;
int32_t numOfColumns;
int32_t numOfTags;
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
int32_t numOfTables;
int16_t nextColId;
SSchema * schema;
int32_t vgLen;
int32_t * vgList;
} SSuperTableObj;
typedef struct {
STableObj info;
uint64_t uid;
int64_t createdTime;
int32_t sversion; //used by normal table
int32_t numOfColumns; //used by normal table
int32_t sid;
int32_t vgId;
char superTableId[TSDB_TABLE_ID_LEN + 1];
int32_t sqlLen;
int8_t reserved[1];
int8_t updateEnd[1];
int16_t nextColId; //used by normal table
int32_t refCount;
char* sql; //used by normal table
SSchema* schema; //used by normal table
SSuperTableObj *superTable;
} SChildTableObj;
typedef struct SVgObj {
uint32_t vgId;
char dbName[TSDB_DB_NAME_LEN + 1];
int64_t createdTime;
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
int32_t numOfVnodes;
int32_t lbDnodeId;
int32_t lbTime;
int8_t status;
int8_t inUse;
int8_t reserved[13];
int8_t updateEnd[1];
int32_t refCount;
struct SVgObj *prev, *next;
struct SDbObj *pDb;
int32_t numOfTables;
void * idPool;
SChildTableObj ** tableList;
} SVgObj;
typedef struct SDbObj {
char name[TSDB_DB_NAME_LEN + 1];
int8_t status;
int64_t createdTime;
SDbCfg cfg;
int8_t reserved[15];
int8_t updateEnd[1];
int32_t refCount;
int32_t numOfVgroups;
int32_t numOfTables;
int32_t numOfSuperTables;
SVgObj *pHead;
SVgObj *pTail;
struct SAcctObj *pAcct;
} SDbObj;
typedef struct SUserObj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
char acct[TSDB_USER_LEN + 1];
int64_t createdTime;
int8_t superAuth;
int8_t writeAuth;
int8_t reserved[13];
int8_t updateEnd[1];
int32_t refCount;
struct SAcctObj * pAcct;
SQqueryList * pQList; // query list
SStreamList * pSList; // stream list
} SUserObj;
typedef struct {
int32_t numOfUsers;
int32_t numOfDbs;
int32_t numOfTimeSeries;
int32_t numOfPointsPerSecond;
int32_t numOfConns;
int32_t numOfQueries;
int32_t numOfStreams;
int64_t totalStorage; // Total storage wrtten from this account
int64_t compStorage; // Compressed storage on disk
int64_t queryTime;
int64_t totalPoints;
int64_t inblound;
int64_t outbound;
int64_t sKey;
int8_t accessState; // Checked by mgmt heartbeat message
} SAcctInfo;
typedef struct SAcctObj {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg;
int32_t acctId;
int64_t createdTime;
int8_t status;
int8_t reserved[14];
int8_t updateEnd[1];
int32_t refCount;
SAcctInfo acctInfo;
pthread_mutex_t mutex;
} SAcctObj;
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN + 1];
void * pNode;
int16_t numOfColumns;
int32_t rowSize;
int32_t numOfRows;
int32_t numOfReads;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS];
void * signature;
uint16_t payloadLen;
char payload[];
} SShowObj;
typedef struct {
uint8_t msgType;
int8_t usePublicIp;
int8_t received;
int8_t successed;
int8_t expected;
int8_t retry;
int8_t maxRetry;
int32_t contLen;
int32_t code;
void *ahandle;
void *thandle;
void *pCont;
SAcctObj *pAcct;
SDnodeObj*pDnode;
SUserObj *pUser;
SDbObj *pDb;
SVgObj *pVgroup;
STableObj *pTable;
} SQueuedMsg;
#ifdef __cplusplus
}
#endif
#endif
......@@ -20,33 +20,27 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
struct _dnode_obj;
enum _TAOS_DN_STATUS {
typedef enum {
TAOS_DN_STATUS_OFFLINE,
TAOS_DN_STATUS_DROPPING,
TAOS_DN_STATUS_BALANCING,
TAOS_DN_STATUS_READY
};
} EDnodeStatus;
int32_t mgmtInitDnodes();
void mgmtCleanupDnodes();
char* mgmtGetDnodeStatusStr(int32_t dnodeStatus);
bool mgmtCheckModuleInDnode(struct _dnode_obj *pDnode, int moduleType);
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
void mgmtMonitorDnodeModule();
int32_t mgmtGetDnodesNum();
void * mgmtGetNextDnode(void *pNode, struct _dnode_obj **pDnode);
void mgmtReleaseDnode(struct _dnode_obj *pDnode);
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
void mgmtReleaseDnode(SDnodeObj *pDnode);
void * mgmtGetDnode(int32_t dnodeId);
void * mgmtGetDnodeByIp(uint32_t ip);
void mgmtUpdateDnode(struct _dnode_obj *pDnode);
int32_t mgmtDropDnode(struct _dnode_obj *pDnode);
void mgmtUpdateDnode(SDnodeObj *pDnode);
int32_t mgmtDropDnode(SDnodeObj *pDnode);
#ifdef __cplusplus
}
......
......@@ -13,42 +13,53 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TMODULE_H
#define TDENGINE_TMODULE_H
#ifndef TDENGINE_MGMT_LOG_H
#define TDENGINE_MGMT_LOG_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
enum _module {
TSDB_MOD_MGMT,
TSDB_MOD_HTTP,
TSDB_MOD_MONITOR,
TSDB_MOD_MAX
};
#define tsetModuleStatus(mod) \
{ tsModuleStatus |= (1 << mod); }
#define tclearModuleStatus(mod) \
{ tsModuleStatus &= ~(1 << mod); }
typedef struct {
char *name;
int (*initFp)();
void (*cleanUpFp)();
int (*startFp)();
void (*stopFp)();
int num;
int curNum;
int equalVnodeNum;
} SModule;
extern uint32_t tsModuleStatus;
extern SModule tsModule[];
#include "tlog.h"
// mnode log function
#define mError(...) \
if (mdebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND ", 255, __VA_ARGS__); \
}
#define mWarn(...) \
if (mdebugFlag & DEBUG_WARN) { \
tprintf("WARN MND ", mdebugFlag, __VA_ARGS__); \
}
#define mTrace(...) \
if (mdebugFlag & DEBUG_TRACE) { \
tprintf("MND ", mdebugFlag, __VA_ARGS__); \
}
#define mPrint(...) \
{ tprintf("MND ", 255, __VA_ARGS__); }
#define mLError(...) taosLogError(__VA_ARGS__) mError(__VA_ARGS__)
#define mLWarn(...) taosLogWarn(__VA_ARGS__) mWarn(__VA_ARGS__)
#define mLPrint(...) taosLogPrint(__VA_ARGS__) mPrint(__VA_ARGS__)
#define sdbError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND-SDB ", 255, __VA_ARGS__); \
}
#define sdbWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
}
#define sdbTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
tprintf("MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
}
#define sdbPrint(...) \
{ tprintf("MND-SDB ", 255, __VA_ARGS__); }
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
#ifdef __cplusplus
}
......
......@@ -20,13 +20,13 @@
extern "C" {
#endif
struct _mnode_obj;
struct SMnodeObj;
enum _TAOS_MN_STATUS {
typedef enum {
TAOS_MN_STATUS_OFFLINE,
TAOS_MN_STATUS_DROPPING,
TAOS_MN_STATUS_READY
};
} EMnodeStatus;
int32_t mgmtInitMnodes();
void mgmtCleanupMnodes();
......@@ -36,8 +36,8 @@ int32_t mgmtDropMnode(int32_t dnodeId);
void * mgmtGetMnode(int32_t mnodeId);
int32_t mgmtGetMnodesNum();
void * mgmtGetNextMnode(void *pNode, struct _mnode_obj **pMnode);
void mgmtReleaseMnode(struct _mnode_obj *pMnode);
void * mgmtGetNextMnode(void *pNode, struct SMnodeObj **pMnode);
void mgmtReleaseMnode(struct SMnodeObj *pMnode);
bool mgmtIsMaster();
......
......@@ -19,7 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "mnode.h"
#include "mgmtDef.h"
int32_t mgmtInitProfile();
void mgmtCleanUpProfile();
......
......@@ -19,7 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "mnode.h"
#include "mgmtDef.h"
int32_t mgmtInitShell();
void mgmtCleanUpShell();
......
......@@ -20,10 +20,7 @@
extern "C" {
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "mnode.h"
#include "mgmtDef.h"
int32_t mgmtInitTables();
void mgmtCleanUpTables();
......
......@@ -19,7 +19,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "mnode.h"
#include "mgmtDef.h"
int32_t mgmtInitUsers();
void mgmtCleanUpUsers();
......
......@@ -20,9 +20,7 @@
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "mnode.h"
#include "mgmtDef.h"
enum _TSDB_VG_STATUS {
TSDB_VG_STATUS_READY,
......
......@@ -16,8 +16,11 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "ttime.h"
#include "tutil.h"
#include "dnode.h"
#include "mnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtSdb.h"
......
......@@ -19,12 +19,14 @@
#include "tsched.h"
#include "tsystem.h"
#include "tutil.h"
#include "tglobalcfg.h"
#include "dnode.h"
#include "mnode.h"
#include "tgrant.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "tgrant.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
......
......@@ -20,12 +20,14 @@
#include "tsched.h"
#include "tsystem.h"
#include "tutil.h"
#include "dnode.h"
#include "mnode.h"
#include "tgrant.h"
#include "treplica.h"
#include "tglobalcfg.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
#include "mgmtDServer.h"
#include "tgrant.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
......
......@@ -17,12 +17,15 @@
#include "os.h"
#include "taoserror.h"
#include "tutil.h"
#include "tgrant.h"
#include "tglobalcfg.h"
#include "ttime.h"
#include "name.h"
#include "mnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "tgrant.h"
#include "mgmtMnode.h"
#include "mgmtShell.h"
#include "mgmtProfile.h"
......
......@@ -15,10 +15,15 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tmodule.h"
#include "tgrant.h"
#include "treplica.h"
#include "mnode.h"
#include "tglobalcfg.h"
#include "ttime.h"
#include "tutil.h"
#include "tsocket.h"
#include "dnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDClient.h"
#include "mgmtDServer.h"
#include "mgmtDnode.h"
......@@ -27,7 +32,6 @@
#include "mgmtShell.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#include "dnodeMClient.h"
void *tsDnodeSdb = NULL;
int32_t tsDnodeUpdateSize = 0;
......@@ -329,7 +333,6 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mTrace("dnode:%d, from offline to online", pDnode->dnodeId);
pDnode->status = TAOS_DN_STATUS_READY;
replicaNotify();
mgmtMonitorDnodeModule();
}
mgmtReleaseDnode(pDnode);
......@@ -626,21 +629,27 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
SSchema *pSchema = pMeta->schema;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "IP");
strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "module type");
strcpy(pSchema[cols].name, "module");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 10;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "module status");
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
......@@ -652,18 +661,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
}
pShow->numOfRows = 0;
SDnodeObj *pDnode = NULL;
while (1) {
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (mgmtCheckModuleInDnode(pDnode, moduleType)) {
pShow->numOfRows++;
}
}
}
pShow->numOfRows = mgmtGetDnodesNum() * TSDB_MOD_MAX;
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
pShow->pNode = NULL;
mgmtReleaseUser(pUser);
......@@ -672,39 +670,52 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
}
int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SDnodeObj *pDnode = NULL;
char * pWrite;
int32_t cols = 0;
char ipstr[20];
int32_t numOfRows = 0;
char * pWrite;
while (numOfRows < rows) {
mgmtReleaseDnode(pDnode);
SDnodeObj *pDnode = NULL;
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
if (pDnode == NULL) break;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (!mgmtCheckModuleInDnode(pDnode, moduleType)) {
continue;
}
int32_t cols = 0;
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *)pWrite = pDnode->dnodeId;
cols++;
char ipstr[20];
tinet_ntoa(ipstr, pDnode->privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, tsModule[moduleType].name);
switch (moduleType) {
case TSDB_MOD_MGMT:
strcpy(pWrite, "mgmt");
break;
case TSDB_MOD_HTTP:
strcpy(pWrite, "http");
break;
case TSDB_MOD_MONITOR:
strcpy(pWrite, "monitor");
break;
default:
strcpy(pWrite, "unknown");
}
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, mgmtGetDnodeStatusStr(pDnode->status));
bool enable = mgmtCheckModuleInDnode(pDnode, moduleType);
strcpy(pWrite, enable ? "enable" : "disable");
cols++;
numOfRows++;
}
mgmtReleaseDnode(pDnode);
}
pShow->numOfReads += numOfRows;
......@@ -919,155 +930,3 @@ char* mgmtGetDnodeStatusStr(int32_t dnodeStatus) {
default: return "undefined";
}
}
static void clusterSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
pDnode->moduleStatus |= (1 << moduleType);
mgmtUpdateDnode(pDnode);
if (moduleType == TSDB_MOD_MGMT) {
mgmtAddMnode(pDnode->dnodeId);
mPrint("dnode:%d, add it into mnode list", pDnode->dnodeId);
}
}
static void clusterUnSetModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
pDnode->moduleStatus &= ~(1 << moduleType);
mgmtUpdateDnode(pDnode);
if (moduleType == TSDB_MOD_MGMT) {
mgmtDropMnode(pDnode->dnodeId);
mPrint("dnode:%d, remove it from mnode list", pDnode->dnodeId);
}
}
static void clusterStopAllModuleInDnode(SDnodeObj *pDnode) {
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MAX; ++moduleType) {
if (!mgmtCheckModuleInDnode(pDnode, moduleType)) {
continue;
}
mPrint("dnode:%d, stop %s module for its offline or remove", pDnode->dnodeId, tsModule[moduleType].name);
clusterUnSetModuleInDnode(pDnode, moduleType);
}
}
static void clusterStartModuleInAllDnodes(int32_t moduleType) {
void * pNode = NULL;
SDnodeObj *pDnode = NULL;
while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
if (pDnode == NULL) break;
if (!mgmtCheckModuleInDnode(pDnode, moduleType)
&& pDnode->status != TAOS_DN_STATUS_OFFLINE
&& pDnode->status != TAOS_DN_STATUS_DROPPING) {
mPrint("dnode:%d, add %s module for schedule", pDnode->dnodeId, tsModule[moduleType].name);
clusterSetModuleInDnode(pDnode, moduleType);
}
mgmtReleaseDnode(pNode);
}
}
static void clusterStartModuleInOneDnode(int32_t moduleType) {
void * pNode = NULL;
SDnodeObj *pDnode = NULL;
while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
if (pDnode == NULL) break;
if (!mgmtCheckModuleInDnode(pDnode, moduleType)
&& pDnode->status != TAOS_DN_STATUS_OFFLINE
&& pDnode->status != TAOS_DN_STATUS_DROPPING
&& !(moduleType == TSDB_MOD_MGMT && pDnode->alternativeRole == TSDB_DNODE_ROLE_VNODE)) {
mPrint("dnode:%d, add %s module for schedule", pDnode->dnodeId, tsModule[moduleType].name);
clusterSetModuleInDnode(pDnode, moduleType);
mgmtReleaseDnode(pNode);
break;
}
mgmtReleaseDnode(pNode);
}
}
static void clusterStopModuleInOneDnode(int32_t moduleType) {
void * pNode = NULL;
SDnodeObj *pDnode = NULL;
while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
if (pDnode == NULL) break;
if (mgmtCheckModuleInDnode(pDnode, moduleType)) {
mPrint("dnode:%d, stop %s module for schedule", pDnode->dnodeId, tsModule[moduleType].name);
clusterUnSetModuleInDnode(pDnode, moduleType);
mgmtReleaseDnode(pNode);
break;
}
mgmtReleaseDnode(pNode);
}
}
void mgmtMonitorDnodeModule() {
void * pNode = NULL;
SDnodeObj *pDnode = NULL;
int32_t onlineDnodes = 0;
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MGMT+1; ++moduleType) {
tsModule[moduleType].curNum = 0;
}
// dnode loop
while (1) {
pNode = mgmtGetNextDnode(pNode, &pDnode);
if (pDnode == NULL) break;
if (pDnode->status == TAOS_DN_STATUS_DROPPING) {
mPrint("dnode:%d, status:%d, remove all modules for removing", pDnode->dnodeId, pDnode->status);
clusterStopAllModuleInDnode(pDnode);
mgmtReleaseDnode(pDnode);
continue;
}
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MGMT+1; ++moduleType) {
if (mgmtCheckModuleInDnode(pDnode, moduleType)) {
tsModule[moduleType].curNum ++;
}
}
if (pDnode->status != TAOS_DN_STATUS_OFFLINE) {
onlineDnodes++;
}
mgmtReleaseDnode(pDnode);
}
for (int32_t moduleType = 0; moduleType < TSDB_MOD_MGMT+1; ++moduleType) {
if (tsModule[moduleType].num == -1) {
clusterStartModuleInAllDnodes(moduleType);
continue;
}
if (tsModule[moduleType].curNum < tsModule[moduleType].num) {
if (onlineDnodes <= tsModule[moduleType].curNum) {
continue;
}
mTrace("need add %s module, curNum:%d, expectNum:%d", tsModule[moduleType].name, tsModule[moduleType].curNum,
tsModule[moduleType].num);
for (int32_t i = tsModule[moduleType].curNum; i < tsModule[moduleType].num; ++i) {
clusterStartModuleInOneDnode(moduleType);
}
} else if (tsModule[moduleType].curNum > tsModule[moduleType].num) {
mTrace("need drop %s module, curNum:%d, expectNum:%d", tsModule[moduleType].name, tsModule[moduleType].curNum,
tsModule[moduleType].num);
for (int32_t i = tsModule[moduleType].num; i < tsModule[moduleType].curNum; ++i) {
clusterStopModuleInOneDnode(moduleType);
}
} else {
}
}
}
......@@ -17,8 +17,8 @@
#ifndef _GRANT
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tgrant.h"
#include "mgmtLog.h"
int32_t grantInit() { return TSDB_CODE_SUCCESS; }
void grantCleanUp() {}
......
......@@ -16,13 +16,14 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosdef.h"
#include "tmodule.h"
#include "tsched.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "treplica.h"
#include "mgmtDnode.h"
#include "tgrant.h"
#include "ttimer.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
......@@ -33,41 +34,21 @@
#include "mgmtTable.h"
#include "mgmtShell.h"
static int32_t mgmtCheckMgmtRunning();
void *tsMgmtTmr = NULL;
static bool tsMgmtIsRunning = false;
int32_t mgmtInitSystem() {
if (mgmtInitShell() != 0) {
mError("failed to init shell");
return -1;
}
struct stat dirstat;
bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
if (asMaster || fileExist) {
if (mgmtStartSystem() != 0) {
return -1;
}
int32_t mgmtStartSystem() {
if (tsMgmtIsRunning) {
mPrint("TDengine mgmt module already started...");
return 0;
}
return 0;
}
int32_t mgmtStartSystem() {
mPrint("starting to initialize TDengine mgmt ...");
struct stat dirstat;
if (stat(tsMnodeDir, &dirstat) < 0) {
mkdir(tsMnodeDir, 0755);
}
if (mgmtCheckMgmtRunning() != 0) {
mPrint("TDengine mgmt module already started...");
return 0;
}
tsMgmtTmr = taosTmrInit((tsMaxShellConns) * 3, 200, 3600000, "MND");
if (tsMgmtTmr == NULL) {
mError("failed to init timer");
......@@ -132,21 +113,30 @@ int32_t mgmtStartSystem() {
}
grantReset(TSDB_GRANT_ALL, 0);
tsMgmtIsRunning = true;
mPrint("TDengine mgmt is initialized successfully");
return 0;
}
int32_t mgmtInitSystem() {
if (mgmtInitShell() != 0) {
mError("failed to init shell");
return -1;
}
void mgmtStopSystem() {
if (mgmtIsMaster()) {
mTrace("it is a master mgmt node, it could not be stopped");
return;
struct stat dirstat;
bool fileExist = (stat(tsMnodeDir, &dirstat) == 0);
bool asMaster = (strcmp(tsMasterIp, tsPrivateIp) == 0);
if (asMaster || fileExist) {
if (mgmtStartSystem() != 0) {
return -1;
}
}
mgmtCleanUpSystem();
remove(tsMnodeDir);
return 0;
}
void mgmtCleanUpSystem() {
......@@ -165,14 +155,18 @@ void mgmtCleanUpSystem() {
mgmtCleanUpAccts();
sdbCleanUp();
taosTmrCleanUp(tsMgmtTmr);
tsMgmtIsRunning = false;
mPrint("mgmt is cleaned up");
}
static int32_t mgmtCheckMgmtRunning() {
if (tsModuleStatus & (1 << TSDB_MOD_MGMT)) {
return -1;
void mgmtStopSystem() {
if (mgmtIsMaster()) {
mTrace("it is a master mgmt node, it could not be stopped");
return;
}
tsetModuleStatus(TSDB_MOD_MGMT);
return 0;
}
\ No newline at end of file
mgmtCleanUpSystem();
mPrint("mgmt file is removed");
remove(tsMnodeDir);
}
......@@ -16,11 +16,14 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tmodule.h"
#include "trpc.h"
#include "tsync.h"
#include "treplica.h"
#include "mnode.h"
#include "tutil.h"
#include "ttime.h"
#include "tsocket.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
#include "mgmtDnode.h"
#include "mgmtSdb.h"
......@@ -42,7 +45,9 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) {
SMnodeObj *pMnode = pOper->pObj;
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
pMnode->pDnode = pDnode;
pDnode->isMgmt = true;
mgmtReleaseDnode(pDnode);
return TSDB_CODE_SUCCESS;
......@@ -130,7 +135,7 @@ void *mgmtGetMnode(int32_t mnodeId) {
return sdbGetRow(tsMnodeSdb, &mnodeId);
}
void mgmtReleaseMnode(struct _mnode_obj *pMnode) {
void mgmtReleaseMnode(SMnodeObj *pMnode) {
sdbDecRef(tsMnodeSdb, pMnode);
}
......
......@@ -16,6 +16,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tutil.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDnode.h"
#include "mgmtDb.h"
......
......@@ -17,7 +17,8 @@
#include "os.h"
#include "trpc.h"
#include "treplica.h"
#include "mnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
#include "mgmtDnode.h"
#include "mgmtVgroup.h"
......
......@@ -23,6 +23,7 @@
#include "twal.h"
#include "hashint.h"
#include "hashstr.h"
#include "mgmtLog.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
......
......@@ -20,8 +20,11 @@
#include "tlog.h"
#include "trpc.h"
#include "tsched.h"
#include "tutil.h"
#include "ttimer.h"
#include "dnode.h"
#include "mnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
......@@ -47,6 +50,7 @@ static void mgmtProcessHeartBeatMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessConnectMsg(SQueuedMsg *queuedMsg);
static void mgmtProcessUseMsg(SQueuedMsg *queuedMsg);
extern void *tsMgmtTmr;
static void *tsMgmtShellRpc = NULL;
static void *tsMgmtTranQhandle = NULL;
static void (*tsMgmtProcessShellMsgFp[TSDB_MSG_TYPE_MAX])(SQueuedMsg *) = {0};
......
......@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "ttime.h"
#include "tutil.h"
......@@ -23,6 +22,9 @@
#include "taosmsg.h"
#include "tscompression.h"
#include "name.h"
#include "tidpool.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "mgmtDClient.h"
#include "mgmtDb.h"
......
......@@ -18,6 +18,8 @@
#include "trpc.h"
#include "ttime.h"
#include "tutil.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtAcct.h"
#include "tgrant.h"
#include "mgmtMnode.h"
......
......@@ -17,13 +17,18 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tutil.h"
#include "tsocket.h"
#include "tidpool.h"
#include "tsync.h"
#include "ttime.h"
#include "treplica.h"
#include "mgmtDnode.h"
#include "mnode.h"
#include "mgmtDef.h"
#include "mgmtLog.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDServer.h"
#include "mgmtDnode.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
......
......@@ -24,7 +24,7 @@ ELSEIF (TD_WINDOWS_64)
ENDIF ()
ADD_LIBRARY(trpc ${SRC})
TARGET_LINK_LIBRARIES(trpc tutil lz4)
TARGET_LINK_LIBRARIES(trpc tutil lz4 common)
ADD_SUBDIRECTORY(test)
......@@ -79,7 +79,7 @@ typedef struct {
// --------- TSDB TABLE configuration
typedef struct {
TSDB_TABLE_TYPE type;
ETableType type;
STableId tableId;
int32_t sversion;
int64_t superUid;
......@@ -88,7 +88,7 @@ typedef struct {
SDataRow tagValues;
} STableCfg;
int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid);
int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t tid);
int tsdbTableSetSuperUid(STableCfg *config, int64_t uid);
int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup);
int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup);
......
......@@ -387,7 +387,7 @@ int32_t tsdbInsertData(tsdb_repo_t *repo, SSubmitMsg *pMsg) {
/**
* Initialize a table configuration
*/
int tsdbInitTableCfg(STableCfg *config, TSDB_TABLE_TYPE type, int64_t uid, int32_t tid) {
int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t tid) {
if (config == NULL) return -1;
if (type != TSDB_NORMAL_TABLE && type != TSDB_CHILD_TABLE) return -1;
......
......@@ -177,26 +177,6 @@ extern uint32_t cdebugFlag;
tprintf("DND QRY ", qdebugFlag, __VA_ARGS__); \
}
// mnode log function
#define mError(...) \
if (mdebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND ", 255, __VA_ARGS__); \
}
#define mWarn(...) \
if (mdebugFlag & DEBUG_WARN) { \
tprintf("WARN MND ", mdebugFlag, __VA_ARGS__); \
}
#define mTrace(...) \
if (mdebugFlag & DEBUG_TRACE) { \
tprintf("MND ", mdebugFlag, __VA_ARGS__); \
}
#define mPrint(...) \
{ tprintf("MND ", 255, __VA_ARGS__); }
#define mLError(...) taosLogError(__VA_ARGS__) mError(__VA_ARGS__)
#define mLWarn(...) taosLogWarn(__VA_ARGS__) mWarn(__VA_ARGS__)
#define mLPrint(...) taosLogPrint(__VA_ARGS__) mPrint(__VA_ARGS__)
#define httpError(...) \
if (httpDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR HTP ", 255, __VA_ARGS__); \
......@@ -239,25 +219,6 @@ extern uint32_t cdebugFlag;
#define monitorLWarn(...) taosLogWarn(__VA_ARGS__) monitorWarn(__VA_ARGS__)
#define monitorLPrint(...) taosLogPrint(__VA_ARGS__) monitorPrint(__VA_ARGS__)
#define sdbError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND-SDB ", 255, __VA_ARGS__); \
}
#define sdbWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
}
#define sdbTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
tprintf("MND-SDB ", sdbDebugFlag, __VA_ARGS__); \
}
#define sdbPrint(...) \
{ tprintf("MND-SDB ", 255, __VA_ARGS__); }
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tmodule.h"
SModule tsModule[TSDB_MOD_MAX] = {0};
uint32_t tsModuleStatus = 0;
\ No newline at end of file
......@@ -23,7 +23,6 @@
#include "taos.h"
#include "tidpool.h"
#include "tlog.h"
#include "tmodule.h"
#include "tutil.h"
#define MAX_MAIN_SCRIPT_NUM 10
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册