Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
3562f612
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3562f612
编写于
3月 07, 2020
作者:
H
hzcheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '2.0' into feature/2.0tsdb
上级
0f4c3330
0cb93b06
变更
34
隐藏空白更改
内联
并排
Showing
34 changed file
with
424 addition
and
400 deletion
+424
-400
src/CMakeLists.txt
src/CMakeLists.txt
+6
-6
src/dnode/inc/dnodeMgmt.h
src/dnode/inc/dnodeMgmt.h
+1
-3
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+3
-5
src/dnode/src/dnodeModule.c
src/dnode/src/dnodeModule.c
+0
-32
src/dnode/src/dnodeSystem.c
src/dnode/src/dnodeSystem.c
+15
-9
src/inc/dnode.h
src/inc/dnode.h
+0
-2
src/inc/mnode.h
src/inc/mnode.h
+1
-0
src/inc/sdb.h
src/inc/sdb.h
+0
-15
src/inc/taosmsg.h
src/inc/taosmsg.h
+0
-12
src/mnode/CMakeLists.txt
src/mnode/CMakeLists.txt
+1
-1
src/mnode/inc/mgmtBalance.h
src/mnode/inc/mgmtBalance.h
+0
-1
src/mnode/inc/mgmtConn.h
src/mnode/inc/mgmtConn.h
+0
-36
src/mnode/inc/mgmtDnode.h
src/mnode/inc/mgmtDnode.h
+7
-0
src/mnode/inc/mgmtGrant.h
src/mnode/inc/mgmtGrant.h
+1
-0
src/mnode/inc/mgmtMnode.h
src/mnode/inc/mgmtMnode.h
+4
-1
src/mnode/inc/mgmtProfile.h
src/mnode/inc/mgmtProfile.h
+8
-6
src/mnode/inc/mgmtShell.h
src/mnode/inc/mgmtShell.h
+0
-2
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+0
-8
src/mnode/src/mgmtConn.c
src/mnode/src/mgmtConn.c
+0
-153
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+2
-2
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+20
-0
src/mnode/src/mgmtDnodeInt.c
src/mnode/src/mgmtDnodeInt.c
+1
-1
src/mnode/src/mgmtGrant.c
src/mnode/src/mgmtGrant.c
+14
-5
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+42
-31
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+135
-4
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+70
-4
src/mnode/src/mgmtSystem.c
src/mnode/src/mgmtSystem.c
+0
-4
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+21
-0
src/plugins/http/CMakeLists.txt
src/plugins/http/CMakeLists.txt
+1
-1
src/plugins/http/src/httpSystem.c
src/plugins/http/src/httpSystem.c
+15
-7
src/sdb/CMakeLists.txt
src/sdb/CMakeLists.txt
+1
-1
src/sdb/inc/sdbint.h
src/sdb/inc/sdbint.h
+17
-24
src/sdb/src/sdbEngine.c
src/sdb/src/sdbEngine.c
+0
-1
src/sdb/src/sdbstr.c
src/sdb/src/sdbstr.c
+38
-23
未找到文件。
src/CMakeLists.txt
浏览文件 @
3562f612
...
...
@@ -4,11 +4,11 @@ PROJECT(TDengine)
ADD_SUBDIRECTORY
(
os
)
ADD_SUBDIRECTORY
(
util
)
ADD_SUBDIRECTORY
(
rpc
)
#
ADD_SUBDIRECTORY(client)
#
ADD_SUBDIRECTORY(kit)
#
ADD_SUBDIRECTORY(plugins)
#
ADD_SUBDIRECTORY(sdb)
#
ADD_SUBDIRECTORY(mnode)
ADD_SUBDIRECTORY
(
client
)
ADD_SUBDIRECTORY
(
kit
)
ADD_SUBDIRECTORY
(
plugins
)
ADD_SUBDIRECTORY
(
sdb
)
ADD_SUBDIRECTORY
(
mnode
)
ADD_SUBDIRECTORY
(
vnode
)
#
ADD_SUBDIRECTORY(dnode)
ADD_SUBDIRECTORY
(
dnode
)
#ADD_SUBDIRECTORY(connector/jdbc)
src/dnode/inc/dnodeMgmt.h
浏览文件 @
3562f612
...
...
@@ -26,15 +26,13 @@ extern "C" {
int32_t
dnodeInitMgmt
();
void
dnodeInitMgmtIp
();
void
dnodeProcessMsgFromMgmt
(
int8_t
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
);
void
dnodeProcessMsgFromMgmt
(
char
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
);
void
dnodeSendMsgToMnode
(
int8_t
msgType
,
void
*
pCont
,
int32_t
contLen
);
void
dnodeSendRspToMnode
(
void
*
pConn
,
int8_t
msgType
,
int32_t
code
,
void
*
pCont
,
int32_t
contLen
);
void
dnodeSendVnodeCfgMsg
(
int32_t
vnode
);
void
dnodeSendTableCfgMsg
(
int32_t
vnode
,
int32_t
sid
);
#ifdef __cplusplus
}
#endif
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
3562f612
...
...
@@ -30,12 +30,10 @@
void
(
*
dnodeInitMgmtIpFp
)()
=
NULL
;
int32_t
(
*
dnodeInitMgmtFp
)()
=
NULL
;
void
(
*
dnodeCleanUpMgmtFp
)()
=
NULL
;
void
(
*
dnodeProcessStatusRspFp
)(
int8_t
*
pCont
,
int32_t
contLen
,
int8_t
msgType
,
void
*
pConn
)
=
NULL
;
void
(
*
dnodeProcessStatusRspFp
)(
void
*
pCont
,
int32_t
contLen
,
int8_t
msgType
,
void
*
pConn
)
=
NULL
;
void
(
*
dnodeSendMsgToMnodeFp
)(
int8_t
msgType
,
void
*
pCont
,
int32_t
contLen
)
=
NULL
;
void
(
*
dnodeSendRspToMnodeFp
)(
void
*
handle
,
int32_t
code
,
void
*
pCont
,
int
contLen
)
=
NULL
;
static
void
*
tsStatusTimer
=
NULL
;
static
void
(
*
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MAX
])(
void
*
pCont
,
int32_t
contLen
,
int8_t
msgType
,
void
*
pConn
);
static
void
dnodeInitProcessShellMsg
();
...
...
@@ -173,13 +171,13 @@ void dnodeCleanUpMgmt() {
}
}
void
dnodeProcessMsgFromMgmt
(
int8_t
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
)
{
void
dnodeProcessMsgFromMgmt
(
char
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
)
{
if
(
msgType
<
0
||
msgType
>=
TSDB_MSG_TYPE_MAX
)
{
dError
(
"invalid msg type:%d"
,
msgType
);
return
;
}
dTrace
(
"msg:%d:%s is received from mgmt, pConn:%p"
,
msgType
,
taosMsg
[
msgType
],
pConn
);
dTrace
(
"msg:%d:%s is received from mgmt, pConn:%p"
,
msgType
,
taosMsg
[
(
int8_t
)
msgType
],
pConn
);
if
(
msgType
==
TSDB_MSG_TYPE_STATUS_RSP
&&
dnodeProcessStatusRspFp
!=
NULL
)
{
dnodeProcessStatusRspFp
(
pCont
,
contLen
,
msgType
,
pConn
);
...
...
src/dnode/src/dnodeModule.c
浏览文件 @
3562f612
...
...
@@ -68,38 +68,6 @@ void dnodeCleanUpModules() {
}
}
void
dnodeProcessModuleStatus
(
uint32_t
status
)
{
if
(
dnodeGetRunStatus
()
!=
TSDB_DNODE_RUN_STATUS_RUNING
)
{
return
;
}
int
news
=
status
;
int
olds
=
tsModuleStatus
;
for
(
int
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
int
newStatus
=
news
&
(
1
<<
moduleType
);
int
oldStatus
=
olds
&
(
1
<<
moduleType
);
if
(
oldStatus
>
0
)
{
if
(
newStatus
==
0
)
{
if
(
tsModule
[
moduleType
].
stopFp
)
{
dPrint
(
"module:%s is stopped on this node"
,
tsModule
[
moduleType
].
name
);
(
*
tsModule
[
moduleType
].
stopFp
)();
}
}
}
else
if
(
oldStatus
==
0
)
{
if
(
newStatus
>
0
)
{
if
(
tsModule
[
moduleType
].
startFp
)
{
dPrint
(
"module:%s is started on this node"
,
tsModule
[
moduleType
].
name
);
(
*
tsModule
[
moduleType
].
startFp
)();
}
}
}
else
{
}
}
tsModuleStatus
=
status
;
}
int32_t
dnodeInitModules
()
{
for
(
int
mod
=
0
;
mod
<
TSDB_MOD_MAX
;
++
mod
)
{
if
(
tsModule
[
mod
].
num
!=
0
&&
tsModule
[
mod
].
initFp
)
{
...
...
src/dnode/src/dnodeSystem.c
浏览文件 @
3562f612
...
...
@@ -33,12 +33,14 @@
#include "dnodeVnodeMgmt.h"
#ifdef CLUSTER
//#include "acct.h"
//#include "admin.h"
//#include "cluster.h"
//#include "grant.h"
//#include "replica.h"
//#include "storage.h"
#include "account.h"
#include "admin.h"
#include "balance.h"
#include "cluster.h"
#include "grant.h"
#include "mpeer.h"
#include "storage.h"
#include "vpeer.h"
#endif
static
pthread_mutex_t
tsDnodeMutex
;
...
...
@@ -89,8 +91,6 @@ void dnodeCleanUpSystem() {
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_STOPPED
);
}
dnodeCleanupShell
();
dnodeCleanUpModules
();
dnodeCleanupVnodes
();
...
...
@@ -112,7 +112,13 @@ void dnodeCheckDataDirOpenned(const char *dir) {
void
dnodeInitPlugins
()
{
#ifdef CLUSTER
acctInit
();
// acctInit();
// adminInit();
// balanceInit();
// clusterInit();
// grantInit();
// mpeerInit();
// storageInit();
#endif
}
...
...
src/inc/dnode.h
浏览文件 @
3562f612
...
...
@@ -49,8 +49,6 @@ extern int32_t (*dnodeCheckSystem)();
extern
void
*
tsDnodeMgmtQhandle
;
void
dnodeCheckDataDirOpenned
(
const
char
*
dir
);
void
dnodeProcessMsgFromMgmt
(
int8_t
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
);
// dnodeModule
extern
void
(
*
dnodeStartModules
)();
...
...
src/inc/mnode.h
浏览文件 @
3562f612
...
...
@@ -264,6 +264,7 @@ void mgmtCleanUpSystem();
void
mgmtStopSystem
();
void
mgmtProcessMsgFromDnode
(
char
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
);
void
dnodeProcessMsgFromMgmt
(
char
msgType
,
void
*
pCont
,
int32_t
contLen
,
void
*
pConn
,
int32_t
code
);
#ifdef __cplusplus
}
...
...
src/inc/sdb.h
浏览文件 @
3562f612
...
...
@@ -25,18 +25,10 @@ extern "C" {
extern
uint16_t
tsMgmtMgmtPort
;
extern
uint16_t
tsMgmtSyncPort
;
extern
int
sdbMaxNodes
;
extern
int
tsMgmtPeerHBTimer
;
// seconds
extern
char
sdbZone
[];
extern
char
sdbMasterIp
[];
extern
char
sdbPrivateIp
[];
extern
char
*
sdbStatusStr
[];
extern
char
*
sdbRoleStr
[];
extern
void
*
mnodeSdb
;
extern
int
sdbExtConns
;
extern
int
sdbMaster
;
extern
uint32_t
sdbPublicIp
;
extern
uint32_t
sdbMasterStartTime
;
extern
SRpcIpSet
*
pSdbIpList
;
extern
SRpcIpSet
*
pSdbPublicIpList
;
...
...
@@ -89,14 +81,9 @@ typedef struct {
// internal
int
syncFd
;
void
*
hbTimer
;
void
*
thandle
;
void
*
pSync
;
}
SSdbPeer
;
SSdbPeer
*
sdbAddPeer
(
uint32_t
ip
,
uint32_t
publicIp
,
char
role
);
void
sdbUpdateIpList
();
extern
SSdbPeer
*
sdbPeer
[];
#define sdbInited (sdbPeer[0])
#define sdbStatus (sdbPeer[0]->status)
...
...
@@ -130,8 +117,6 @@ int sdbInitPeers(char *directory);
void
sdbCleanUpPeers
();
int
sdbCfgNode
(
char
*
cont
);
int64_t
sdbGetVersion
();
int32_t
sdbGetRunStatus
();
...
...
src/inc/taosmsg.h
浏览文件 @
3562f612
...
...
@@ -595,23 +595,11 @@ typedef struct {
typedef
struct
{
int32_t
code
;
int32_t
numOfVnodes
;
SDnodeState
dnodeState
;
SRpcIpSet
ipList
;
SVnodeAccess
vnodeAccess
[];
}
SStatusRsp
;
// internal message
typedef
struct
{
uint32_t
destId
;
uint32_t
destIp
;
char
tableId
[
TSDB_UNI_LEN
+
1
];
char
empty
[
3
];
uint8_t
msgType
;
int32_t
msgLen
;
uint8_t
content
[
0
];
}
SIntMsg
;
typedef
struct
{
char
spi
;
char
encrypt
;
...
...
src/mnode/CMakeLists.txt
浏览文件 @
3562f612
...
...
@@ -14,7 +14,7 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES
(
mnode trpc tutil sdb pthread
)
IF
(
TD_CLUSTER
)
TARGET_LINK_LIBRARIES
(
mnode
acct
)
TARGET_LINK_LIBRARIES
(
mnode
)
ENDIF
()
ENDIF
()
...
...
src/mnode/inc/mgmtBalance.h
浏览文件 @
3562f612
...
...
@@ -22,7 +22,6 @@ extern "C" {
#include "mnode.h"
void
mgmtStartBalanceTimer
(
int64_t
mseconds
);
int32_t
mgmtInitBalance
();
void
mgmtCleanupBalance
();
int32_t
mgmtAllocVnodes
(
SVgObj
*
pVgroup
);
...
...
src/mnode/inc/mgmtConn.h
已删除
100644 → 0
浏览文件 @
0f4c3330
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MGMT_CONN_H
#define TDENGINE_MGMT_CONN_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "mnode.h"
int
mgmtGetConnsMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int
mgmtRetrieveConns
(
SShowObj
*
pShow
,
char
*
data
,
int
rows
,
void
*
pConn
);
bool
mgmtCheckQhandle
(
uint64_t
qhandle
);
void
mgmtSaveQhandle
(
void
*
qhandle
);
void
mgmtFreeQhandle
(
void
*
qhandle
);
#ifdef __cplusplus
}
#endif
#endif
src/mnode/inc/mgmtDnode.h
浏览文件 @
3562f612
...
...
@@ -50,9 +50,16 @@ int32_t mgmtGetDnodesNum();
int32_t
mgmtUpdateDnode
(
SDnodeObj
*
pDnode
);
void
*
mgmtGetNextDnode
(
SShowObj
*
pShow
,
SDnodeObj
**
pDnode
);
bool
mgmtCheckConfigShow
(
SGlobalConfig
*
cfg
);
bool
mgmtCheckDnodeInRemoveState
(
SDnodeObj
*
pDnode
);
bool
mgmtCheckDnodeInOfflineState
(
SDnodeObj
*
pDnode
);
void
mgmtSetDnodeUnRemove
(
SDnodeObj
*
pDnode
);
SDnodeObj
*
mgmtGetDnode
(
uint32_t
ip
);
extern
int32_t
(
*
mgmtCreateDnodeFp
)(
uint32_t
ip
);
extern
int32_t
(
*
mgmtDropDnodeByIpFp
)(
uint32_t
ip
);
void
mgmtCalcNumOfFreeVnodes
(
SDnodeObj
*
pDnode
);
#ifdef __cplusplus
}
#endif
...
...
src/mnode/inc/mgmtGrant.h
浏览文件 @
3562f612
...
...
@@ -30,6 +30,7 @@ void mgmtRestoreTimeSeries(SAcctObj *pAcct, uint32_t timeseries);
int32_t
mgmtCheckTimeSeries
(
uint32_t
timeseries
);
int32_t
mgmtCheckUserGrant
();
int32_t
mgmtCheckDbGrant
();
int32_t
mgmtCheckDnodeGrant
();
int32_t
mgmtGetGrantsMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtRetrieveGrants
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
...
...
src/mnode/inc/mgmtMnode.h
浏览文件 @
3562f612
...
...
@@ -24,7 +24,10 @@ extern "C" {
#include <stdbool.h>
#include "mnode.h"
int32_t
mgmtGetMnodeMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
);
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
);
int32_t
mgmtGetMnodeMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
#ifdef __cplusplus
...
...
src/mnode/inc/mgmtProfile.h
浏览文件 @
3562f612
...
...
@@ -22,20 +22,22 @@ extern "C" {
#include "mnode.h"
int32_t
mgmtGetQueryMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtGetStreamMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
bool
mgmtCheckQhandle
(
uint64_t
qhandle
);
void
mgmtSaveQhandle
(
void
*
qhandle
);
void
mgmtFreeQhandle
(
void
*
qhandle
);
int32_t
mgmtSaveQueryStreamList
(
SHeartBeatMsg
*
pHBMsg
);
int32_t
mgmtGetQueryMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtRetrieveQueries
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
int32_t
mgmtGetStreamMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtRetrieveStreams
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
int32_t
mgmtSaveQueryStreamList
(
SHeartBeatMsg
*
pHBMsg
);
int32_t
mgmtGetConnsMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
int32_t
mgmtRetrieveConns
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
int32_t
mgmtKillQuery
(
char
*
qidstr
,
void
*
pConn
);
int32_t
mgmtKillStream
(
char
*
qidstr
,
void
*
pConn
);
int32_t
mgmtKillConnection
(
char
*
qidstr
,
void
*
pConn
);
enum
{
...
...
src/mnode/inc/mgmtShell.h
浏览文件 @
3562f612
...
...
@@ -28,10 +28,8 @@ int32_t mgmtInitShell();
void
mgmtCleanUpShell
();
extern
int32_t
(
*
mgmtCheckRedirectMsg
)(
void
*
pConn
);
extern
void
(
*
mgmtProcessCreateDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessCfgMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessDropMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
extern
void
(
*
mgmtProcessDropDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
/*
* If table not exist, will create it
...
...
src/mnode/src/mgmtBalance.c
浏览文件 @
3562f612
...
...
@@ -77,11 +77,3 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
return
0
;
}
}
char
*
mgmtGetVnodeStatus
(
SVgObj
*
pVgroup
,
SVnodeGid
*
pVnode
)
{
if
(
mgmtGetVnodeStatusFp
)
{
return
(
*
mgmtGetVnodeStatusFp
)(
pVgroup
,
pVnode
);
}
else
{
return
"master"
;
}
}
src/mnode/src/mgmtConn.c
已删除
100644 → 0
浏览文件 @
0f4c3330
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mgmtConn.h"
#include "taosmsg.h"
#include "tschemautil.h"
typedef
struct
{
char
user
[
TSDB_TABLE_ID_LEN
];
uint64_t
stime
;
uint32_t
ip
;
uint16_t
port
;
}
SConnInfo
;
typedef
struct
{
int
numOfConns
;
int
index
;
SConnInfo
connInfo
[];
}
SConnShow
;
int
mgmtGetConns
(
SShowObj
*
pShow
,
void
*
pConn
)
{
// SAcctObj * pAcct = pConn->pAcct;
// SConnShow *pConnShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
// pConnShow->index = 0;
// pConnShow->numOfConns = 0;
//
// if (pAcct->acctInfo.numOfConns > 0) {
// pConn = pAcct->pConn;
// SConnInfo *pConnInfo = pConnShow->connInfo;
//
// while (pConn && pConn->pUser) {
// strcpy(pConnInfo->user, pConn->pUser->user);
// pConnInfo->ip = pConn->ip;
// pConnInfo->port = pConn->port;
// pConnInfo->stime = pConn->stime;
//
// pConnShow->numOfConns++;
// pConnInfo++;
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pConnShow;
return
0
;
}
int
mgmtGetConnsMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int
cols
=
0
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
;
SSchema
*
pSchema
=
tsGetSchema
(
pMeta
);
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"user"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_IPv4ADDR_LEN
+
6
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"ip:port"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"login time"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int
i
=
1
;
i
<
cols
;
++
i
)
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
pShow
->
numOfRows
=
1000000
;
pShow
->
pNode
=
NULL
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmtGetConns
(
pShow
,
pConn
);
return
0
;
}
int
mgmtRetrieveConns
(
SShowObj
*
pShow
,
char
*
data
,
int
rows
,
void
*
pConn
)
{
int
numOfRows
=
0
;
char
*
pWrite
;
int
cols
=
0
;
SConnShow
*
pConnShow
=
(
SConnShow
*
)
pShow
->
pNode
;
if
(
rows
>
pConnShow
->
numOfConns
-
pConnShow
->
index
)
rows
=
pConnShow
->
numOfConns
-
pConnShow
->
index
;
while
(
numOfRows
<
rows
)
{
SConnInfo
*
pNode
=
pConnShow
->
connInfo
+
pConnShow
->
index
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
pNode
->
user
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
uint32_t
ip
=
pNode
->
ip
;
sprintf
(
pWrite
,
"%d.%d.%d.%d:%hu"
,
ip
&
0xFF
,
(
ip
>>
8
)
&
0xFF
,
(
ip
>>
16
)
&
0xFF
,
ip
>>
24
,
htons
(
pNode
->
port
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pNode
->
stime
;
cols
++
;
numOfRows
++
;
pConnShow
->
index
++
;
}
if
(
numOfRows
==
0
)
{
tfree
(
pConnShow
);
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
bool
mgmtCheckQhandle
(
uint64_t
qhandle
)
{
return
true
;
}
void
mgmtSaveQhandle
(
void
*
qhandle
)
{
}
void
mgmtFreeQhandle
(
void
*
qhandle
)
{
}
\ No newline at end of file
src/mnode/src/mgmtDb.c
浏览文件 @
3562f612
...
...
@@ -455,9 +455,9 @@ int32_t mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
//
// SVgObj *pVgroup = pDb->pHead;
// while (pVgroup != NULL) {
//
mgmt
UpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
//
balance
UpdateVgroupState(pVgroup, TSDB_VG_LB_STATUS_UPDATE, 0);
// if (oldReplicaNum < pDb->cfg.replications) {
// if (!
mgmt
AddVnode(pVgroup, NULL, NULL)) {
// if (!
balance
AddVnode(pVgroup, NULL, NULL)) {
// mWarn("db:%s vgroup:%d not enough dnode to add vnode", pAlter->db, pVgroup->vgId);
// code = TSDB_CODE_NO_ENOUGH_DNODES;
// }
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
3562f612
...
...
@@ -33,6 +33,9 @@ void * (*mgmtGetNextDnodeFp)(SShowObj *pShow, SDnodeObj **pDnode) = NULL;
int32_t
(
*
mgmtGetScoresMetaFp
)(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
=
NULL
;
int32_t
(
*
mgmtRetrieveScoresFp
)(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
=
NULL
;
void
(
*
mgmtSetDnodeUnRemoveFp
)(
SDnodeObj
*
pDnode
)
=
NULL
;
int32_t
(
*
mgmtCreateDnodeFp
)(
uint32_t
ip
)
=
NULL
;
int32_t
(
*
mgmtDropDnodeByIpFp
)(
uint32_t
ip
)
=
NULL
;
static
SDnodeObj
tsDnodeObj
=
{
0
};
...
...
@@ -606,6 +609,9 @@ void *mgmtGetNextDnode(SShowObj *pShow, SDnodeObj **pDnode) {
int32_t
mgmtGetScoresMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
if
(
mgmtGetScoresMetaFp
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
return
TSDB_CODE_NO_RIGHTS
;
return
mgmtGetScoresMetaFp
(
pMeta
,
pShow
,
pConn
);
}
else
{
return
TSDB_CODE_OPS_NOT_SUPPORT
;
...
...
@@ -633,3 +639,17 @@ bool mgmtCheckConfigShow(SGlobalConfig *cfg) {
return
false
;
return
true
;
}
/**
* check if a dnode in remove state
**/
bool
mgmtCheckDnodeInRemoveState
(
SDnodeObj
*
pDnode
)
{
return
pDnode
->
lbStatus
==
TSDB_DN_LB_STATUS_OFFLINE_REMOVING
||
pDnode
->
lbStatus
==
TSDB_DN_LB_STATE_SHELL_REMOVING
;
}
/**
* check if a dnode in offline state
**/
bool
mgmtCheckDnodeInOfflineState
(
SDnodeObj
*
pDnode
)
{
return
pDnode
->
status
==
TSDB_DN_STATUS_OFFLINE
;
}
src/mnode/src/mgmtDnodeInt.c
浏览文件 @
3562f612
...
...
@@ -312,7 +312,7 @@ int32_t mgmtCfgDynamicOptions(SDnodeObj *pDnode, char *msg) {
mTrace
(
"dnode:%s, custom score set from:%d to:%d"
,
taosIpStr
(
pDnode
->
privateIp
),
pDnode
->
customScore
,
score
);
pDnode
->
customScore
=
score
;
mgmtUpdateDnode
(
pDnode
);
mgmtStartBalanceTimer
(
15
);
//
mgmtStartBalanceTimer(15);
}
return
TSDB_CODE_INVALID_SQL
;
}
else
if
(
strncasecmp
(
option
,
"bandwidth"
,
9
)
==
0
)
{
...
...
src/mnode/src/mgmtGrant.c
浏览文件 @
3562f612
...
...
@@ -22,6 +22,7 @@
int32_t
(
*
mgmtCheckUserGrantFp
)()
=
NULL
;
int32_t
(
*
mgmtCheckDbGrantFp
)()
=
NULL
;
int32_t
(
*
mgmtCheckDnodeGrantFp
)()
=
NULL
;
void
(
*
mgmtAddTimeSeriesFp
)(
uint32_t
timeSeriesNum
)
=
NULL
;
void
(
*
mgmtRestoreTimeSeriesFp
)(
uint32_t
timeSeriesNum
)
=
NULL
;
int32_t
(
*
mgmtCheckTimeSeriesFp
)(
uint32_t
timeseries
)
=
NULL
;
...
...
@@ -32,7 +33,7 @@ void (*mgmtUpdateGrantInfoFp)(void *pCont) = NULL;
int32_t
mgmtCheckUserGrant
()
{
if
(
mgmtCheckUserGrantFp
)
{
return
mgmtCheckUserGrantFp
();
return
(
*
mgmtCheckUserGrantFp
)
();
}
else
{
return
0
;
}
...
...
@@ -40,7 +41,15 @@ int32_t mgmtCheckUserGrant() {
int32_t
mgmtCheckDbGrant
()
{
if
(
mgmtCheckDbGrantFp
)
{
return
mgmtCheckDbGrantFp
();
return
(
*
mgmtCheckDbGrantFp
)();
}
else
{
return
0
;
}
}
int32_t
mgmtCheckDnodeGrant
()
{
if
(
mgmtCheckDnodeGrantFp
)
{
return
(
*
mgmtCheckDnodeGrantFp
)();
}
else
{
return
0
;
}
...
...
@@ -49,20 +58,20 @@ int32_t mgmtCheckDbGrant() {
void
mgmtAddTimeSeries
(
SAcctObj
*
pAcct
,
uint32_t
timeSeriesNum
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
+=
timeSeriesNum
;
if
(
mgmtAddTimeSeriesFp
)
{
mgmtAddTimeSeriesFp
(
timeSeriesNum
);
(
*
mgmtAddTimeSeriesFp
)
(
timeSeriesNum
);
}
}
void
mgmtRestoreTimeSeries
(
SAcctObj
*
pAcct
,
uint32_t
timeSeriesNum
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
-=
timeSeriesNum
;
if
(
mgmtRestoreTimeSeriesFp
)
{
mgmtRestoreTimeSeriesFp
(
timeSeriesNum
);
(
*
mgmtRestoreTimeSeriesFp
)
(
timeSeriesNum
);
}
}
int32_t
mgmtCheckTimeSeries
(
uint32_t
timeseries
)
{
if
(
mgmtCheckTimeSeriesFp
)
{
return
mgmtCheckTimeSeriesFp
(
timeseries
);
return
(
*
mgmtCheckTimeSeriesFp
)
(
timeseries
);
}
else
{
return
0
;
}
...
...
src/mnode/src/mgmtMnode.c
浏览文件 @
3562f612
...
...
@@ -18,12 +18,48 @@
#include "mgmtMnode.h"
#include "mgmtUser.h"
void
*
(
*
mgmtGetNextMnodeFp
)(
SShowObj
*
pShow
,
SSdbPeer
**
pMnode
)
=
NULL
;
int32_t
(
*
mgmt
InitMnodesFp
)(
)
=
NULL
;
int32_t
(
*
mgmtAddMnodeFp
)(
uint32_t
privateIp
,
uint32_t
publicIp
)
=
NULL
;
int32_t
(
*
mgmt
RemoveMnodeFp
)(
uint32_t
privateIp
)
=
NULL
;
int32_t
(
*
mgmtGetMnodesNumFp
)()
=
NULL
;
void
*
(
*
mgmtGetNextMnodeFp
)(
SShowObj
*
pShow
,
SSdbPeer
**
pMnode
)
=
NULL
;
static
int32_t
mgmtGetMnodesNum
();
static
void
*
mgmtGetNextMnode
(
SShowObj
*
pShow
,
SSdbPeer
**
pMnode
);
int32_t
mgmtAddMnode
(
uint32_t
privateIp
,
uint32_t
publicIp
)
{
if
(
mgmtAddMnodeFp
)
{
return
(
*
mgmtAddMnodeFp
)(
privateIp
,
publicIp
);
}
else
{
return
0
;
}
}
int32_t
mgmtRemoveMnode
(
uint32_t
privateIp
)
{
if
(
mgmtRemoveMnodeFp
)
{
return
(
*
mgmtRemoveMnodeFp
)(
privateIp
);
}
else
{
return
0
;
}
}
static
int32_t
mgmtGetMnodesNum
()
{
if
(
mgmtGetMnodesNumFp
)
{
return
(
*
mgmtGetMnodesNumFp
)();
}
else
{
return
1
;
}
}
static
void
*
mgmtGetNextMnode
(
SShowObj
*
pShow
,
SSdbPeer
**
pMnode
)
{
if
(
mgmtGetNextMnodeFp
)
{
return
(
*
mgmtGetNextMnodeFp
)(
pShow
,
pMnode
);
}
else
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
NULL
;
}
else
{
*
pMnode
=
NULL
;
}
}
return
*
pMnode
;
}
int32_t
mgmtGetMnodeMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
...
...
@@ -88,11 +124,8 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
char
ipstr
[
20
];
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
mgmtGetNextMnode
(
pShow
,
(
SDnodeObj
**
)
&
pMnode
);
// pShow->pNode = sdbFetchRow(mnodeSdb, pShow->pNode, (void **)&pMnode);
// if (pMnode == NULL) break;
pShow
->
pNode
=
mgmtGetNextMnode
(
pShow
,
(
SSdbPeer
**
)
&
pMnode
);
if
(
pMnode
==
NULL
)
break
;
cols
=
0
;
...
...
@@ -123,25 +156,3 @@ int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, void *pCon
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
int32_t
mgmtGetMnodesNum
()
{
if
(
mgmtGetMnodesNumFp
)
{
return
mgmtGetMnodesNumFp
();
}
else
{
return
1
;
}
}
static
void
*
mgmtGetNextMnode
(
SShowObj
*
pShow
,
SSdbPeer
**
pMnode
)
{
if
(
mgmtGetNextMnodeFp
)
{
return
mgmtGetNextMnodeFp
(
pShow
,
pMnode
);
}
else
{
if
(
*
pMnode
==
NULL
)
{
*
pMnode
=
NULL
;
}
else
{
*
pMnode
=
NULL
;
}
}
return
*
pMnode
;
}
\ No newline at end of file
src/mnode/src/mgmtProfile.c
浏览文件 @
3562f612
...
...
@@ -15,16 +15,27 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "mgmtProfile.h"
#include "taosmsg.h"
#include "tschemautil.h"
#include "mgmtProfile.h"
typedef
struct
{
char
user
[
TSDB_TABLE_ID_LEN
+
1
];
uint64_t
stime
;
uint32_t
ip
;
uint16_t
port
;
}
SConnInfo
;
typedef
struct
{
int
numOfConns
;
int
index
;
SConnInfo
connInfo
[];
}
SConnShow
;
typedef
struct
{
uint32_t
ip
;
uint16_t
port
;
char
user
[
TSDB_TABLE_ID_LEN
];
char
user
[
TSDB_TABLE_ID_LEN
+
1
];
}
SCDesc
;
typedef
struct
{
...
...
@@ -532,3 +543,123 @@ int32_t mgmtKillConnection(char *qidstr, void *pConn) {
return
TSDB_CODE_INVALID_CONNECTION
;
}
bool
mgmtCheckQhandle
(
uint64_t
qhandle
)
{
return
true
;
}
void
mgmtSaveQhandle
(
void
*
qhandle
)
{
}
void
mgmtFreeQhandle
(
void
*
qhandle
)
{
}
int
mgmtGetConns
(
SShowObj
*
pShow
,
void
*
pConn
)
{
// SAcctObj * pAcct = pConn->pAcct;
// SConnShow *pConnShow;
//
// pthread_mutex_lock(&pAcct->mutex);
//
// pConnShow = malloc(sizeof(SConnInfo) * pAcct->acctInfo.numOfConns + sizeof(SConnShow));
// pConnShow->index = 0;
// pConnShow->numOfConns = 0;
//
// if (pAcct->acctInfo.numOfConns > 0) {
// pConn = pAcct->pConn;
// SConnInfo *pConnInfo = pConnShow->connInfo;
//
// while (pConn && pConn->pUser) {
// strcpy(pConnInfo->user, pConn->pUser->user);
// pConnInfo->ip = pConn->ip;
// pConnInfo->port = pConn->port;
// pConnInfo->stime = pConn->stime;
//
// pConnShow->numOfConns++;
// pConnInfo++;
// pConn = pConn->next;
// }
// }
//
// pthread_mutex_unlock(&pAcct->mutex);
//
// // sorting based on useconds
//
// pShow->pNode = pConnShow;
return
0
;
}
int32_t
mgmtGetConnsMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
;
SSchema
*
pSchema
=
tsGetSchema
(
pMeta
);
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"user"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_IPv4ADDR_LEN
+
6
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"ip:port"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"login time"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int
i
=
1
;
i
<
cols
;
++
i
)
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
pShow
->
numOfRows
=
1000000
;
pShow
->
pNode
=
NULL
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmtGetConns
(
pShow
,
pConn
);
return
0
;
}
int32_t
mgmtRetrieveConns
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
char
*
pWrite
;
int32_t
cols
=
0
;
SConnShow
*
pConnShow
=
(
SConnShow
*
)
pShow
->
pNode
;
if
(
rows
>
pConnShow
->
numOfConns
-
pConnShow
->
index
)
rows
=
pConnShow
->
numOfConns
-
pConnShow
->
index
;
while
(
numOfRows
<
rows
)
{
SConnInfo
*
pNode
=
pConnShow
->
connInfo
+
pConnShow
->
index
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
pNode
->
user
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
uint32_t
ip
=
pNode
->
ip
;
sprintf
(
pWrite
,
"%d.%d.%d.%d:%hu"
,
ip
&
0xFF
,
(
ip
>>
8
)
&
0xFF
,
(
ip
>>
16
)
&
0xFF
,
ip
>>
24
,
htons
(
pNode
->
port
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pNode
->
stime
;
cols
++
;
numOfRows
++
;
pConnShow
->
index
++
;
}
if
(
numOfRows
==
0
)
{
tfree
(
pConnShow
);
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
src/mnode/src/mgmtShell.c
浏览文件 @
3562f612
...
...
@@ -26,7 +26,6 @@
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "mgmtChildTable.h"
#include "mgmtConn.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtDnodeInt.h"
...
...
@@ -52,7 +51,6 @@ static void (*mgmtProcessShellMsg[TSDB_MSG_TYPE_MAX])(void *pCont, int32_t contL
static
void
mgmtProcessUnSupportMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
);
static
int
mgmtRetriveUserAuthInfo
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
uint32_t
mgmtAccessSquence
=
0
;
void
*
tsShellConnServer
=
NULL
;
void
mgmtProcessTranRequest
(
SSchedMsg
*
sched
)
{
...
...
@@ -1164,10 +1162,8 @@ static void mgmtProcessUnSupportMsg(void *pCont, int32_t contLen, void *ahandle)
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
}
void
(
*
mgmtProcessCreateDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessCfgMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessDropMnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
void
(
*
mgmtProcessDropDnodeMsg
)(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
=
mgmtProcessUnSupportMsg
;
static
void
mgmtProcessAlterAcctMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
{
if
(
!
mgmtAlterAcctFp
)
{
...
...
@@ -1297,6 +1293,76 @@ static void mgmtProcessCreateAcctMsg(void *pCont, int32_t contLen, void *ahandle
rpcSendResponse
(
ahandle
,
code
,
NULL
,
0
);
}
static
void
mgmtProcessCreateDnodeMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
{
if
(
!
mgmtCreateDnodeFp
)
{
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
return
;
}
SCreateDnodeMsg
*
pCreate
=
(
SCreateDnodeMsg
*
)
pCont
;
if
(
mgmtCheckRedirectMsg
(
ahandle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"failed to create dnode:%s, redirect this message"
,
pCreate
->
ip
);
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
ahandle
);
if
(
pUser
==
NULL
)
{
mError
(
"failed to create dnode:%s, reason:%s"
,
pCreate
->
ip
,
tstrerror
(
TSDB_CODE_INVALID_USER
));
rpcSendResponse
(
ahandle
,
TSDB_CODE_INVALID_USER
,
NULL
,
0
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
mError
(
"failed to create dnode:%s, reason:%s"
,
pCreate
->
ip
,
tstrerror
(
TSDB_CODE_NO_RIGHTS
));
rpcSendResponse
(
ahandle
,
TSDB_CODE_NO_RIGHTS
,
NULL
,
0
);
return
;
}
int32_t
code
=
(
*
mgmtCreateDnodeFp
)(
inet_addr
(
pCreate
->
ip
));
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"dnode:%s is created by %s"
,
pCreate
->
ip
,
pUser
->
user
);
}
else
{
mError
(
"failed to create dnode:%s, reason:%s"
,
pCreate
->
ip
,
tstrerror
(
code
));
}
rpcSendResponse
(
ahandle
,
code
,
NULL
,
0
);
}
static
void
mgmtProcessDropDnodeMsg
(
void
*
pCont
,
int32_t
contLen
,
void
*
ahandle
)
{
if
(
!
mgmtDropDnodeByIpFp
)
{
rpcSendResponse
(
ahandle
,
TSDB_CODE_OPS_NOT_SUPPORT
,
NULL
,
0
);
return
;
}
SDropDnodeMsg
*
pDrop
=
(
SDropDnodeMsg
*
)
pCont
;
if
(
mgmtCheckRedirectMsg
(
ahandle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"failed to drop dnode:%s, redirect this message"
,
pDrop
->
ip
);
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
ahandle
);
if
(
pUser
==
NULL
)
{
mError
(
"failed to drop dnode:%s, reason:%s"
,
pDrop
->
ip
,
tstrerror
(
TSDB_CODE_INVALID_USER
));
rpcSendResponse
(
ahandle
,
TSDB_CODE_INVALID_USER
,
NULL
,
0
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
mError
(
"failed to drop dnode:%s, reason:%s"
,
pDrop
->
ip
,
tstrerror
(
TSDB_CODE_NO_RIGHTS
));
rpcSendResponse
(
ahandle
,
TSDB_CODE_NO_RIGHTS
,
NULL
,
0
);
return
;
}
int32_t
code
=
(
*
mgmtDropDnodeByIpFp
)(
inet_addr
(
pDrop
->
ip
));
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"dnode:%s set to removing state by %s"
,
pDrop
->
ip
,
pUser
->
user
);
}
else
{
mError
(
"failed to drop dnode:%s, reason:%s"
,
pDrop
->
ip
,
tstrerror
(
code
));
}
rpcSendResponse
(
ahandle
,
code
,
NULL
,
0
);
}
void
mgmtInitProcessShellMsg
()
{
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_CONNECT
]
=
mgmtProcessConnectMsg
;
mgmtProcessShellMsg
[
TSDB_MSG_TYPE_HEARTBEAT
]
=
mgmtProcessHeartBeatMsg
;
...
...
src/mnode/src/mgmtSystem.c
浏览文件 @
3562f612
...
...
@@ -61,10 +61,6 @@ int32_t mgmtCheckMgmtRunning() {
tsetModuleStatus
(
TSDB_MOD_MGMT
);
// strcpy(sdbMasterIp, mgmtIpStr[0]);
// strcpy(sdbPrivateIp, tsPrivateIp);
// sdbPublicIp = inet_addr(tsPublicIp);
return
0
;
}
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
3562f612
...
...
@@ -318,6 +318,27 @@ int32_t mgmtGetVgroupMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn) {
return
0
;
}
char
*
mgmtGetVnodeStatus
(
SVgObj
*
pVgroup
,
SVnodeGid
*
pVnode
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pVnode
->
ip
);
if
(
pDnode
==
NULL
)
{
mError
(
"dnode:%s, vgroup:%d, vnode:%d dnode not exist"
,
taosIpStr
(
pVnode
->
ip
),
pVgroup
->
vgId
,
pVnode
->
vnode
);
return
"null"
;
}
if
(
pDnode
->
status
==
TSDB_DN_STATUS_OFFLINE
)
{
return
"offline"
;
}
SVnodeLoad
*
vload
=
pDnode
->
vload
+
pVnode
->
vnode
;
if
(
vload
->
vgId
!=
pVgroup
->
vgId
||
vload
->
vnode
!=
pVnode
->
vnode
)
{
mError
(
"dnode:%s, vgroup:%d, vnode:%d not same with dnode vgroup:%d vnode:%d"
,
taosIpStr
(
pVnode
->
ip
),
pVgroup
->
vgId
,
pVnode
->
vnode
,
vload
->
vgId
,
vload
->
vnode
);
return
"null"
;
}
return
(
char
*
)
taosGetVnodeStatusStr
(
vload
->
status
);
}
int32_t
mgmtRetrieveVgroups
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SVgObj
*
pVgroup
=
NULL
;
...
...
src/plugins/http/CMakeLists.txt
浏览文件 @
3562f612
...
...
@@ -13,6 +13,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
TARGET_LINK_LIBRARIES
(
http taos_static z
)
IF
(
TD_CLUSTER
)
TARGET_LINK_LIBRARIES
(
http
http_admin
)
TARGET_LINK_LIBRARIES
(
http
)
ENDIF
()
ENDIF
()
src/plugins/http/src/httpSystem.c
浏览文件 @
3562f612
...
...
@@ -34,13 +34,21 @@
#include "tgHandle.h"
#include "tlog.h"
#ifdef CLUSTER
void
adminInitHandle
(
HttpServer
*
pServer
);
void
opInitHandle
(
HttpServer
*
pServer
);
#else
void
adminInitHandle
(
HttpServer
*
pServer
)
{}
void
opInitHandle
(
HttpServer
*
pServer
)
{}
#endif
void
(
*
adminInitHandleFp
)(
HttpServer
*
pServer
)
=
NULL
;
void
(
*
opInitHandleFp
)(
HttpServer
*
pServer
)
=
NULL
;
void
adminInitHandle
(
HttpServer
*
pServer
)
{
if
(
adminInitHandleFp
)
{
(
*
adminInitHandleFp
)(
pServer
);
}
}
void
opInitHandle
(
HttpServer
*
pServer
)
{
if
(
opInitHandleFp
)
{
(
*
opInitHandleFp
)(
pServer
);
}
}
static
HttpServer
*
httpServer
=
NULL
;
void
taosInitNote
(
int
numOfNoteLines
,
int
maxNotes
,
char
*
lable
);
...
...
src/sdb/CMakeLists.txt
浏览文件 @
3562f612
...
...
@@ -11,6 +11,6 @@ IF ((TD_LINUX_64) OR (TD_LINUX_32 AND TD_ARM))
ADD_LIBRARY
(
sdb
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
sdb trpc
)
IF
(
TD_CLUSTER
)
TARGET_LINK_LIBRARIES
(
sdb
mreplica
)
TARGET_LINK_LIBRARIES
(
sdb
)
ENDIF
()
ENDIF
()
src/sdb/inc/sdbint.h
浏览文件 @
3562f612
...
...
@@ -48,6 +48,21 @@
#define sdbPrint(...) \
{ tprintf("MND-SDB ", 255, __VA_ARGS__); }
#define mpeerError(...) \
if (sdbDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR MND-MPEER ", 255, __VA_ARGS__); \
}
#define mpeerWarn(...) \
if (sdbDebugFlag & DEBUG_WARN) { \
tprintf("WARN MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
}
#define mpeerTrace(...) \
if (sdbDebugFlag & DEBUG_TRACE) { \
tprintf("MND-MPEER ", sdbDebugFlag, __VA_ARGS__); \
}
#define mpeerPrint(...) \
{ tprintf("MND-MPEER ", 255, __VA_ARGS__); }
#define sdbLError(...) taosLogError(__VA_ARGS__) sdbError(__VA_ARGS__)
#define sdbLWarn(...) taosLogWarn(__VA_ARGS__) sdbWarn(__VA_ARGS__)
#define sdbLPrint(...) taosLogPrint(__VA_ARGS__) sdbPrint(__VA_ARGS__)
...
...
@@ -69,12 +84,7 @@ typedef struct {
char
*
row
;
}
SSdbUpdate
;
typedef
struct
{
char
numOfTables
;
uint64_t
version
[];
}
SSdbSync
;
typedef
struct
{
typedef
struct
_SSdbTable
{
SSdbHeader
header
;
int
maxRows
;
int
dbId
;
...
...
@@ -109,23 +119,6 @@ typedef struct {
char
data
[];
}
SRowHead
;
typedef
struct
{
char
*
buffer
;
char
*
offset
;
int
trans
;
int
bufferSize
;
pthread_mutex_t
qmutex
;
}
STranQueue
;
typedef
struct
{
char
status
;
char
role
;
char
numOfMnodes
;
uint64_t
dbVersion
;
uint32_t
numOfDnodes
;
uint32_t
publicIp
;
}
SMnodeStatus
;
typedef
struct
{
uint8_t
dbId
;
char
type
;
...
...
@@ -140,7 +133,7 @@ extern int sdbNumOfTables;
extern
int64_t
sdbVersion
;
int
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int
dataLen
);
int
sdb
RetrieveRows
(
int
fd
,
SSdbTable
*
pTable
,
uint64_t
version
);
int
mpeer
RetrieveRows
(
int
fd
,
SSdbTable
*
pTable
,
uint64_t
version
);
void
sdbResetTable
(
SSdbTable
*
pTable
);
extern
const
int16_t
sdbFileVersion
;
...
...
src/sdb/src/sdbEngine.c
浏览文件 @
3562f612
...
...
@@ -23,7 +23,6 @@
extern
char
version
[];
const
int16_t
sdbFileVersion
=
0
;
int
sdbExtConns
=
0
;
SRpcIpSet
*
pSdbIpList
=
NULL
;
SRpcIpSet
*
pSdbPublicIpList
=
NULL
;
SSdbPeer
*
sdbPeer
[
SDB_MAX_PEERS
];
// first slot for self
...
...
src/sdb/src/sdbstr.c
浏览文件 @
3562f612
...
...
@@ -12,32 +12,47 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "sdbint.h"
char
*
sdbStatusStr
[]
=
{
"offline"
,
"unsynced"
,
"syncing"
,
"serving"
,
"null"
};
char
*
sdbRoleStr
[]
=
{
"unauthed"
,
"undecided"
,
"master"
,
"slave"
,
"null"
};
#ifndef CLUSTER
/*
* Lite Version sync request is always successful
*/
int
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int
dataLen
)
{
return
0
;
int32_t
(
*
mpeerInitMnodesFp
)(
char
*
directory
)
=
NULL
;
void
(
*
mpeerCleanUpMnodesFp
)()
=
NULL
;
int32_t
(
*
mpeerForwardRequestFp
)(
SSdbTable
*
pTable
,
char
type
,
void
*
cont
,
int32_t
contLen
)
=
NULL
;
char
*
sdbStatusStr
[]
=
{
"offline"
,
"unsynced"
,
"syncing"
,
"serving"
,
"null"
};
char
*
sdbRoleStr
[]
=
{
"unauthed"
,
"undecided"
,
"master"
,
"slave"
,
"null"
};
int32_t
sdbForwardDbReqToPeer
(
SSdbTable
*
pTable
,
char
type
,
char
*
data
,
int32_t
dataLen
)
{
if
(
mpeerForwardRequestFp
)
{
return
mpeerForwardRequestFp
(
pTable
,
type
,
data
,
dataLen
);
}
else
{
return
0
;
}
}
/*
* Lite Version does not need to initialize peers
*/
int
sdbInitPeers
(
char
*
directory
)
{
return
0
;
int32_t
sdbInitPeers
(
char
*
directory
)
{
if
(
mpeerInitMnodesFp
)
{
return
(
*
mpeerInitMnodesFp
)(
directory
);
}
else
{
return
0
;
}
}
/*
* Lite Version does not need to cleanup peers
*/
void
sdbCleanUpPeers
(){}
#endif
\ No newline at end of file
void
sdbCleanUpPeers
()
{
if
(
mpeerCleanUpMnodesFp
)
{
(
*
mpeerCleanUpMnodesFp
)();
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录