Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8fc096e3
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8fc096e3
编写于
3月 03, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
sort out mgmt account
上级
14ecbb0f
变更
10
显示空白变更内容
内联
并排
Showing
10 changed file
with
220 addition
and
911 deletion
+220
-911
src/inc/.fuse_hidden0000e2ae00000244
src/inc/.fuse_hidden0000e2ae00000244
+0
-816
src/inc/taosmsg.h
src/inc/taosmsg.h
+2
-2
src/mnode/inc/mgmtAcct.h
src/mnode/inc/mgmtAcct.h
+13
-11
src/mnode/inc/mgmtBalance.h
src/mnode/inc/mgmtBalance.h
+1
-1
src/mnode/inc/mgmtShell.h
src/mnode/inc/mgmtShell.h
+0
-3
src/mnode/src/mgmtAcct.c
src/mnode/src/mgmtAcct.c
+11
-12
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+30
-25
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+1
-1
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+161
-35
src/mnode/src/mgmtSystem.c
src/mnode/src/mgmtSystem.c
+1
-5
未找到文件。
src/inc/.fuse_hidden0000e2ae00000244
已删除
100644 → 0
浏览文件 @
14ecbb0f
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TAOSMSG_H
#define TDENGINE_TAOSMSG_H
#ifdef __cplusplus
extern "C" {
#endif
#include <stdint.h>
#include <stdbool.h>
#include "taosdef.h"
#include "taoserror.h"
#include "taosdef.h"
#include "trpc.h"
// message type
#define TSDB_MSG_TYPE_REG 1
#define TSDB_MSG_TYPE_REG_RSP 2
#define TSDB_MSG_TYPE_DNODE_SUBMIT 3
#define TSDB_MSG_TYPE_DNODE_SUBMIT_RSP 4
#define TSDB_MSG_TYPE_DNODE_QUERY 5
#define TSDB_MSG_TYPE_DNODE_QUERY_RSP 6
#define TSDB_MSG_TYPE_RETRIEVE 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE 9
#define TSDB_MSG_TYPE_DNODE_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE 11
#define TSDB_MSG_TYPE_DNODE_REMOVE_TABLE_RSP 12
#define TSDB_MSG_TYPE_DNODE_CREATE_VNODE 13
#define TSDB_MSG_TYPE_DNODE_VPEERS_RSP 14
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE 15
#define TSDB_MSG_TYPE_DNODE_FREE_VNODE_RSP 16
#define TSDB_MSG_TYPE_DNODE_CFG 17
#define TSDB_MSG_TYPE_DNODE_CFG_RSP 18
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM 19
#define TSDB_MSG_TYPE_DNODE_ALTER_STREAM_RSP 20
#define TSDB_MSG_TYPE_SDB_SYNC 21
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22
#define TSDB_MSG_TYPE_SDB_FORWARD 23
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24
#define TSDB_MSG_TYPE_CONNECT 31
#define TSDB_MSG_TYPE_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_ALTER_ACCT 35
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_DROP_ACCT 37
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CREATE_USER 39
#define TSDB_MSG_TYPE_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_ALTER_USER 41
#define TSDB_MSG_TYPE_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_DROP_USER 43
#define TSDB_MSG_TYPE_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CREATE_MNODE 45
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46
#define TSDB_MSG_TYPE_DROP_MNODE 47
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48
#define TSDB_MSG_TYPE_CREATE_DNODE 49
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50
#define TSDB_MSG_TYPE_DROP_DNODE 51
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52
#define TSDB_MSG_TYPE_ALTER_DNODE 53
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54
#define TSDB_MSG_TYPE_CREATE_DB 55
#define TSDB_MSG_TYPE_CREATE_DB_RSP 56
#define TSDB_MSG_TYPE_DROP_DB 57
#define TSDB_MSG_TYPE_DROP_DB_RSP 58
#define TSDB_MSG_TYPE_USE_DB 59
#define TSDB_MSG_TYPE_USE_DB_RSP 60
#define TSDB_MSG_TYPE_ALTER_DB 61
#define TSDB_MSG_TYPE_ALTER_DB_RSP 62
#define TSDB_MSG_TYPE_CREATE_TABLE 63
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64
#define TSDB_MSG_TYPE_DROP_TABLE 65
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66
#define TSDB_MSG_TYPE_ALTER_TABLE 67
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68
#define TSDB_MSG_TYPE_VNODE_CFG 69
#define TSDB_MSG_TYPE_VNODE_CFG_RSP 70
#define TSDB_MSG_TYPE_TABLE_CFG 71
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72
#define TSDB_MSG_TYPE_TABLE_META 73
#define TSDB_MSG_TYPE_TABLE_META_RSP 74
#define TSDB_MSG_TYPE_STABLE_META 75
#define TSDB_MSG_TYPE_STABLE_META_RSP 76
#define TSDB_MSG_TYPE_MULTI_TABLE_META 77
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78
#define TSDB_MSG_TYPE_ALTER_STREAM 79
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80
#define TSDB_MSG_TYPE_SHOW 81
#define TSDB_MSG_TYPE_SHOW_RSP 82
#define TSDB_MSG_TYPE_CFG_MNODE 83
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84
#define TSDB_MSG_TYPE_KILL_QUERY 85
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86
#define TSDB_MSG_TYPE_KILL_STREAM 87
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88
#define TSDB_MSG_TYPE_KILL_CONNECTION 89
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90
#define TSDB_MSG_TYPE_HEARTBEAT 91
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92
#define TSDB_MSG_TYPE_STATUS 93
#define TSDB_MSG_TYPE_STATUS_RSP 94
#define TSDB_MSG_TYPE_GRANT 95
#define TSDB_MSG_TYPE_GRANT_RSP 96
#define TSDB_MSG_TYPE_MAX 97
// IE type
#define TSDB_IE_TYPE_SEC 1
#define TSDB_IE_TYPE_META 2
#define TSDB_IE_TYPE_MGMT_IP 3
#define TSDB_IE_TYPE_DNODE_CFG 4
#define TSDB_IE_TYPE_NEW_VERSION 5
#define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7
enum _mgmt_table {
TSDB_MGMT_TABLE_ACCT,
TSDB_MGMT_TABLE_USER,
TSDB_MGMT_TABLE_DB,
TSDB_MGMT_TABLE_TABLE,
TSDB_MGMT_TABLE_DNODE,
TSDB_MGMT_TABLE_MNODE,
TSDB_MGMT_TABLE_VGROUP,
TSDB_MGMT_TABLE_METRIC,
TSDB_MGMT_TABLE_MODULE,
TSDB_MGMT_TABLE_QUERIES,
TSDB_MGMT_TABLE_STREAMS,
TSDB_MGMT_TABLE_CONFIGS,
TSDB_MGMT_TABLE_CONNS,
TSDB_MGMT_TABLE_SCORES,
TSDB_MGMT_TABLE_GRANTS,
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_MAX,
};
#define TSDB_ALTER_TABLE_ADD_TAG_COLUMN 1
#define TSDB_ALTER_TABLE_DROP_TAG_COLUMN 2
#define TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN 3
#define TSDB_ALTER_TABLE_UPDATE_TAG_VAL 4
#define TSDB_ALTER_TABLE_ADD_COLUMN 5
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
#define TSDB_INTERPO_NONE 0
#define TSDB_INTERPO_NULL 1
#define TSDB_INTERPO_SET_VALUE 2
#define TSDB_INTERPO_LINEAR 3
#define TSDB_INTERPO_PREV 4
#define TSDB_ALTER_USER_PASSWD 0x1
#define TSDB_ALTER_USER_PRIVILEGES 0x2
#define TSDB_KILL_MSG_LEN 30
typedef enum {
TSDB_TABLE_TYPE_SUPER_TABLE = 0, // super table
TSDB_TABLE_TYPE_CHILD_TABLE = 1, // table created from super table
TSDB_TABLE_TYPE_NORMAL_TABLE = 2, // ordinary table
TSDB_TABLE_TYPE_STREAM_TABLE = 3, // table created from stream computing
TSDB_TABLE_TYPE_MAX = 4
} ETableType;
#define TSDB_VN_READ_ACCCESS ((char)0x1)
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
#define TSDB_COL_NORMAL 0x0U
#define TSDB_COL_TAG 0x1U
#define TSDB_COL_JOIN 0x2U
extern char *taosMsg[];
#pragma pack(push, 1)
typedef struct {
int32_t vnode;
int32_t sid;
int32_t sversion;
uint64_t uid;
int16_t numOfRows;
char payLoad[];
} SShellSubmitBlock;
typedef struct {
int8_t import;
int8_t reserved[3];
int32_t numOfSid; /* total number of sid */
char blks[]; /* numOfSid blocks, each blocks for one table */
} SShellSubmitMsg;
typedef struct {
int32_t index; // index of failed block in submit blocks
int32_t vnode; // vnode index of failed block
int32_t sid; // table index of failed block
int32_t code; // errorcode while write data to vnode, such as not created, dropped, no space, invalid table
} SShellSubmitRspBlock;
typedef struct {
int32_t code; // 0-success, > 0 error code
int32_t numOfRows; // number of records the client is trying to write
int32_t affectedRows; // number of records actually written
int32_t failedRows; // number of failed records (exclude duplicate records)
int32_t numOfFailedBlocks;
SShellSubmitRspBlock failedBlocks[];
} SShellSubmitRspMsg;
typedef struct SSchema {
uint8_t type;
char name[TSDB_COL_NAME_LEN + 1];
int16_t colId;
int16_t bytes;
} SSchema;
typedef struct {
int32_t vnode; //the index of vnode
uint32_t ip;
} SVPeerDesc;
typedef struct {
int8_t tableType;
int16_t numOfColumns;
int16_t numOfTags;
int32_t sid;
int32_t sversion;
int32_t tagDataLen;
int32_t sqlDataLen;
int32_t contLen;
int32_t numOfVPeers;
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1];
char superTableId[TSDB_TABLE_ID_LEN + 1];
char data[];
} SDCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
int8_t igExists;
int16_t numOfTags;
int16_t numOfColumns;
int16_t sqlLen; // the length of SQL, it starts after schema , sql is a null-terminated string
int16_t reserved[16];
SSchema schema[];
} SCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t igNotExists;
} SDropTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
int16_t type; /* operation type */
char tagVal[TSDB_MAX_BYTES_PER_ROW];
int8_t numOfCols; /* number of schema */
SSchema schema[];
} SAlterTableMsg;
typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN + 1];
} SConnectMsg;
typedef struct {
char acctId[TSDB_ACCT_LEN + 1];
char serverVersion[TSDB_VERSION_LEN];
int8_t writeAuth;
int8_t superAuth;
SRpcIpSet ipList;
} SConnectRsp;
typedef struct {
int32_t maxUsers;
int32_t maxDbs;
int32_t maxTimeSeries;
int32_t maxConnections;
int32_t maxStreams;
int32_t maxPointsPerSecond;
int64_t maxStorage; // In unit of GB
int64_t maxQueryTime; // In unit of hour
int64_t maxInbound;
int64_t maxOutbound;
int8_t accessState; // Configured only by command
} SAcctCfg;
typedef struct {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
SAcctCfg cfg;
} SCreateAcctMsg, SAlterAcctMsg;
typedef struct {
char user[TSDB_USER_LEN + 1];
} SDropUserMsg, SDropAcctMsg;
typedef struct {
char user[TSDB_USER_LEN + 1];
char pass[TSDB_KEY_LEN + 1];
int8_t privilege;
int8_t flag;
} SCreateUserMsg, SAlterUserMsg;
typedef struct {
char db[TSDB_TABLE_ID_LEN + 1];
} SMgmtHead;
typedef struct {
int32_t sid;
int32_t numOfVPeers;
uint64_t uid;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1];
} SDRemoveTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int64_t uid;
} SDRemoveSuperTableMsg;
typedef struct {
int32_t vnode;
} SFreeVnodeMsg;
typedef struct SColIndexEx {
int16_t colId;
/*
* colIdx is the index of column in latest schema of table
* it is available in the client side. Also used to determine
* whether current table schema is up-to-date.
*
* colIdxInBuf is used to denote the index of column in pQuery->colList,
* this value is invalid in client side, as well as in cache block of vnode either.
*/
int16_t colIdx;
int16_t colIdxInBuf;
uint16_t flag; // denote if it is a tag or not
} SColIndexEx;
/* sql function msg, to describe the message to vnode about sql function
* operations in select clause */
typedef struct SSqlFuncExprMsg {
int16_t functionId;
int16_t numOfParams;
SColIndexEx colInfo;
struct ArgElem {
int16_t argType;
int16_t argBytes;
union {
double d;
int64_t i64;
char * pz;
} argValue;
} arg[3];
} SSqlFuncExprMsg;
typedef struct SSqlBinaryExprInfo {
struct tSQLBinaryExpr *pBinExpr; /* for binary expression */
int32_t numOfCols; /* binary expression involves the readed number of columns*/
SColIndexEx * pReqColumns; /* source column list */
} SSqlBinaryExprInfo;
typedef struct SSqlFunctionExpr {
SSqlFuncExprMsg pBase;
SSqlBinaryExprInfo pBinExprInfo;
int16_t resBytes;
int16_t resType;
int16_t interResBytes;
} SSqlFunctionExpr;
typedef struct SColumnFilterInfo {
int16_t lowerRelOptr;
int16_t upperRelOptr;
int16_t filterOnBinary; /* denote if current column is binary */
union {
struct {
int64_t lowerBndi;
int64_t upperBndi;
};
struct {
double lowerBndd;
double upperBndd;
};
struct {
int64_t pz;
int64_t len;
};
};
} SColumnFilterInfo;
/*
* for client side struct, we only need the column id, type, bytes are not necessary
* But for data in vnode side, we need all the following information.
*/
typedef struct SColumnInfo {
int16_t colId;
int16_t type;
int16_t bytes;
int16_t numOfFilters;
SColumnFilterInfo *filters;
} SColumnInfo;
/*
* enable vnode to understand how to group several tables with different tag;
*/
typedef struct STableSidExtInfo {
int32_t sid;
int64_t uid;
TSKEY key; // key for subscription
char tags[];
} STableSidExtInfo;
/*
* the outputCols is equalled to or larger than numOfCols
* e.g., select min(colName), max(colName), avg(colName) from table
* the outputCols will be 3 while the numOfCols is 1.
*/
typedef struct {
int16_t vnode;
int32_t numOfSids;
uint64_t pSidExtInfo; // table id & tag info ptr, in windows pointer may
uint64_t uid;
TSKEY skey;
TSKEY ekey;
int16_t order;
int16_t orderColId;
int16_t numOfCols; // the number of columns will be load from vnode
char intervalTimeUnit; // time interval type, for revisement of interval(1d)
int64_t nAggTimeInterval; // time interval for aggregation, in million second
int64_t slidingTime; // value for sliding window
// tag schema, used to parse tag information in pSidExtInfo
uint64_t pTagSchema;
int16_t numOfTagsCols; // required number of tags
int16_t tagLength; // tag length in current query
int16_t numOfGroupCols; // num of group by columns
int16_t orderByIdx;
int16_t orderType; // used in group by xx order by xxx
uint64_t groupbyTagIds;
int64_t limit;
int64_t offset;
int16_t queryType; // denote another query process
int16_t numOfOutputCols; // final output columns numbers
int16_t interpoType; // interpolate type
uint64_t defaultVal; // default value array list
int32_t colNameLen;
int64_t colNameList;
int64_t pSqlFuncExprs;
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
int32_t tsNumOfBlocks; // ts comp block numbers
int32_t tsOrder; // ts comp block order
SColumnInfo colList[];
} SQueryTableMsg;
typedef struct {
char code;
uint64_t qhandle;
} SQueryTableRsp;
typedef struct {
uint64_t qhandle;
uint16_t free;
} SRetrieveTableMsg;
typedef struct {
int32_t numOfRows;
int16_t precision;
int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds;
char data[];
} SRetrieveTableRsp;
typedef struct {
uint32_t vnode;
uint32_t vgId;
uint8_t status;
uint8_t dropStatus;
uint8_t accessState;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
uint8_t syncStatus;
uint8_t reserved[15];
} SVnodeLoad;
typedef struct {
uint32_t vnode;
char accessState;
} SVnodeAccess;
/*
* NOTE: sizeof(SVnodeCfg) < TSDB_FILE_HEADER_LEN / 4
*/
typedef struct {
char acct[TSDB_USER_LEN + 1];
char db[TSDB_DB_NAME_LEN + 1];
uint32_t vgId;
int32_t maxSessions;
int32_t cacheBlockSize;
union {
int32_t totalBlocks;
float fraction;
} cacheNumOfBlocks;
int32_t daysPerFile;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t daysToKeep;
int32_t commitTime;
int32_t rowsInFileBlock;
int16_t blocksPerTable;
int8_t compression;
int8_t commitLog;
int8_t replications;
int8_t repStrategy;
int8_t loadLatest; // load into mem or not
uint8_t precision; // time resolution
int8_t reserved[16];
} SVnodeCfg, SCreateDbMsg, SDbCfg, SAlterDbMsg;
typedef struct {
char db[TSDB_TABLE_ID_LEN + 1];
uint8_t ignoreNotExists;
} SDropDbMsg, SUseDbMsg;
// IMPORTANT: sizeof(SVnodeStatisticInfo) should not exceed
// TSDB_FILE_HEADER_LEN/4 - TSDB_FILE_HEADER_VERSION_SIZE
typedef struct {
int64_t pointsWritten; // In unit of points
int64_t totalStorage; // In unit of bytes
int64_t compStorage; // In unit of bytes
int64_t queryTime; // In unit of second ??
char reserved[64];
} SVnodeStatisticInfo;
typedef struct {
uint32_t version;
uint32_t publicIp;
uint32_t lastReboot; // time stamp for last reboot
uint16_t numOfCores;
uint8_t alternativeRole;
uint8_t reserve;
uint16_t numOfTotalVnodes; // from config file
uint16_t unused;
float diskAvailable; // GB
uint32_t openVnodes;
char reserved[16];
SVnodeLoad load[];
} SStatusMsg;
typedef struct {
int32_t code;
SRpcIpSet ipList;
} SStatusRsp;
typedef struct {
uint32_t moduleStatus;
uint32_t createdTime;
uint32_t numOfVnodes;
uint32_t reserved;
} SDnodeState;
// internal message
typedef struct {
uint32_t destId;
uint32_t destIp;
char tableId[TSDB_UNI_LEN + 1];
char empty[3];
uint8_t msgType;
int32_t msgLen;
uint8_t content[0];
} SIntMsg;
typedef struct {
char spi;
char encrypt;
char secret[TSDB_KEY_LEN]; // key is changed if updated
char cipheringKey[TSDB_KEY_LEN];
} SSecIe;
typedef struct {
int32_t numOfVPeers;
SVPeerDesc vpeerDesc[];
} SVpeerDescArray;
typedef struct {
int32_t vnode;
SVnodeCfg cfg;
SVPeerDesc vpeerDesc[TSDB_MAX_MPEERS];
} SCreateVnodeMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
int16_t createFlag;
char tags[];
} STableInfoMsg;
typedef struct {
int32_t numOfTables;
char tableIds[];
} SMultiTableInfoMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1];
} SSuperTableInfoMsg;
typedef struct {
int32_t numOfDnodes;
uint32_t dnodeIps[];
} SSuperTableInfoRsp;
typedef struct {
int16_t elemLen;
char tableId[TSDB_TABLE_ID_LEN + 1];
int16_t orderIndex;
int16_t orderType; // used in group by xx order by xxx
int16_t rel; // denotes the relation between condition and table list
int32_t tableCond; // offset value of table name condition
int32_t tableCondLen;
int32_t cond; // offset of column query condition
int32_t condLen;
int16_t tagCols[TSDB_MAX_TAGS + 1]; // required tag columns, plus one is for table name
int16_t numOfTags; // required number of tags
int16_t numOfGroupCols; // num of group by columns
int32_t groupbyTagColumnList;
} SSuperTableMetaElemMsg;
typedef struct {
int32_t numOfTables;
int32_t join;
int32_t joinCondLen; // for join condition
int32_t metaElem[TSDB_MAX_JOIN_TABLE_NUM];
} SSuperTableMetaMsg;
typedef struct {
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int16_t index; // used locally
int32_t numOfSids;
int32_t pSidExtInfoList[]; // offset value of STableSidExtInfo
} SVnodeSidList;
typedef struct {
int32_t numOfTables;
int32_t numOfVnodes;
uint16_t tagLen; /* tag value length */
int32_t list[]; /* offset of SVnodeSidList, compared to the SSuperTableMeta struct */
} SSuperTableMeta;
typedef struct STableMeta {
char tableId[TSDB_TABLE_ID_LEN + 1]; // note: This field must be at the front
int32_t contLen;
uint8_t numOfTags : 6;
uint8_t precision : 2;
uint8_t tableType : 4;
uint8_t index : 4; // used locally
int16_t numOfColumns;
int16_t rowSize; // used locally, calculated in client
int16_t sversion;
int8_t numOfVpeers;
SVPeerDesc vpeerDesc[TSDB_VNODES_SUPPORT];
int32_t sid;
int32_t vgid;
uint64_t uid;
SSchema schema[];
} STableMeta;
typedef struct SMultiTableMeta {
int32_t numOfTables;
int32_t contLen;
STableMeta metas[];
} SMultiTableMeta;
typedef struct {
char name[TSDB_TABLE_ID_LEN + 1];
char data[TSDB_MAX_TAGS_LEN];
} STagData;
/*
* sql: show tables like '%a_%'
* payload is the query condition, e.g., '%a_%'
* payloadLen is the length of payload
*/
typedef struct {
int8_t type;
char db[TSDB_DB_NAME_LEN + 1];
uint16_t payloadLen;
char payload[];
} SShowMsg;
typedef struct {
uint64_t qhandle;
STableMeta tableMeta;
} SShowRsp;
typedef struct {
char ip[32];
} SCreateMnodeMsg, SDropMnodeMsg, SCreateDnodeMsg, SDropDnodeMsg;
typedef struct {
uint32_t dnode;
int32_t vnode;
int32_t sid;
} STableCfgMsg;
typedef struct {
uint32_t dnode;
int32_t vnode;
} SVpeerCfgMsg;
typedef struct {
char ip[32];
char config[64];
} SCfgDnodeMsg;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN + 1];
uint32_t queryId;
int64_t useconds;
int64_t stime;
} SQueryDesc;
typedef struct {
char sql[TSDB_SHOW_SQL_LEN + 1];
uint32_t streamId;
int64_t num; // number of computing/cycles
int64_t useconds;
int64_t ctime;
int64_t stime;
int64_t slidingTime;
int64_t interval;
} SStreamDesc;
typedef struct {
int32_t numOfQueries;
SQueryDesc qdesc[];
} SQqueryList;
typedef struct {
int32_t numOfStreams;
SStreamDesc sdesc[];
} SStreamList;
typedef struct {
SQqueryList qlist;
SStreamList slist;
} SHeartBeatMsg;
typedef struct {
uint32_t queryId;
uint32_t streamId;
int8_t killConnection;
SRpcIpSet ipList;
} SHeartBeatRsp;
typedef struct {
char queryId[TSDB_KILL_MSG_LEN + 1];
} SKillQueryMsg, SKillStreamMsg, SKillConnectionMsg;
typedef struct {
int32_t vnode;
int32_t sid;
uint64_t uid;
uint64_t stime; // stream starting time
int32_t status;
char tableId[TSDB_TABLE_ID_LEN + 1];
} SDAlterStreamMsg;
#pragma pack(pop)
#ifdef __cplusplus
}
#endif
#endif
src/inc/taosmsg.h
浏览文件 @
8fc096e3
src/mnode/inc/mgmtAcct.h
浏览文件 @
8fc096e3
...
...
@@ -22,22 +22,24 @@ extern "C" {
#include "mnode.h"
int32_t
mgmtAddDbIntoAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
);
int32_t
mgmtRemoveDbFromAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
);
int32_t
mgmtAddUserIntoAcct
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
);
int32_t
mgmtRemoveUserFromAcct
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
);
int32_t
mgmtInitAccts
();
void
mgmtCleanUpAccts
();
SAcctObj
*
mgmtGetAcct
(
char
*
acctName
);
SAcctObj
*
mgmtGetAcct
(
char
*
acctName
);
int32_t
mgmtCheckUserLimit
(
SAcctObj
*
pAcct
);
int32_t
mgmtCheckDbLimit
(
SAcctObj
*
pAcct
);
int32_t
mgmtCheckTableLimit
(
SAcctObj
*
pAcct
,
SCreateTableMsg
*
pCreate
);
int32_t
mgmtCheckTableLimit
(
SAcctObj
*
pAcct
,
int32_t
numOfTimeSeries
);
int32_t
mgmtGetAcctMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtRetrieveAccts
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
void
mgmtDoStatistic
(
void
*
handle
,
void
*
tmrId
);
int32_t
mgmtAddDbIntoAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
);
int32_t
mgmtRemoveDbFromAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
);
int32_t
mgmtAddUserIntoAcct
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
);
int32_t
mgmtRemoveUserFromAcct
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
);
extern
int32_t
(
*
mgmtCreateAcctFp
)(
char
*
name
,
char
*
pass
,
SAcctCfg
*
pCfg
);
extern
int32_t
(
*
mgmtDropAcctFp
)(
char
*
name
);
extern
int32_t
(
*
mgmtAlterAcctFp
)(
char
*
name
,
char
*
pass
,
SAcctCfg
*
pCfg
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mgmtBalance.h
浏览文件 @
8fc096e3
...
...
@@ -32,7 +32,7 @@ extern char* (*mgmtGetVnodeStatus)(SVgObj *pVgroup, SVnodeGid *pVnode);
extern
bool
(
*
mgmtCheckVnodeReady
)(
SDnodeObj
*
pDnode
,
SVgObj
*
pVgroup
,
SVnodeGid
*
pVnode
);
extern
void
(
*
mgmtUpdateDnodeState
)(
SDnodeObj
*
pDnode
,
int
lbStatus
);
extern
void
(
*
mgmtUpdateVgroupState
)(
SVgObj
*
pVgroup
,
int
lbStatus
,
int
srcIp
);
extern
bool
(
*
mgmtAddVnode
)
(
SVgObj
*
pVgroup
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDestDnode
);
bool
mgmtAddVnode
(
SVgObj
*
pVgroup
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDestDnode
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mgmtShell.h
浏览文件 @
8fc096e3
...
...
@@ -28,13 +28,10 @@ int32_t mgmtInitShell();
void
mgmtCleanUpShell
();
extern
int32_t
(
*
mgmtCheckRedirectMsg
)(
void
*
pConn
);
extern
void
(
*
mgmtProcessAlterAcctMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessCreateDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessCfgMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessDropMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessDropDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessDropAcctMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessCreateAcctMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
/*
* If table not exist, will create it
...
...
src/mnode/src/mgmtAcct.c
浏览文件 @
8fc096e3
...
...
@@ -28,13 +28,17 @@ static SAcctObj tsAcctObj;
int32_t
(
*
mgmtInitAcctsFp
)()
=
NULL
;
void
(
*
mgmtCleanUpAcctsFp
)()
=
NULL
;
int32_t
(
*
mgmtCreateAcctFp
)(
char
*
name
,
char
*
pass
,
SAcctCfg
*
pCfg
)
=
NULL
;
int32_t
(
*
mgmtDropAcctFp
)(
char
*
name
)
=
NULL
;
int32_t
(
*
mgmtAlterAcctFp
)(
char
*
name
,
char
*
pass
,
SAcctCfg
*
pCfg
)
=
NULL
;
int32_t
(
*
mgmtGetAcctMetaFp
)(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
=
NULL
;
int32_t
(
*
mgmtRetrieveAcctsFp
)(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
=
NULL
;
SAcctObj
*
(
*
mgmtGetAcctFp
)(
char
*
acctName
)
=
NULL
;
int32_t
(
*
mgmtCheckUserLimitFp
)(
SAcctObj
*
pAcct
)
=
NULL
;
int32_t
(
*
mgmtCheckDbLimitFp
)(
SAcctObj
*
pAcct
)
=
NULL
;
int32_t
(
*
mgmtCheckTableLimitFp
)(
SAcctObj
*
pAcct
,
SCreateTableMsg
*
pCreate
)
=
NULL
;
int32_t
(
*
mgmtGetAcctMetaFp
)(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
=
NULL
;
int32_t
(
*
mgmtRetrieveAcctsFp
)(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
=
NULL
;
void
(
*
mgmtDoStatisticFp
)(
void
*
handle
,
void
*
tmrId
)
=
NULL
;
int32_t
(
*
mgmtCheckTimeSeriesLimitFp
)(
SAcctObj
*
pAcct
,
int32_t
numOfTimeSeries
)
=
NULL
;
int32_t
mgmtAddDbIntoAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
pthread_mutex_lock
(
&
pAcct
->
mutex
);
...
...
@@ -152,9 +156,9 @@ int32_t mgmtCheckDbLimit(SAcctObj *pAcct) {
}
}
int32_t
mgmtCheckTableLimit
(
SAcctObj
*
pAcct
,
SCreateTableMsg
*
pCreate
)
{
if
(
mgmtCheckT
able
LimitFp
)
{
return
mgmtCheckT
ableLimitFp
(
pAcct
,
pCreate
);
int32_t
mgmtCheckTableLimit
(
SAcctObj
*
pAcct
,
int32_t
numOfTimeSeries
)
{
if
(
mgmtCheckT
imeSeries
LimitFp
)
{
return
mgmtCheckT
imeSeriesLimitFp
(
pAcct
,
numOfTimeSeries
);
}
else
{
return
0
;
}
...
...
@@ -194,8 +198,3 @@ SAcctObj *mgmtGetAcctFromConn(void *pConn) {
return
NULL
;
}
void
mgmtDoStatistic
(
void
*
handle
,
void
*
tmrId
)
{
if
(
mgmtDoStatisticFp
)
{
mgmtDoStatisticFp
(
handle
,
tmrId
);
}
}
\ No newline at end of file
src/mnode/src/mgmtBalance.c
浏览文件 @
8fc096e3
...
...
@@ -31,27 +31,28 @@ void mgmtCleanupBalanceImp() {}
void
(
*
mgmtCleanupBalance
)()
=
mgmtCleanupBalanceImp
;
int32_t
mgmtAllocVnodesImp
(
SVgObj
*
pVgroup
)
{
int
selectedVnode
=
-
1
;
int
lastAllocVode
=
pDnode
->
lastAllocVnode
;
for
(
int
i
=
0
;
i
<
pDnode
->
numOfVnodes
;
i
++
)
{
int
vnode
=
(
i
+
lastAllocVode
)
%
pDnode
->
numOfVnodes
;
if
(
pDnode
->
vload
[
vnode
].
vgId
==
0
&&
pDnode
->
vload
[
vnode
].
status
==
TSDB_VN_STATUS_OFFLINE
)
{
selectedVnode
=
vnode
;
break
;
}
}
if
(
selectedVnode
==
-
1
)
{
mError
(
"vgroup:%d alloc vnode failed, free vnodes:%d"
,
pVgroup
->
vgId
,
pDnode
->
numOfFreeVnodes
);
return
-
1
;
}
else
{
mTrace
(
"vgroup:%d allocate vnode:%d, last allocated vnode:%d"
,
pVgroup
->
vgId
,
selectedVnode
,
lastAllocVode
);
pVgroup
->
vnodeGid
[
0
].
vnode
=
selectedVnode
;
pDnode
->
lastAllocVnode
=
selectedVnode
+
1
;
if
(
pDnode
->
lastAllocVnode
>=
pDnode
->
numOfVnodes
)
pDnode
->
lastAllocVnode
=
0
;
// int selectedVnode = -1;
// int lastAllocVode = pDnode->lastAllocVnode;
//
// for (int i = 0; i < pDnode->numOfVnodes; i++) {
// int vnode = (i + lastAllocVode) % pDnode->numOfVnodes;
// if (pDnode->vload[vnode].vgId == 0 && pDnode->vload[vnode].status == TSDB_VN_STATUS_OFFLINE) {
// selectedVnode = vnode;
// break;
// }
// }
//
// if (selectedVnode == -1) {
// mError("vgroup:%d alloc vnode failed, free vnodes:%d", pVgroup->vgId, pDnode->numOfFreeVnodes);
// return -1;
// } else {
// mTrace("vgroup:%d allocate vnode:%d, last allocated vnode:%d", pVgroup->vgId, selectedVnode, lastAllocVode);
// pVgroup->vnodeGid[0].vnode = selectedVnode;
// pDnode->lastAllocVnode = selectedVnode + 1;
// if (pDnode->lastAllocVnode >= pDnode->numOfVnodes) pDnode->lastAllocVnode = 0;
// return 0;
// }
return
0
;
}
}
int32_t
(
*
mgmtAllocVnodes
)(
SVgObj
*
pVgroup
)
=
mgmtAllocVnodesImp
;
...
...
@@ -83,9 +84,13 @@ void mgmtUpdateVgroupStateImp(SVgObj *pVgroup, int lbStatus, int srcIp) {
void
(
*
mgmtUpdateVgroupState
)(
SVgObj
*
pVgroup
,
int
lbStatus
,
int
srcIp
)
=
mgmtUpdateVgroupStateImp
;
bool
mgmtAddVnodeImp
(
SVgObj
*
pVgroup
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDestDnode
)
{
bool
(
*
mgmtAddVnodeFp
)(
SVgObj
*
pVgroup
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDestDnode
)
=
NULL
;
bool
mgmtAddVnode
(
SVgObj
*
pVgroup
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDestDnode
)
{
if
(
mgmtAddVnodeFp
)
{
return
mgmtAddVnodeFp
(
pVgroup
,
pSrcDnode
,
pDestDnode
);
}
else
{
return
false
;
}
}
bool
(
*
mgmtAddVnode
)(
SVgObj
*
pVgroup
,
SDnodeObj
*
pSrcDnode
,
SDnodeObj
*
pDestDnode
)
=
mgmtAddVnodeImp
;
src/mnode/src/mgmtMnode.c
浏览文件 @
8fc096e3
...
...
@@ -137,7 +137,7 @@ static void *mgmtGetNextMnode(SShowObj *pShow, SSdbPeer **pMnode) {
return
mgmtGetNextMnodeFp
(
pShow
,
pMnode
);
}
else
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
&
tsMnodeObj
;
*
pMnode
=
NULL
;
}
else
{
*
pMnode
=
NULL
;
}
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
8fc096e3
...
...
@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
...
...
@@ -1048,38 +1049,6 @@ static void mgmtProcessMsgFromShell(char type, void *pCont, int contLen, void *a
// rpcFreeCont(pCont);
}
void
mgmtInitProcessShellMsg
()
{
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CONNECT
]
=
mgmtProcessConnectMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
mgmtProcessHeartBeatMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_DB
]
=
mgmtProcessCreateDbMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_DB
]
=
mgmtProcessAlterDbMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_DB
]
=
mgmtProcessDropDbMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_USE_DB
]
=
mgmtProcessUnSupportMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_USER
]
=
mgmtProcessCreateUserMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_USER
]
=
mgmtProcessAlterUserMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_USER
]
=
mgmtProcessDropUserMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_ACCT
]
=
mgmtProcessCreateAcctMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_ACCT
]
=
mgmtProcessDropAcctMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_ACCT
]
=
mgmtProcessAlterAcctMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_TABLE
]
=
mgmtProcessCreateTableMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_TABLE
]
=
mgmtProcessDropTableMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_TABLE
]
=
mgmtProcessAlterTableMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_DNODE
]
=
mgmtProcessCreateDnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_DNODE
]
=
mgmtProcessDropDnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DNODE_CFG
]
=
mgmtProcessCfgDnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_MNODE
]
=
mgmtProcessUnSupportMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_MNODE
]
=
mgmtProcessDropMnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CFG_MNODE
]
=
mgmtProcessCfgMnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
mgmtProcessKillQueryMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_KILL_STREAM
]
=
mgmtProcessKillStreamMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_KILL_CONNECTION
]
=
mgmtProcessKillConnectionMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_SHOW
]
=
mgmtProcessShowMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_RETRIEVE
]
=
mgmtProcessRetrieveMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
mgmtProcessTableMetaMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_MULTI_TABLE_META
]
=
mgmtProcessMultiTableMetaMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_STABLE_META
]
=
mgmtProcessSuperTableMetaMsg
;
}
void
mgmtProcessCreateVgroup
(
SCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
)
{
SDbObj
*
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pDb
==
NULL
)
{
...
...
@@ -1195,10 +1164,167 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle)
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
}
void
(
*
mgmtProcessAlterAcctMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessCreateDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessCfgMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessDropMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessDropDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessDropAcctMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessCreateAcctMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
\ No newline at end of file
static
void
mgmtProcessAlterAcctMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
{
if
(
!
mgmtAlterAcctFp
)
{
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
return
;
}
SAlterAcctMsg
*
pAlter
=
pCont
;
pAlter
->
cfg
.
maxUsers
=
htonl
(
pAlter
->
cfg
.
maxUsers
);
pAlter
->
cfg
.
maxDbs
=
htonl
(
pAlter
->
cfg
.
maxDbs
);
pAlter
->
cfg
.
maxTimeSeries
=
htonl
(
pAlter
->
cfg
.
maxTimeSeries
);
pAlter
->
cfg
.
maxConnections
=
htonl
(
pAlter
->
cfg
.
maxConnections
);
pAlter
->
cfg
.
maxStreams
=
htonl
(
pAlter
->
cfg
.
maxStreams
);
pAlter
->
cfg
.
maxPointsPerSecond
=
htonl
(
pAlter
->
cfg
.
maxPointsPerSecond
);
pAlter
->
cfg
.
maxStorage
=
htobe64
(
pAlter
->
cfg
.
maxStorage
);
pAlter
->
cfg
.
maxQueryTime
=
htobe64
(
pAlter
->
cfg
.
maxQueryTime
);
pAlter
->
cfg
.
maxInbound
=
htobe64
(
pAlter
->
cfg
.
maxInbound
);
pAlter
->
cfg
.
maxOutbound
=
htobe64
(
pAlter
->
cfg
.
maxOutbound
);
if
(
mgmtCheckRedirectMsg
(
ahandle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"account:%s, failed to alter account, need redirect message"
,
pAlter
->
user
);
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
ahandle
);
if
(
pUser
==
NULL
)
{
mError
(
"account:%s, failed to alter account, invalid user"
,
pAlter
->
user
);
rpcSendResponse
(
ahandle
,
TSDB_CODE_INVALID_USER
,
NULL
,
0
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
mError
(
"account:%s, failed to alter account, no rights"
,
pAlter
->
user
);
rpcSendResponse
(
ahandle
,
TSDB_CODE_NO_RIGHTS
,
NULL
,
0
);
return
;
}
int32_t
code
=
mgmtAlterAcctFp
(
pAlter
->
user
,
pAlter
->
pass
,
&
(
pAlter
->
cfg
));;
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"account:%s is altered by %s"
,
pAlter
->
user
,
pUser
->
user
);
}
else
{
mError
(
"account:%s, failed to alter account, reason:%s"
,
pAlter
->
user
,
tstrerror
(
code
));
}
rpcSendResponse
(
ahandle
,
code
,
NULL
,
0
);
}
static
void
mgmtProcessDropAcctMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
{
if
(
!
mgmtDropAcctFp
)
{
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
return
;
}
SDropAcctMsg
*
pDrop
=
(
SDropAcctMsg
*
)
pCont
;
if
(
mgmtCheckRedirectMsg
(
ahandle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"account:%s, failed to drop account, need redirect message"
,
pDrop
->
user
);
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
ahandle
);
if
(
pUser
==
NULL
)
{
mError
(
"account:%s, failed to drop account, invalid user"
,
pDrop
->
user
);
rpcSendResponse
(
ahandle
,
TSDB_CODE_INVALID_USER
,
NULL
,
0
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
mError
(
"account:%s, failed to drop account, no rights"
,
pDrop
->
user
);
rpcSendResponse
(
ahandle
,
TSDB_CODE_NO_RIGHTS
,
NULL
,
0
);
return
;
}
int32_t
code
=
mgmtDropAcctFp
(
pDrop
->
user
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"account:%s is dropped by %s"
,
pDrop
->
user
,
pUser
->
user
);
}
else
{
mError
(
"account:%s, failed to drop account, reason:%s"
,
pDrop
->
user
,
tstrerror
(
code
));
}
rpcSendResponse
(
ahandle
,
code
,
NULL
,
0
);
}
static
void
mgmtProcessCreateAcctMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
{
if
(
!
mgmtCreateAcctFp
)
{
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
return
;
}
SCreateAcctMsg
*
pCreate
=
(
SCreateAcctMsg
*
)
pCont
;
pCreate
->
cfg
.
maxUsers
=
htonl
(
pCreate
->
cfg
.
maxUsers
);
pCreate
->
cfg
.
maxDbs
=
htonl
(
pCreate
->
cfg
.
maxDbs
);
pCreate
->
cfg
.
maxTimeSeries
=
htonl
(
pCreate
->
cfg
.
maxTimeSeries
);
pCreate
->
cfg
.
maxConnections
=
htonl
(
pCreate
->
cfg
.
maxConnections
);
pCreate
->
cfg
.
maxStreams
=
htonl
(
pCreate
->
cfg
.
maxStreams
);
pCreate
->
cfg
.
maxPointsPerSecond
=
htonl
(
pCreate
->
cfg
.
maxPointsPerSecond
);
pCreate
->
cfg
.
maxStorage
=
htobe64
(
pCreate
->
cfg
.
maxStorage
);
pCreate
->
cfg
.
maxQueryTime
=
htobe64
(
pCreate
->
cfg
.
maxQueryTime
);
pCreate
->
cfg
.
maxInbound
=
htobe64
(
pCreate
->
cfg
.
maxInbound
);
pCreate
->
cfg
.
maxOutbound
=
htobe64
(
pCreate
->
cfg
.
maxOutbound
);
if
(
mgmtCheckRedirectMsg
(
ahandle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"account:%s, failed to create account, need redirect message"
,
pCreate
->
user
);
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
ahandle
);
if
(
pUser
==
NULL
)
{
mError
(
"account:%s, failed to create account, invalid user"
,
pCreate
->
user
);
rpcSendResponse
(
ahandle
,
TSDB_CODE_INVALID_USER
,
NULL
,
0
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
mError
(
"account:%s, failed to create account, no rights"
,
pCreate
->
user
);
rpcSendResponse
(
ahandle
,
TSDB_CODE_NO_RIGHTS
,
NULL
,
0
);
return
;
}
int32_t
code
=
mgmtCreateAcctFp
(
pCreate
->
user
,
pCreate
->
pass
,
&
(
pCreate
->
cfg
));
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"account:%s is created by %s"
,
pCreate
->
user
,
pUser
->
user
);
}
else
{
mError
(
"account:%s, failed to create account, reason:%s"
,
pCreate
->
user
,
tstrerror
(
code
));
}
rpcSendResponse
(
ahandle
,
code
,
NULL
,
0
);
}
void
mgmtInitProcessShellMsg
()
{
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CONNECT
]
=
mgmtProcessConnectMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
mgmtProcessHeartBeatMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_DB
]
=
mgmtProcessCreateDbMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_DB
]
=
mgmtProcessAlterDbMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_DB
]
=
mgmtProcessDropDbMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_USE_DB
]
=
mgmtProcessUnSupportMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_USER
]
=
mgmtProcessCreateUserMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_USER
]
=
mgmtProcessAlterUserMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_USER
]
=
mgmtProcessDropUserMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_ACCT
]
=
mgmtProcessCreateAcctMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_ACCT
]
=
mgmtProcessDropAcctMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_ACCT
]
=
mgmtProcessAlterAcctMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_TABLE
]
=
mgmtProcessCreateTableMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_TABLE
]
=
mgmtProcessDropTableMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_ALTER_TABLE
]
=
mgmtProcessAlterTableMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_DNODE
]
=
mgmtProcessCreateDnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_DNODE
]
=
mgmtProcessDropDnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DNODE_CFG
]
=
mgmtProcessCfgDnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CREATE_MNODE
]
=
mgmtProcessUnSupportMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_DROP_MNODE
]
=
mgmtProcessDropMnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CFG_MNODE
]
=
mgmtProcessCfgMnodeMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_KILL_QUERY
]
=
mgmtProcessKillQueryMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_KILL_STREAM
]
=
mgmtProcessKillStreamMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_KILL_CONNECTION
]
=
mgmtProcessKillConnectionMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_SHOW
]
=
mgmtProcessShowMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_RETRIEVE
]
=
mgmtProcessRetrieveMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_TABLE_META
]
=
mgmtProcessTableMetaMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_MULTI_TABLE_META
]
=
mgmtProcessMultiTableMetaMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_STABLE_META
]
=
mgmtProcessSuperTableMetaMsg
;
}
src/mnode/src/mgmtSystem.c
浏览文件 @
8fc096e3
...
...
@@ -33,12 +33,11 @@
char
tsMgmtDirectory
[
128
]
=
{
0
};
void
*
tsMgmtTmr
=
NULL
;
void
*
tsMgmtTranQhandle
=
NULL
;
void
*
tsMgmtStatisTimer
=
NULL
;
void
mgmtCleanUpSystem
()
{
mPrint
(
"starting to clean up mgmt"
);
taosTmrStopA
(
&
tsMgmtStatisTimer
);
mgmtCleanUpRedirect
();
sdbCleanUpPeers
();
mgmtCleanupBalance
();
...
...
@@ -140,9 +139,6 @@ int32_t mgmtStartSystem() {
mError
(
"failed to init dnode balance"
)
}
if
(
mgmtDoStatistic
)
{
taosTmrReset
(
mgmtDoStatistic
,
tsStatusInterval
*
30000
,
NULL
,
tsMgmtTmr
,
&
tsMgmtStatisTimer
);
}
mPrint
(
"TDengine mgmt is initialized successfully"
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录