Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
fe12d0fe
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
fe12d0fe
编写于
4月 16, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add cluster codes
上级
9b6c8a30
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
789 addition
and
236 deletion
+789
-236
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+4
-0
src/inc/dnode.h
src/inc/dnode.h
+2
-0
src/inc/mnode.h
src/inc/mnode.h
+8
-7
src/inc/tacct.h
src/inc/tacct.h
+4
-16
src/mnode/inc/mgmtAcct.h
src/mnode/inc/mgmtAcct.h
+44
-0
src/mnode/inc/mgmtDb.h
src/mnode/inc/mgmtDb.h
+1
-1
src/mnode/inc/mgmtDnode.h
src/mnode/inc/mgmtDnode.h
+14
-15
src/mnode/src/mgmtAcct.c
src/mnode/src/mgmtAcct.c
+150
-21
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+5
-5
src/mnode/src/mgmtDClient.c
src/mnode/src/mgmtDClient.c
+1
-1
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+11
-11
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+483
-97
src/mnode/src/mgmtMain.c
src/mnode/src/mgmtMain.c
+6
-6
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+5
-5
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+2
-2
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+28
-28
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+8
-8
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+13
-13
未找到文件。
src/dnode/src/dnodeMain.c
浏览文件 @
fe12d0fe
...
...
@@ -229,3 +229,7 @@ static int32_t dnodeInitStorage() {
}
static
void
dnodeCleanupStorage
()
{}
bool
dnodeIsFirstDeploy
()
{
return
strcmp
(
tsMasterIp
,
tsPrivateIp
)
==
0
;
}
\ No newline at end of file
src/inc/dnode.h
浏览文件 @
fe12d0fe
...
...
@@ -44,6 +44,8 @@ void *dnodeAllocateRqueue(void *pVnode);
void
dnodeFreeRqueue
(
void
*
rqueue
);
void
dnodeSendRpcWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
bool
dnodeIsFirstDeploy
();
#ifdef __cplusplus
}
#endif
...
...
src/inc/mnode.h
浏览文件 @
fe12d0fe
...
...
@@ -60,14 +60,16 @@ typedef struct _dnode_obj {
int32_t
dnodeId
;
uint32_t
privateIp
;
uint32_t
publicIp
;
uint16_t
mnodeShellPort
;
uint16_t
mnodeDnodePort
;
uint16_t
dnodeShellPort
;
uint16_t
dnodeMnodePort
;
uint16_t
syncPort
;
uint32_t
moduleStatus
;
int64_t
createdTime
;
uint32_t
lastAccess
;
int32_t
openVnodes
;
int32_t
numOfTotalVnodes
;
// from dnode status msg, config information
uint32_t
rack
;
uint16_t
idc
;
uint16_t
slot
;
int32_t
totalVnodes
;
// from dnode status msg, config information
uint16_t
numOfCores
;
// from dnode status msg
int8_t
alternativeRole
;
// from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t
status
;
// set in balance function
...
...
@@ -88,7 +90,6 @@ typedef struct _dnode_obj {
typedef
struct
{
int32_t
dnodeId
;
uint16_t
port
;
uint32_t
privateIp
;
uint32_t
publicIp
;
}
SVnodeGid
;
...
...
@@ -209,10 +210,10 @@ typedef struct _acct_obj {
SAcctCfg
cfg
;
int32_t
acctId
;
int64_t
createdTime
;
int8_t
dirty
;
int8_t
status
;
int8_t
reserved
[
14
];
int8_t
updateEnd
[
1
];
int32_t
refCount
;
int32_t
refCount
;
SAcctInfo
acctInfo
;
pthread_mutex_t
mutex
;
}
SAcctObj
;
...
...
src/inc/tacc
oun
t.h
→
src/inc/tacct.h
浏览文件 @
fe12d0fe
...
...
@@ -20,27 +20,15 @@
extern
"C"
{
#endif
struct
_acct_obj
;
struct
_user_obj
;
struct
_db_obj
;
typedef
enum
{
TSDB_ACC
T_USER
,
TSDB_ACC
T_DB
,
TSDB_ACC
T_TABLE
ACCT_GRAN
T_USER
,
ACCT_GRAN
T_DB
,
ACCT_GRAN
T_TABLE
}
EAcctGrantType
;
int32_t
acctInit
();
void
acctCleanUp
();
void
*
acctGetAcct
(
char
*
acctName
);
void
acctIncRef
(
struct
_acct_obj
*
pAcct
);
void
acctReleaseAcct
(
struct
_acct_obj
*
pAcct
);
int32_t
acctCheck
(
struct
_acct_obj
*
pAcct
,
EAcctGrantType
type
);
void
acctAddDb
(
struct
_acct_obj
*
pAcct
,
struct
_db_obj
*
pDb
);
void
acctRemoveDb
(
struct
_acct_obj
*
pAcct
,
struct
_db_obj
*
pDb
);
void
acctAddUser
(
struct
_acct_obj
*
pAcct
,
struct
_user_obj
*
pUser
);
void
acctRemoveUser
(
struct
_acct_obj
*
pAcct
,
struct
_user_obj
*
pUser
);
int32_t
acctCheck
(
void
*
pAcct
,
EAcctGrantType
type
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mgmtAcct.h
0 → 100644
浏览文件 @
fe12d0fe
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MGMT_ACCT_H
#define TDENGINE_MGMT_ACCT_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "tacct.h"
struct
_acct_obj
;
struct
_user_obj
;
struct
_db_obj
;
int32_t
mgmtInitAccts
();
void
mgmtCleanUpAccts
();
void
*
mgmtGetAcct
(
char
*
acctName
);
void
mgmtIncAcctRef
(
struct
_acct_obj
*
pAcct
);
void
mgmtDecAcctRef
(
struct
_acct_obj
*
pAcct
);
void
mgmtAddDbToAcct
(
struct
_acct_obj
*
pAcct
,
struct
_db_obj
*
pDb
);
void
mgmtDropDbFromAcct
(
struct
_acct_obj
*
pAcct
,
struct
_db_obj
*
pDb
);
void
mgmtAddUserToAcct
(
struct
_acct_obj
*
pAcct
,
struct
_user_obj
*
pUser
);
void
mgmtDropUserFromAcct
(
struct
_acct_obj
*
pAcct
,
struct
_user_obj
*
pUser
);
#ifdef __cplusplus
}
#endif
#endif
src/mnode/inc/mgmtDb.h
浏览文件 @
fe12d0fe
...
...
@@ -33,7 +33,7 @@ void mgmtCleanUpDbs();
SDbObj
*
mgmtGetDb
(
char
*
db
);
SDbObj
*
mgmtGetDbByTableId
(
char
*
db
);
void
mgmtIncDbRef
(
SDbObj
*
pDb
);
void
mgmt
ReleaseDb
(
SDbObj
*
pDb
);
void
mgmt
DecDbRef
(
SDbObj
*
pDb
);
bool
mgmtCheckIsMonitorDB
(
char
*
db
,
char
*
monitordb
);
void
mgmtDropAllDbs
(
SAcctObj
*
pAcct
);
...
...
src/
inc/tcluster
.h
→
src/
mnode/inc/mgmtDnode
.h
浏览文件 @
fe12d0fe
...
...
@@ -33,21 +33,20 @@ enum _TAOS_DN_STATUS {
TAOS_DN_STATUS_READY
};
int32_t
clusterInit
();
void
clusterCleanUp
();
char
*
clusterGetDnodeStatusStr
(
int32_t
dnodeStatus
);
bool
clusterCheckModuleInDnode
(
struct
_dnode_obj
*
pDnode
,
int
moduleType
);
void
clusterMonitorDnodeModule
();
int32_t
clusterInitDnodes
();
void
clusterCleanupDnodes
();
int32_t
clusterGetDnodesNum
();
void
*
clusterGetNextDnode
(
void
*
pNode
,
struct
_dnode_obj
**
pDnode
);
void
clusterReleaseDnode
(
struct
_dnode_obj
*
pDnode
);
void
*
clusterGetDnode
(
int32_t
dnodeId
);
void
*
clusterGetDnodeByIp
(
uint32_t
ip
);
void
clusterUpdateDnode
(
struct
_dnode_obj
*
pDnode
);
int32_t
clusterDropDnode
(
struct
_dnode_obj
*
pDnode
);
int32_t
mgmtInitDnodes
();
void
mgmtCleanupDnodes
();
char
*
mgmtGetDnodeStatusStr
(
int32_t
dnodeStatus
);
bool
mgmtCheckModuleInDnode
(
struct
_dnode_obj
*
pDnode
,
int
moduleType
);
void
mgmtMonitorDnodeModule
();
int32_t
mgmtGetDnodesNum
();
void
*
mgmtGetNextDnode
(
void
*
pNode
,
struct
_dnode_obj
**
pDnode
);
void
mgmtReleaseDnode
(
struct
_dnode_obj
*
pDnode
);
void
*
mgmtGetDnode
(
int32_t
dnodeId
);
void
*
mgmtGetDnodeByIp
(
uint32_t
ip
);
void
mgmtUpdateDnode
(
struct
_dnode_obj
*
pDnode
);
int32_t
mgmtDropDnode
(
struct
_dnode_obj
*
pDnode
);
#ifdef __cplusplus
}
...
...
src/mnode/src/mgmtAcct.c
浏览文件 @
fe12d0fe
...
...
@@ -16,49 +16,178 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "dnode.h"
#include "mnode.h"
#include "
taccoun
t.h"
#include "
mgmtAcc
t.h"
#include "mgmtDb.h"
#include "mgmtSdb.h"
#include "mgmtUser.h"
#ifndef _ACCOUNT
static
void
*
tsAcctSdb
=
NULL
;
static
int32_t
tsAcctUpdateSize
;
static
void
mgmtCreateRootAcct
();
static
SAcctObj
tsAcctObj
=
{
0
};
static
int32_t
mgmtActionAcctDestroy
(
SSdbOperDesc
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
pthread_mutex_destroy
(
&
pAcct
->
mutex
);
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
acctInit
()
{
tsAcctObj
.
acctId
=
0
;
strcpy
(
tsAcctObj
.
user
,
"root"
);
static
int32_t
mgmtAcctActionInsert
(
SSdbOperDesc
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
memset
(
&
pAcct
->
acctInfo
,
0
,
sizeof
(
SAcctInfo
));
pthread_mutex_init
(
&
pAcct
->
mutex
,
NULL
);
return
TSDB_CODE_SUCCESS
;
}
void
acctCleanUp
()
{}
void
*
acctGetAcct
(
char
*
acctName
)
{
return
&
tsAcctObj
;
}
void
acctIncRef
(
struct
_acct_obj
*
pAcct
)
{}
void
acctReleaseAcct
(
SAcctObj
*
pAcct
)
{}
int32_t
acctCheck
(
SAcctObj
*
pAcct
,
EAcctGrantType
type
)
{
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctDelete
(
SSdbOperDesc
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
mgmtDropAllUsers
(
pAcct
);
mgmtDropAllDbs
(
pAcct
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctUpdate
(
SSdbOperDesc
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
SAcctObj
*
pSaved
=
mgmtGetAcct
(
pAcct
->
user
);
if
(
pAcct
!=
pSaved
)
{
memcpy
(
pSaved
,
pAcct
,
tsAcctUpdateSize
);
free
(
pAcct
);
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionActionEncode
(
SSdbOperDesc
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pAcct
,
tsAcctUpdateSize
);
pOper
->
rowSize
=
tsAcctUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctDecode
(
SSdbOperDesc
*
pOper
)
{
SAcctObj
*
pAcct
=
(
SAcctObj
*
)
calloc
(
1
,
sizeof
(
SAcctObj
));
if
(
pAcct
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
memcpy
(
pAcct
,
pOper
->
rowData
,
tsAcctUpdateSize
);
pOper
->
pObj
=
pAcct
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtActionAcctRestored
()
{
if
(
dnodeIsFirstDeploy
())
{
mgmtCreateRootAcct
();
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
mgmtInitAccts
()
{
SAcctObj
tObj
;
tsAcctUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_ACCOUNT
,
.
tableName
=
"accounts"
,
.
hashSessions
=
TSDB_MAX_ACCOUNTS
,
.
maxRowSize
=
tsAcctUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_STRING
,
.
insertFp
=
mgmtAcctActionInsert
,
.
deleteFp
=
mgmtActionAcctDelete
,
.
updateFp
=
mgmtActionAcctUpdate
,
.
encodeFp
=
mgmtActionActionEncode
,
.
decodeFp
=
mgmtActionAcctDecode
,
.
destroyFp
=
mgmtActionAcctDestroy
,
.
restoredFp
=
mgmtActionAcctRestored
};
#endif
tsAcctSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsAcctSdb
==
NULL
)
{
mError
(
"failed to init acct data"
);
return
-
1
;
}
void
acctAddDb
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
mTrace
(
"account table is created"
);
return
acctInit
();
}
void
mgmtCleanUpAccts
()
{
sdbCloseTable
(
tsAcctSdb
);
acctCleanUp
();
}
void
*
mgmtGetAcct
(
char
*
name
)
{
return
sdbGetRow
(
tsAcctSdb
,
name
);
}
void
mgmtIncAcctRef
(
SAcctObj
*
pAcct
)
{
sdbIncRef
(
tsAcctSdb
,
pAcct
);
}
void
mgmtDecAcctRef
(
SAcctObj
*
pAcct
)
{
sdbDecRef
(
tsAcctSdb
,
pAcct
);
}
void
mgmtAddDbToAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
atomic_add_fetch_32
(
&
pAcct
->
acctInfo
.
numOfDbs
,
1
);
pDb
->
pAcct
=
pAcct
;
acctInc
Ref
(
pAcct
);
mgmtIncAcct
Ref
(
pAcct
);
}
void
acctRemoveDb
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
void
mgmtDropDbFromAcct
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
atomic_sub_fetch_32
(
&
pAcct
->
acctInfo
.
numOfDbs
,
1
);
pDb
->
pAcct
=
NULL
;
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
}
void
acctAddUser
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
)
{
void
mgmtAddUserToAcct
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
)
{
atomic_add_fetch_32
(
&
pAcct
->
acctInfo
.
numOfUsers
,
1
);
pUser
->
pAcct
=
pAcct
;
acctInc
Ref
(
pAcct
);
mgmtIncAcct
Ref
(
pAcct
);
}
void
acctRemoveUser
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
)
{
void
mgmtDropUserFromAcct
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
)
{
atomic_sub_fetch_32
(
&
pAcct
->
acctInfo
.
numOfUsers
,
1
);
pUser
->
pAcct
=
NULL
;
acctReleaseAcct
(
pAcct
);
}
\ No newline at end of file
mgmtDecAcctRef
(
pAcct
);
}
static
void
mgmtCreateRootAcct
()
{
int32_t
numOfAccts
=
sdbGetNumOfRows
(
tsAcctSdb
);
if
(
numOfAccts
!=
0
)
return
;
SAcctObj
*
pAcct
=
malloc
(
sizeof
(
SAcctObj
));
memset
(
pAcct
,
0
,
sizeof
(
SAcctObj
));
strcpy
(
pAcct
->
user
,
"root"
);
taosEncryptPass
((
uint8_t
*
)
"taosdata"
,
strlen
(
"taosdata"
),
pAcct
->
pass
);
pAcct
->
cfg
=
(
SAcctCfg
){
.
maxUsers
=
10
,
.
maxDbs
=
64
,
.
maxTimeSeries
=
INT32_MAX
,
.
maxConnections
=
1024
,
.
maxStreams
=
1000
,
.
maxPointsPerSecond
=
10000000
,
.
maxStorage
=
INT64_MAX
,
.
maxQueryTime
=
INT64_MAX
,
.
maxInbound
=
0
,
.
maxOutbound
=
0
,
.
accessState
=
TSDB_VN_ALL_ACCCESS
};
pAcct
->
acctId
=
sdbGetId
(
tsAcctSdb
);
pAcct
->
createdTime
=
taosGetTimestampMs
();
SSdbOperDesc
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsAcctSdb
,
.
pObj
=
pAcct
,
};
sdbInsertRow
(
&
oper
);
}
#ifndef _ACCT
int32_t
acctInit
()
{
return
TSDB_CODE_SUCCESS
;
}
void
acctCleanUp
()
{}
int32_t
acctCheck
(
void
*
pAcct
,
EAcctGrantType
type
)
{
return
TSDB_CODE_SUCCESS
;
}
#endif
\ No newline at end of file
src/mnode/src/mgmtBalance.c
浏览文件 @
fe12d0fe
...
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "tbalance.h"
#include "mnode.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "mgmtVgroup.h"
#ifndef _VPEER
...
...
@@ -31,17 +31,17 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
float
vnodeUsage
=
1
.
0
;
while
(
1
)
{
pNode
=
cluster
GetNextDnode
(
pNode
,
&
pDnode
);
pNode
=
mgmt
GetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
pDnode
->
numOfTotalVnodes
>
0
&&
pDnode
->
openVnodes
<
pDnode
->
numOfT
otalVnodes
)
{
float
usage
=
(
float
)
pDnode
->
openVnodes
/
pDnode
->
numOfT
otalVnodes
;
if
(
pDnode
->
totalVnodes
>
0
&&
pDnode
->
openVnodes
<
pDnode
->
t
otalVnodes
)
{
float
usage
=
(
float
)
pDnode
->
openVnodes
/
pDnode
->
t
otalVnodes
;
if
(
usage
<=
vnodeUsage
)
{
pSelDnode
=
pDnode
;
vnodeUsage
=
usage
;
}
}
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
}
if
(
pSelDnode
==
NULL
)
{
...
...
src/mnode/src/mgmtDClient.c
浏览文件 @
fe12d0fe
...
...
@@ -23,7 +23,7 @@
#include "mnode.h"
#include "tbalance.h"
#include "mgmtDb.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "tgrant.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
fe12d0fe
...
...
@@ -19,10 +19,10 @@
#include "tutil.h"
#include "name.h"
#include "mnode.h"
#include "taccount.h"
#include "tbalance.h"
#include "mgmtAcct.h"
#include "mgmtDb.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "tgrant.h"
#include "mpeer.h"
#include "mgmtShell.h"
...
...
@@ -51,7 +51,7 @@ static int32_t mgmtDbActionDestroy(SSdbOperDesc *pOper) {
static
int32_t
mgmtDbActionInsert
(
SSdbOperDesc
*
pOper
)
{
SDbObj
*
pDb
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
pDb
->
pHead
=
NULL
;
pDb
->
pTail
=
NULL
;
...
...
@@ -60,7 +60,7 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
pDb
->
numOfSuperTables
=
0
;
if
(
pAcct
!=
NULL
)
{
acctAddDb
(
pAcct
,
pDb
);
mgmtAddDbToAcct
(
pAcct
,
pDb
);
}
else
{
mError
(
"db:%s, acct:%s info not exist in sdb"
,
pDb
->
name
,
pDb
->
cfg
.
acct
);
...
...
@@ -72,9 +72,9 @@ static int32_t mgmtDbActionInsert(SSdbOperDesc *pOper) {
static
int32_t
mgmtDbActionDelete
(
SSdbOperDesc
*
pOper
)
{
SDbObj
*
pDb
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
acctRemoveDb
(
pAcct
,
pDb
);
mgmtDropDbFromAcct
(
pAcct
,
pDb
);
mgmtDropAllChildTables
(
pDb
);
mgmtDropAllSuperTables
(
pDb
);
mgmtDropAllVgroups
(
pDb
);
...
...
@@ -156,7 +156,7 @@ void mgmtIncDbRef(SDbObj *pDb) {
return
sdbIncRef
(
tsDbSdb
,
pDb
);
}
void
mgmt
ReleaseDb
(
SDbObj
*
pDb
)
{
void
mgmt
DecDbRef
(
SDbObj
*
pDb
)
{
return
sdbDecRef
(
tsDbSdb
,
pDb
);
}
...
...
@@ -288,14 +288,14 @@ static int32_t mgmtCheckDbParams(SCMCreateDbMsg *pCreate) {
}
static
int32_t
mgmtCreateDb
(
SAcctObj
*
pAcct
,
SCMCreateDbMsg
*
pCreate
)
{
int32_t
code
=
acctCheck
(
pAcct
,
TSDB_ACC
T_DB
);
int32_t
code
=
acctCheck
(
pAcct
,
ACCT_GRAN
T_DB
);
if
(
code
!=
0
)
{
return
code
;
}
SDbObj
*
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pDb
!=
NULL
)
{
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
TSDB_CODE_DB_ALREADY_EXIST
;
}
...
...
@@ -641,7 +641,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols
++
;
numOfRows
++
;
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
}
pShow
->
numOfReads
+=
numOfRows
;
...
...
@@ -888,7 +888,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
mgmtSetDbDropping
(
pDb
);
numOfDbs
++
;
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
}
mTrace
(
"acct:%s, all dbs is is set dirty"
,
pAcct
->
user
,
numOfDbs
);
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
fe12d0fe
...
...
@@ -17,91 +17,209 @@
#include "os.h"
#include "tmodule.h"
#include "tbalance.h"
#include "tcluster.h"
#include "tgrant.h"
#include "mgmtDnode.h"
#include "mnode.h"
#include "mpeer.h"
#include "mgmtDClient.h"
#include "mgmtShell.h"
#include "mgmtDServer.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#include "dnodeMClient.h"
void
*
tsDnodeSdb
=
NULL
;
int32_t
tsDnodeUpdateSize
=
0
;
extern
void
*
tsVgroupSdb
;
static
int32_t
mgmtCreateDnode
(
uint32_t
ip
);
static
void
mgmtProcessCreateDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessDropDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
;
static
void
mgmtProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mgmtGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtDnodeActionDestroy
(
SSdbOperDesc
*
pOper
)
{
tfree
(
pOper
->
pObj
);
return
TSDB_CODE_SUCCESS
;
}
static
void
clusterProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
clusterProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
;
static
void
clusterProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
);
static
int32_t
clusterGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
clusterGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
clusterGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
clusterGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
#ifndef _CLUSTER
static
SDnodeObj
tsDnodeObj
=
{
0
};
int32_t
clusterInitDnodes
()
{
tsDnodeObj
.
dnodeId
=
1
;
tsDnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);
tsDnodeObj
.
publicIp
=
inet_addr
(
tsPublicIp
);
tsDnodeObj
.
createdTime
=
taosGetTimestampMs
();
tsDnodeObj
.
numOfTotalVnodes
=
tsNumOfTotalVnodes
;
tsDnodeObj
.
status
=
TAOS_DN_STATUS_OFFLINE
;
tsDnodeObj
.
lastReboot
=
taosGetTimestampSec
();
sprintf
(
tsDnodeObj
.
dnodeName
,
"%d"
,
tsDnodeObj
.
dnodeId
);
tsDnodeObj
.
moduleStatus
|=
(
1
<<
TSDB_MOD_MGMT
);
if
(
tsEnableHttpModule
)
{
tsDnodeObj
.
moduleStatus
|=
(
1
<<
TSDB_MOD_HTTP
);
}
if
(
tsEnableMonitorModule
)
{
tsDnodeObj
.
moduleStatus
|=
(
1
<<
TSDB_MOD_MONITOR
);
static
int32_t
mgmtDnodeActionInsert
(
SSdbOperDesc
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
if
(
pDnode
->
status
!=
TAOS_DN_STATUS_DROPPING
)
{
pDnode
->
status
=
TAOS_DN_STATUS_OFFLINE
;
}
return
0
;
return
TSDB_CODE_SUCCESS
;
}
void
*
clusterGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
)
{
if
(
*
pDnode
==
NULL
)
{
*
pDnode
=
&
tsDnodeObj
;
}
else
{
*
pDnode
=
NULL
;
static
int32_t
mgmtDnodeActionDelete
(
SSdbOperDesc
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
int32_t
numOfVgroups
=
0
;
while
(
1
)
{
pLastNode
=
pNode
;
pNode
=
sdbFetchRow
(
tsVgroupSdb
,
pNode
,
(
void
**
)
&
pVgroup
);
if
(
pVgroup
==
NULL
)
break
;
if
(
pVgroup
->
vnodeGid
[
0
].
dnodeId
==
pDnode
->
dnodeId
)
{
SSdbOperDesc
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
};
sdbDeleteRow
(
&
oper
);
pNode
=
pLastNode
;
numOfVgroups
++
;
continue
;
}
}
mTrace
(
"dnode:%d, all vgroups:%d is dropped from sdb"
,
pDnode
->
dnodeId
,
numOfVgroups
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionUpdate
(
SSdbOperDesc
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
SDnodeObj
*
pSaved
=
mgmtGetDnode
(
pDnode
->
dnodeId
);
if
(
pDnode
!=
pSaved
)
{
memcpy
(
pSaved
,
pDnode
,
pOper
->
rowSize
);
free
(
pDnode
);
}
return
*
pDnode
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
mgmtDnodeActionEncode
(
SSdbOperDesc
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
memcpy
(
pOper
->
rowData
,
pDnode
,
tsDnodeUpdateSize
);
pOper
->
rowSize
=
tsDnodeUpdateSize
;
return
TSDB_CODE_SUCCESS
;
}
void
clusterCleanupDnodes
()
{}
int32_t
clusterGetDnodesNum
()
{
return
1
;
}
void
*
clusterGetDnode
(
int32_t
dnodeId
)
{
return
dnodeId
==
1
?
&
tsDnodeObj
:
NULL
;
}
void
*
clusterGetDnodeByIp
(
uint32_t
ip
)
{
return
&
tsDnodeObj
;
}
void
clusterReleaseDnode
(
struct
_dnode_obj
*
pDnode
)
{}
void
clusterUpdateDnode
(
struct
_dnode_obj
*
pDnode
)
{}
void
clusterMonitorDnodeModule
()
{}
static
int32_t
mgmtDnodeActionDecode
(
SSdbOperDesc
*
pOper
)
{
SDnodeObj
*
pDnode
=
(
SDnodeObj
*
)
calloc
(
1
,
sizeof
(
SDnodeObj
));
if
(
pDnode
==
NULL
)
return
TSDB_CODE_SERV_OUT_OF_MEMORY
;
#endif
memcpy
(
pDnode
,
pOper
->
rowData
,
tsDnodeUpdateSize
);
pOper
->
pObj
=
pDnode
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
clusterInit
()
{
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONFIG_DNODE
,
clusterProcessCfgDnodeMsg
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
,
clusterProcessCfgDnodeMsgRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_STATUS
,
clusterProcessDnodeStatusMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MODULE
,
clusterGetModuleMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MODULE
,
clusterRetrieveModules
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
clusterGetConfigMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
clusterRetrieveConfigs
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VNODES
,
clusterGetVnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VNODES
,
clusterRetrieveVnodes
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DNODE
,
clusterGetDnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DNODE
,
clusterRetrieveDnodes
);
static
int32_t
mgmtDnodeActionRestored
()
{
int32_t
numOfRows
=
sdbGetNumOfRows
(
tsDnodeSdb
);
if
(
numOfRows
<=
0
)
{
if
(
strcmp
(
tsMasterIp
,
tsPrivateIp
)
==
0
)
{
mgmtCreateDnode
(
inet_addr
(
tsPrivateIp
));
}
}
return
0
;
}
int32_t
mgmtInitDnodes
()
{
SDnodeObj
tObj
;
tsDnodeUpdateSize
=
(
int8_t
*
)
tObj
.
updateEnd
-
(
int8_t
*
)
&
tObj
;
SSdbTableDesc
tableDesc
=
{
.
tableId
=
SDB_TABLE_DNODE
,
.
tableName
=
"dnodes"
,
.
hashSessions
=
TSDB_MAX_DNODES
,
.
maxRowSize
=
tsDnodeUpdateSize
,
.
refCountPos
=
(
int8_t
*
)(
&
tObj
.
refCount
)
-
(
int8_t
*
)
&
tObj
,
.
keyType
=
SDB_KEY_AUTO
,
.
insertFp
=
mgmtDnodeActionInsert
,
.
deleteFp
=
mgmtDnodeActionDelete
,
.
updateFp
=
mgmtDnodeActionUpdate
,
.
encodeFp
=
mgmtDnodeActionEncode
,
.
decodeFp
=
mgmtDnodeActionDecode
,
.
destroyFp
=
mgmtDnodeActionDestroy
,
.
restoredFp
=
mgmtDnodeActionRestored
};
tsDnodeSdb
=
sdbOpenTable
(
&
tableDesc
);
if
(
tsDnodeSdb
==
NULL
)
{
mError
(
"failed to init dnodes data"
);
return
-
1
;
}
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CREATE_DNODE
,
mgmtProcessCreateDnodeMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_DROP_DNODE
,
mgmtProcessDropDnodeMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONFIG_DNODE
,
mgmtProcessCfgDnodeMsg
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
,
mgmtProcessCfgDnodeMsgRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_STATUS
,
mgmtProcessDnodeStatusMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MODULE
,
mgmtGetModuleMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MODULE
,
mgmtRetrieveModules
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
mgmtGetConfigMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
mgmtRetrieveConfigs
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtGetVnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtRetrieveVnodes
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtGetDnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtRetrieveDnodes
);
return
clusterInitDnodes
();
mTrace
(
"dnodes table is created"
);
return
0
;
}
void
mgmtCleanupDnodes
()
{
sdbCloseTable
(
tsDnodeSdb
);
}
void
*
mgmtGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
)
{
return
sdbFetchRow
(
tsDnodeSdb
,
pNode
,
(
void
**
)
pDnode
);
}
int32_t
mgmtGetDnodesNum
()
{
return
sdbGetNumOfRows
(
tsDnodeSdb
);
}
void
*
mgmtGetDnode
(
int32_t
dnodeId
)
{
return
sdbGetRow
(
tsDnodeSdb
,
&
dnodeId
);
}
void
*
mgmtGetDnodeByIp
(
uint32_t
ip
)
{
SDnodeObj
*
pDnode
=
NULL
;
void
*
pNode
=
NULL
;
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsDnodeSdb
,
pNode
,
(
void
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
ip
==
pDnode
->
privateIp
)
{
return
pDnode
;
}
mgmtReleaseDnode
(
pDnode
);
}
return
NULL
;
}
void
mgmtReleaseDnode
(
SDnodeObj
*
pDnode
)
{
sdbDecRef
(
tsDnodeSdb
,
pDnode
);
}
void
clusterCleanUp
()
{
clusterCleanupDnodes
();
void
mgmtUpdateDnode
(
SDnodeObj
*
pDnode
)
{
SSdbOperDesc
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
.
rowSize
=
tsDnodeUpdateSize
};
sdbUpdateRow
(
&
oper
);
}
void
cluster
ProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
)
{
void
mgmt
ProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMCfgDnodeMsg
*
pCmCfgDnode
=
pMsg
->
pCont
;
...
...
@@ -137,11 +255,11 @@ void clusterProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
void
cluster
ProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
{
static
void
mgmt
ProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
{
mPrint
(
"cfg vnode rsp is received, result:%s"
,
tstrerror
(
rpcMsg
->
code
));
}
void
cluster
ProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
)
{
void
mgmt
ProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
)
{
SDMStatusMsg
*
pStatus
=
rpcMsg
->
pCont
;
pStatus
->
dnodeId
=
htonl
(
pStatus
->
dnodeId
);
pStatus
->
privateIp
=
htonl
(
pStatus
->
privateIp
);
...
...
@@ -159,14 +277,14 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDnodeObj
*
pDnode
=
NULL
;
if
(
pStatus
->
dnodeId
==
0
)
{
pDnode
=
cluster
GetDnodeByIp
(
pStatus
->
privateIp
);
pDnode
=
mgmt
GetDnodeByIp
(
pStatus
->
privateIp
);
if
(
pDnode
==
NULL
)
{
mTrace
(
"dnode not created, privateIp:%s"
,
taosIpStr
(
pStatus
->
privateIp
));
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_DNODE_NOT_EXIST
);
return
;
}
}
else
{
pDnode
=
cluster
GetDnode
(
pStatus
->
dnodeId
);
pDnode
=
mgmt
GetDnode
(
pStatus
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"dnode:%d, not exist, privateIp:%s"
,
pStatus
->
dnodeId
,
taosIpStr
(
pStatus
->
privateIp
));
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_DNODE_NOT_EXIST
);
...
...
@@ -180,7 +298,7 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
pDnode
->
numOfCores
=
pStatus
->
numOfCores
;
pDnode
->
diskAvailable
=
pStatus
->
diskAvailable
;
pDnode
->
alternativeRole
=
pStatus
->
alternativeRole
;
pDnode
->
numOfTotalVnodes
=
pStatus
->
numOfTotalVnodes
;
pDnode
->
totalVnodes
=
pStatus
->
numOfTotalVnodes
;
if
(
pStatus
->
dnodeId
==
0
)
{
mTrace
(
"dnode:%d, first access, privateIp:%s, name:%s"
,
pDnode
->
dnodeId
,
taosIpStr
(
pDnode
->
privateIp
),
pDnode
->
dnodeName
);
...
...
@@ -209,10 +327,10 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mTrace
(
"dnode:%d, from offline to online"
,
pDnode
->
dnodeId
);
pDnode
->
status
=
TAOS_DN_STATUS_READY
;
balanceNotify
();
cluster
MonitorDnodeModule
();
mgmt
MonitorDnodeModule
();
}
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
int32_t
contLen
=
sizeof
(
SDMStatusRsp
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeAccess
);
SDMStatusRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
...
...
@@ -242,7 +360,123 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
int32_t
clusterGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
mgmtCreateDnode
(
uint32_t
ip
)
{
int32_t
grantCode
=
grantCheck
(
TSDB_GRANT_DNODE
);
if
(
grantCode
!=
TSDB_CODE_SUCCESS
)
{
return
grantCode
;
}
SDnodeObj
*
pDnode
=
mgmtGetDnodeByIp
(
ip
);
if
(
pDnode
!=
NULL
)
{
mError
(
"dnode:%d is alredy exist, ip:%s"
,
pDnode
->
dnodeId
,
taosIpStr
(
pDnode
->
privateIp
));
return
TSDB_CODE_DNODE_ALREADY_EXIST
;
}
pDnode
=
(
SDnodeObj
*
)
calloc
(
1
,
sizeof
(
SDnodeObj
));
pDnode
->
privateIp
=
ip
;
pDnode
->
publicIp
=
ip
;
pDnode
->
createdTime
=
taosGetTimestampMs
();
pDnode
->
status
=
TAOS_DN_STATUS_OFFLINE
;
pDnode
->
totalVnodes
=
TSDB_INVALID_VNODE_NUM
;
sprintf
(
pDnode
->
dnodeName
,
"n%d"
,
sdbGetId
(
tsDnodeSdb
)
+
1
);
if
(
pDnode
->
privateIp
==
inet_addr
(
tsMasterIp
))
{
pDnode
->
moduleStatus
|=
(
1
<<
TSDB_MOD_MGMT
);
}
SSdbOperDesc
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
.
rowSize
=
sizeof
(
SDnodeObj
)
};
int32_t
code
=
sdbInsertRow
(
&
oper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tfree
(
pDnode
);
code
=
TSDB_CODE_SDB_ERROR
;
}
mPrint
(
"dnode:%d is created, result:%s"
,
pDnode
->
dnodeId
,
tstrerror
(
code
));
return
code
;
}
int32_t
mgmtDropDnode
(
SDnodeObj
*
pDnode
)
{
SSdbOperDesc
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
};
int32_t
code
=
sdbDeleteRow
(
&
oper
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
code
=
TSDB_CODE_SDB_ERROR
;
}
mLPrint
(
"dnode:%d is dropped from cluster, result:%s"
,
pDnode
->
dnodeId
,
tstrerror
(
code
));
return
code
;
}
static
int32_t
clusterDropDnodeByIp
(
uint32_t
ip
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnodeByIp
(
ip
);
if
(
pDnode
==
NULL
)
{
mError
(
"dnode:%s, is not exist"
,
taosIpStr
(
ip
));
return
TSDB_CODE_INVALID_VALUE
;
}
if
(
pDnode
->
privateIp
==
dnodeGetMnodeMasteIp
())
{
mError
(
"dnode:%d, can't drop dnode which is master"
,
pDnode
->
dnodeId
);
return
TSDB_CODE_NO_REMOVE_MASTER
;
}
#ifndef _VPEER
return
mgmtDropDnode
(
pDnode
);
#else
return
balanceDropDnode
(
pDnode
);
#endif
}
static
void
mgmtProcessCreateDnodeMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMCreateDnodeMsg
*
pCreate
=
pMsg
->
pCont
;
if
(
strcmp
(
pMsg
->
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
uint32_t
ip
=
inet_addr
(
pCreate
->
ip
);
rpcRsp
.
code
=
mgmtCreateDnode
(
ip
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnodeByIp
(
ip
);
mLPrint
(
"dnode:%d, ip:%s is created by %s"
,
pDnode
->
dnodeId
,
pCreate
->
ip
,
pMsg
->
pUser
->
user
);
}
else
{
mError
(
"failed to create dnode:%s, reason:%s"
,
pCreate
->
ip
,
tstrerror
(
rpcRsp
.
code
));
}
}
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmtProcessDropDnodeMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMDropDnodeMsg
*
pDrop
=
pMsg
->
pCont
;
if
(
strcmp
(
pMsg
->
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
uint32_t
ip
=
inet_addr
(
pDrop
->
ip
);
rpcRsp
.
code
=
clusterDropDnodeByIp
(
ip
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"dnode:%s is dropped by %s"
,
pDrop
->
ip
,
pMsg
->
pUser
->
user
);
}
else
{
mError
(
"failed to drop dnode:%s, reason:%s"
,
pDrop
->
ip
,
tstrerror
(
rpcRsp
.
code
));
}
}
rpcSendResponse
(
&
rpcRsp
);
}
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
...
...
@@ -309,7 +543,7 @@ static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
cluster
GetDnodesNum
();
pShow
->
numOfRows
=
mgmt
GetDnodesNum
();
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
...
...
@@ -318,7 +552,7 @@ static int32_t clusterGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *
return
0
;
}
static
int32_t
cluster
RetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
static
int32_t
mgmt
RetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
...
...
@@ -326,7 +560,7 @@ static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows,
char
ipstr
[
32
];
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
&
pDnode
);
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
cols
=
0
;
...
...
@@ -350,7 +584,7 @@ static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows,
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
cluster
GetDnodeStatusStr
(
pDnode
->
status
));
strcpy
(
pWrite
,
mgmt
GetDnodeStatusStr
(
pDnode
->
status
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -358,29 +592,29 @@ static int32_t clusterRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows,
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pDnode
->
numOfT
otalVnodes
;
*
(
int16_t
*
)
pWrite
=
pDnode
->
t
otalVnodes
;
cols
++
;
#ifdef _VPEER
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
cluster
GetDnodeStatusStr
(
pDnode
->
status
));
strcpy
(
pWrite
,
mgmt
GetDnodeStatusStr
(
pDnode
->
status
));
cols
++
;
#endif
numOfRows
++
;
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
bool
cluster
CheckModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
bool
mgmt
CheckModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
uint32_t
status
=
pDnode
->
moduleStatus
&
(
1
<<
moduleType
);
return
status
>
0
;
}
static
int32_t
cluster
GetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
mgmt
GetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
...
...
@@ -419,10 +653,10 @@ static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pShow
->
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
while
(
1
)
{
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
if
(
cluster
CheckModuleInDnode
(
pDnode
,
moduleType
))
{
if
(
mgmt
CheckModuleInDnode
(
pDnode
,
moduleType
))
{
pShow
->
numOfRows
++
;
}
}
...
...
@@ -435,7 +669,7 @@ static int32_t clusterGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
return
0
;
}
int32_t
cluster
RetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
mgmt
RetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
...
...
@@ -443,12 +677,12 @@ int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *
char
ipstr
[
20
];
while
(
numOfRows
<
rows
)
{
cluster
ReleaseDnode
(
pDnode
);
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
if
(
!
cluster
CheckModuleInDnode
(
pDnode
,
moduleType
))
{
if
(
!
mgmt
CheckModuleInDnode
(
pDnode
,
moduleType
))
{
continue
;
}
...
...
@@ -464,7 +698,7 @@ int32_t clusterRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
cluster
GetDnodeStatusStr
(
pDnode
->
status
));
strcpy
(
pWrite
,
mgmt
GetDnodeStatusStr
(
pDnode
->
status
));
cols
++
;
numOfRows
++
;
...
...
@@ -481,7 +715,7 @@ static bool clusterCheckConfigShow(SGlobalConfig *cfg) {
return
true
;
}
static
int32_t
cluster
GetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
mgmt
GetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
...
...
@@ -523,7 +757,7 @@ static int32_t clusterGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
return
0
;
}
static
int32_t
cluster
RetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
static
int32_t
mgmt
RetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
for
(
int32_t
i
=
tsGlobalConfigNum
-
1
;
i
>=
0
&&
numOfRows
<
rows
;
--
i
)
{
...
...
@@ -570,7 +804,7 @@ static int32_t clusterRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows,
return
numOfRows
;
}
static
int32_t
cluster
GetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
mgmt
GetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
...
...
@@ -599,7 +833,7 @@ static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *
SDnodeObj
*
pDnode
=
NULL
;
if
(
pShow
->
payloadLen
>
0
)
{
uint32_t
ip
=
ip2uint
(
pShow
->
payload
);
pDnode
=
cluster
GetDnodeByIp
(
ip
);
pDnode
=
mgmt
GetDnodeByIp
(
ip
);
if
(
NULL
==
pDnode
)
{
return
TSDB_CODE_NODE_OFFLINE
;
}
...
...
@@ -616,7 +850,7 @@ static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *
pShow
->
pNode
=
pDnode
;
}
else
{
while
(
true
)
{
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
pShow
->
numOfRows
+=
pDnode
->
openVnodes
;
...
...
@@ -627,13 +861,13 @@ static int32_t clusterGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *
}
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
mgmtReleaseUser
(
pUser
);
return
0
;
}
static
int32_t
cluster
RetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
static
int32_t
mgmt
RetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
...
...
@@ -674,7 +908,7 @@ static int32_t clusterRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows,
return
numOfRows
;
}
char
*
cluster
GetDnodeStatusStr
(
int32_t
dnodeStatus
)
{
char
*
mgmt
GetDnodeStatusStr
(
int32_t
dnodeStatus
)
{
switch
(
dnodeStatus
)
{
case
TAOS_DN_STATUS_OFFLINE
:
return
"offline"
;
case
TAOS_DN_STATUS_DROPPING
:
return
"dropping"
;
...
...
@@ -683,3 +917,155 @@ char* clusterGetDnodeStatusStr(int32_t dnodeStatus) {
default:
return
"undefined"
;
}
}
static
void
clusterSetModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
pDnode
->
moduleStatus
|=
(
1
<<
moduleType
);
mgmtUpdateDnode
(
pDnode
);
if
(
moduleType
==
TSDB_MOD_MGMT
)
{
mpeerAddMnode
(
pDnode
->
dnodeId
);
mPrint
(
"dnode:%d, add it into mnode list"
,
pDnode
->
dnodeId
);
}
}
static
void
clusterUnSetModuleInDnode
(
SDnodeObj
*
pDnode
,
int32_t
moduleType
)
{
pDnode
->
moduleStatus
&=
~
(
1
<<
moduleType
);
mgmtUpdateDnode
(
pDnode
);
if
(
moduleType
==
TSDB_MOD_MGMT
)
{
mpeerRemoveMnode
(
pDnode
->
dnodeId
);
mPrint
(
"dnode:%d, remove it from mnode list"
,
pDnode
->
dnodeId
);
}
}
static
void
clusterStopAllModuleInDnode
(
SDnodeObj
*
pDnode
)
{
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
if
(
!
mgmtCheckModuleInDnode
(
pDnode
,
moduleType
))
{
continue
;
}
mPrint
(
"dnode:%d, stop %s module for its offline or remove"
,
pDnode
->
dnodeId
,
tsModule
[
moduleType
].
name
);
clusterUnSetModuleInDnode
(
pDnode
,
moduleType
);
}
}
static
void
clusterStartModuleInAllDnodes
(
int32_t
moduleType
)
{
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
while
(
1
)
{
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
!
mgmtCheckModuleInDnode
(
pDnode
,
moduleType
)
&&
pDnode
->
status
!=
TAOS_DN_STATUS_OFFLINE
&&
pDnode
->
status
!=
TAOS_DN_STATUS_DROPPING
)
{
mPrint
(
"dnode:%d, add %s module for schedule"
,
pDnode
->
dnodeId
,
tsModule
[
moduleType
].
name
);
clusterSetModuleInDnode
(
pDnode
,
moduleType
);
}
mgmtReleaseDnode
(
pNode
);
}
}
static
void
clusterStartModuleInOneDnode
(
int32_t
moduleType
)
{
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
while
(
1
)
{
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
!
mgmtCheckModuleInDnode
(
pDnode
,
moduleType
)
&&
pDnode
->
status
!=
TAOS_DN_STATUS_OFFLINE
&&
pDnode
->
status
!=
TAOS_DN_STATUS_DROPPING
&&
!
(
moduleType
==
TSDB_MOD_MGMT
&&
pDnode
->
alternativeRole
==
TSDB_DNODE_ROLE_VNODE
))
{
mPrint
(
"dnode:%d, add %s module for schedule"
,
pDnode
->
dnodeId
,
tsModule
[
moduleType
].
name
);
clusterSetModuleInDnode
(
pDnode
,
moduleType
);
mgmtReleaseDnode
(
pNode
);
break
;
}
mgmtReleaseDnode
(
pNode
);
}
}
static
void
clusterStopModuleInOneDnode
(
int32_t
moduleType
)
{
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
while
(
1
)
{
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
mgmtCheckModuleInDnode
(
pDnode
,
moduleType
))
{
mPrint
(
"dnode:%d, stop %s module for schedule"
,
pDnode
->
dnodeId
,
tsModule
[
moduleType
].
name
);
clusterUnSetModuleInDnode
(
pDnode
,
moduleType
);
mgmtReleaseDnode
(
pNode
);
break
;
}
mgmtReleaseDnode
(
pNode
);
}
}
void
mgmtMonitorDnodeModule
()
{
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
int32_t
onlineDnodes
=
0
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MGMT
+
1
;
++
moduleType
)
{
tsModule
[
moduleType
].
curNum
=
0
;
}
// dnode loop
while
(
1
)
{
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
pDnode
->
status
==
TAOS_DN_STATUS_DROPPING
)
{
mPrint
(
"dnode:%d, status:%d, remove all modules for removing"
,
pDnode
->
dnodeId
,
pDnode
->
status
);
clusterStopAllModuleInDnode
(
pDnode
);
mgmtReleaseDnode
(
pDnode
);
continue
;
}
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MGMT
+
1
;
++
moduleType
)
{
if
(
mgmtCheckModuleInDnode
(
pDnode
,
moduleType
))
{
tsModule
[
moduleType
].
curNum
++
;
}
}
if
(
pDnode
->
status
!=
TAOS_DN_STATUS_OFFLINE
)
{
onlineDnodes
++
;
}
mgmtReleaseDnode
(
pDnode
);
}
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MGMT
+
1
;
++
moduleType
)
{
if
(
tsModule
[
moduleType
].
num
==
-
1
)
{
clusterStartModuleInAllDnodes
(
moduleType
);
continue
;
}
if
(
tsModule
[
moduleType
].
curNum
<
tsModule
[
moduleType
].
num
)
{
if
(
onlineDnodes
<=
tsModule
[
moduleType
].
curNum
)
{
continue
;
}
mTrace
(
"need add %s module, curNum:%d, expectNum:%d"
,
tsModule
[
moduleType
].
name
,
tsModule
[
moduleType
].
curNum
,
tsModule
[
moduleType
].
num
);
for
(
int32_t
i
=
tsModule
[
moduleType
].
curNum
;
i
<
tsModule
[
moduleType
].
num
;
++
i
)
{
clusterStartModuleInOneDnode
(
moduleType
);
}
}
else
if
(
tsModule
[
moduleType
].
curNum
>
tsModule
[
moduleType
].
num
)
{
mTrace
(
"need drop %s module, curNum:%d, expectNum:%d"
,
tsModule
[
moduleType
].
name
,
tsModule
[
moduleType
].
curNum
,
tsModule
[
moduleType
].
num
);
for
(
int32_t
i
=
tsModule
[
moduleType
].
num
;
i
<
tsModule
[
moduleType
].
curNum
;
++
i
)
{
clusterStopModuleInOneDnode
(
moduleType
);
}
}
else
{
}
}
}
src/mnode/src/mgmtMain.c
浏览文件 @
fe12d0fe
...
...
@@ -19,9 +19,9 @@
#include "tmodule.h"
#include "tsched.h"
#include "mnode.h"
#include "
taccoun
t.h"
#include "
mgmtAcc
t.h"
#include "tbalance.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "tgrant.h"
#include "mpeer.h"
#include "mgmtDb.h"
...
...
@@ -74,7 +74,7 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
acctInit
()
<
0
)
{
if
(
mgmtInitAccts
()
<
0
)
{
mError
(
"failed to init accts"
);
return
-
1
;
}
...
...
@@ -89,7 +89,7 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
clusterInit
()
<
0
)
{
if
(
mgmtInitDnodes
()
<
0
)
{
mError
(
"failed to init dnodes"
);
return
-
1
;
}
...
...
@@ -160,9 +160,9 @@ void mgmtCleanUpSystem() {
mgmtCleanUpTables
();
mgmtCleanUpVgroups
();
mgmtCleanUpDbs
();
clusterCleanUp
();
mgmtCleanupDnodes
();
mgmtCleanUpUsers
();
acctCleanUp
();
mgmtCleanUpAccts
();
sdbCleanUp
();
taosTmrCleanUp
(
tsMgmtTmr
);
mPrint
(
"mgmt is cleaned up"
);
...
...
src/mnode/src/mgmtProfile.c
浏览文件 @
fe12d0fe
...
...
@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "
taccoun
t.h"
#include "
tcluster
.h"
#include "
mgmtAcc
t.h"
#include "
mgmtDnode
.h"
#include "mgmtDb.h"
#include "mpeer.h"
#include "mgmtProfile.h"
...
...
@@ -787,11 +787,11 @@ void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
if
(
pMsg
!=
NULL
)
{
rpcFreeCont
(
pMsg
->
pCont
);
if
(
pMsg
->
pUser
)
mgmtReleaseUser
(
pMsg
->
pUser
);
if
(
pMsg
->
pDb
)
mgmt
ReleaseDb
(
pMsg
->
pDb
);
if
(
pMsg
->
pDb
)
mgmt
DecDbRef
(
pMsg
->
pDb
);
if
(
pMsg
->
pVgroup
)
mgmtReleaseVgroup
(
pMsg
->
pVgroup
);
if
(
pMsg
->
pTable
)
mgmtDecTableRef
(
pMsg
->
pTable
);
if
(
pMsg
->
pAcct
)
acctReleaseAcct
(
pMsg
->
pAcct
);
if
(
pMsg
->
pDnode
)
cluster
ReleaseDnode
(
pMsg
->
pDnode
);
if
(
pMsg
->
pAcct
)
mgmtDecAcctRef
(
pMsg
->
pAcct
);
if
(
pMsg
->
pDnode
)
mgmt
ReleaseDnode
(
pMsg
->
pDnode
);
free
(
pMsg
);
}
}
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
fe12d0fe
...
...
@@ -22,10 +22,10 @@
#include "tsched.h"
#include "dnode.h"
#include "mnode.h"
#include "
taccoun
t.h"
#include "
mgmtAcc
t.h"
#include "tbalance.h"
#include "mgmtDb.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "tgrant.h"
#include "mpeer.h"
#include "mgmtProfile.h"
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
fe12d0fe
...
...
@@ -23,10 +23,10 @@
#include "taosmsg.h"
#include "tscompression.h"
#include "name.h"
#include "
taccoun
t.h"
#include "
mgmtAcc
t.h"
#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "mgmtDServer.h"
#include "tgrant.h"
#include "mpeer.h"
...
...
@@ -101,14 +101,14 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
mError
(
"ctable:%s, vgroup:%d not in db:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
TSDB_CODE_INVALID_DB
;
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"ctable:%s, account:%s not exists"
,
pTable
->
info
.
tableId
,
pDb
->
cfg
.
acct
);
return
TSDB_CODE_INVALID_ACCT
;
}
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
pTable
->
superTable
=
mgmtGetSuperTable
(
pTable
->
superTableId
);
...
...
@@ -143,14 +143,14 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
mError
(
"ctable:%s, vgroup:%d not in DB:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
TSDB_CODE_INVALID_DB
;
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"ctable:%s, account:%s not exists"
,
pTable
->
info
.
tableId
,
pDb
->
cfg
.
acct
);
return
TSDB_CODE_INVALID_ACCT
;
}
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
grantRestore
(
TSDB_GRANT_TIMESERIES
,
pTable
->
superTable
->
numOfColumns
-
1
);
...
...
@@ -258,7 +258,7 @@ static int32_t mgmtChildTableActionRestored() {
pNode
=
pLastNode
;
continue
;
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
...
...
@@ -401,7 +401,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
if
(
pDb
!=
NULL
)
{
mgmtAddSuperTableIntoDb
(
pDb
);
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -413,7 +413,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveSuperTableFromDb
(
pDb
);
mgmtDropAllChildTablesInStable
((
SSuperTableObj
*
)
pStable
);
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -923,10 +923,10 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
pStable
->
numOfColumns
+=
ncols
;
pStable
->
sversion
++
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
+=
(
ncols
*
pStable
->
numOfTables
);
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -960,10 +960,10 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
int32_t
schemaSize
=
sizeof
(
SSchema
)
*
(
pStable
->
numOfTags
+
pStable
->
numOfColumns
);
pStable
->
schema
=
realloc
(
pStable
->
schema
,
schemaSize
);
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
-=
pStable
->
numOfTables
;
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -1029,7 +1029,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
pShow
->
numOfRows
=
pDb
->
numOfSuperTables
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
0
;
}
...
...
@@ -1094,7 +1094,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
}
pShow
->
numOfReads
+=
numOfRows
;
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
numOfRows
;
}
...
...
@@ -1191,14 +1191,14 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
pRsp
->
vgroups
[
vg
].
vgId
=
htonl
(
vgId
);
for
(
int32_t
vn
=
0
;
vn
<
pVgroup
->
numOfVnodes
;
++
vn
)
{
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVgroup
->
vnodeGid
[
vn
].
dnodeId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVgroup
->
vnodeGid
[
vn
].
dnodeId
);
if
(
pDnode
==
NULL
)
break
;
pRsp
->
vgroups
[
vg
].
ipAddr
[
vn
].
ip
=
htonl
(
pDnode
->
privateIp
);
pRsp
->
vgroups
[
vg
].
ipAddr
[
vn
].
port
=
htons
(
tsDnodeShellPort
);
pRsp
->
vgroups
[
vg
].
numOfIps
++
;
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
}
mgmtReleaseVgroup
(
pVgroup
);
...
...
@@ -1500,10 +1500,10 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
pTable
->
numOfColumns
+=
ncols
;
pTable
->
sversion
++
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
+=
ncols
;
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -1534,10 +1534,10 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
pTable
->
numOfColumns
--
;
pTable
->
sversion
++
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pDb
->
cfg
.
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
--
;
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -1600,7 +1600,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
}
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
==
NULL
)
break
;
if
(
usePublicIp
)
{
pMeta
->
vgroup
.
ipAddr
[
i
].
ip
=
htonl
(
pDnode
->
publicIp
);
...
...
@@ -1610,7 +1610,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
pMeta
->
vgroup
.
ipAddr
[
i
].
port
=
htonl
(
tsDnodeShellPort
);
}
pMeta
->
vgroup
.
numOfIps
++
;
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
}
pMeta
->
vgroup
.
vgId
=
htonl
(
pVgroup
->
vgId
);
...
...
@@ -1730,7 +1730,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
}
static
SChildTableObj
*
mgmtGetTableByPos
(
uint32_t
dnodeId
,
int32_t
vnode
,
int32_t
sid
)
{
SDnodeObj
*
pObj
=
cluster
GetDnode
(
dnodeId
);
SDnodeObj
*
pObj
=
mgmt
GetDnode
(
dnodeId
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
vnode
);
if
(
pObj
==
NULL
||
pVgroup
==
NULL
)
{
...
...
@@ -1968,7 +1968,7 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pShow
->
numOfRows
=
pDb
->
numOfTables
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
0
;
}
...
...
@@ -2045,7 +2045,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
const
int32_t
NUM_OF_COLUMNS
=
4
;
mgmtVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
numOfRows
;
}
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
fe12d0fe
...
...
@@ -18,7 +18,7 @@
#include "trpc.h"
#include "ttime.h"
#include "tutil.h"
#include "
taccoun
t.h"
#include "
mgmtAcc
t.h"
#include "tgrant.h"
#include "mpeer.h"
#include "mgmtSdb.h"
...
...
@@ -40,10 +40,10 @@ static int32_t mgmtUserActionDestroy(SSdbOperDesc *pOper) {
static
int32_t
mgmtUserActionInsert
(
SSdbOperDesc
*
pOper
)
{
SUserObj
*
pUser
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pUser
->
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pUser
->
acct
);
if
(
pAcct
!=
NULL
)
{
acctAddUser
(
pAcct
,
pUser
);
mgmtAddUserToAcct
(
pAcct
,
pUser
);
}
else
{
mError
(
"user:%s, acct:%s info not exist in sdb"
,
pUser
->
user
,
pUser
->
acct
);
...
...
@@ -55,10 +55,10 @@ static int32_t mgmtUserActionInsert(SSdbOperDesc *pOper) {
static
int32_t
mgmtUserActionDelete
(
SSdbOperDesc
*
pOper
)
{
SUserObj
*
pUser
=
pOper
->
pObj
;
SAcctObj
*
pAcct
=
acc
tGetAcct
(
pUser
->
acct
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
pUser
->
acct
);
if
(
pAcct
!=
NULL
)
{
acctRemoveUser
(
pAcct
,
pUser
);
mgmtDropUserFromAcct
(
pAcct
,
pUser
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -92,11 +92,11 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
static
int32_t
mgmtUserActionRestored
()
{
if
(
strcmp
(
tsMasterIp
,
tsPrivateIp
)
==
0
)
{
SAcctObj
*
pAcct
=
acc
tGetAcct
(
"root"
);
SAcctObj
*
pAcct
=
mgm
tGetAcct
(
"root"
);
mgmtCreateUser
(
pAcct
,
"root"
,
"taosdata"
);
mgmtCreateUser
(
pAcct
,
"monitor"
,
tsInternalPass
);
mgmtCreateUser
(
pAcct
,
"_root"
,
tsInternalPass
);
acctReleaseAcct
(
pAcct
);
mgmtDecAcctRef
(
pAcct
);
}
return
0
;
...
...
@@ -167,7 +167,7 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) {
}
int32_t
mgmtCreateUser
(
SAcctObj
*
pAcct
,
char
*
name
,
char
*
pass
)
{
int32_t
code
=
acctCheck
(
pAcct
,
TSDB_ACC
T_USER
);
int32_t
code
=
acctCheck
(
pAcct
,
ACCT_GRAN
T_USER
);
if
(
code
!=
0
)
{
return
code
;
}
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
fe12d0fe
...
...
@@ -19,7 +19,7 @@
#include "tlog.h"
#include "tbalance.h"
#include "tsync.h"
#include "
tcluster
.h"
#include "
mgmtDnode
.h"
#include "mnode.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
...
...
@@ -63,7 +63,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
if
(
pDb
==
NULL
)
{
return
TSDB_CODE_INVALID_DB
;
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
pVgroup
->
pDb
=
pDb
;
pVgroup
->
prev
=
NULL
;
...
...
@@ -84,12 +84,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
}
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
!=
NULL
)
{
pVgroup
->
vnodeGid
[
i
].
privateIp
=
pDnode
->
privateIp
;
pVgroup
->
vnodeGid
[
i
].
publicIp
=
pDnode
->
publicIp
;
atomic_add_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
}
}
...
...
@@ -106,14 +106,14 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveVgroupFromDb
(
pVgroup
);
}
mgmt
ReleaseDb
(
pVgroup
->
pDb
);
mgmt
DecDbRef
(
pVgroup
->
pDb
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
)
{
atomic_sub_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
}
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -381,18 +381,18 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow
->
pNode
=
pVgroup
;
}
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
0
;
}
char
*
mgmtGetVnodeStatus
(
SVgObj
*
pVgroup
,
SVnodeGid
*
pVnode
)
{
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVnode
->
dnodeId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVnode
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"vgroup:%d, not exist in dnode:%d"
,
pVgroup
->
vgId
,
pDnode
->
dnodeId
);
return
"null"
;
}
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
if
(
pDnode
->
status
==
TAOS_DN_STATUS_OFFLINE
)
{
return
"offline"
;
...
...
@@ -467,7 +467,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
}
pShow
->
numOfReads
+=
numOfRows
;
mgmt
ReleaseDb
(
pDb
);
mgmt
DecDbRef
(
pDb
);
return
numOfRows
;
}
...
...
@@ -676,13 +676,13 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
vgId
=
htonl
(
pCfg
->
vgId
);
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pCfg
->
dnodeId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pCfg
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mTrace
(
"dnode:%s, invalid dnode"
,
taosIpStr
(
pCfg
->
dnodeId
),
pCfg
->
vgId
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_NOT_ACTIVE_VNODE
);
return
;
}
cluster
ReleaseDnode
(
pDnode
);
mgmt
ReleaseDnode
(
pDnode
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pCfg
->
vgId
);
if
(
pVgroup
==
NULL
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录