Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8568f4c4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8568f4c4
编写于
4月 08, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-82] refactor balance module
上级
2d0396e9
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
420 addition
and
629 deletion
+420
-629
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+1
-1
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+0
-1
src/inc/mnode.h
src/inc/mnode.h
+9
-11
src/inc/taccount.h
src/inc/taccount.h
+16
-13
src/inc/tadmin.h
src/inc/tadmin.h
+11
-7
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-1
src/inc/tbalance.h
src/inc/tbalance.h
+14
-17
src/inc/tcluster.h
src/inc/tcluster.h
+54
-0
src/inc/tgrant.h
src/inc/tgrant.h
+2
-2
src/inc/vnode.h
src/inc/vnode.h
+9
-1
src/mnode/inc/mgmtDb.h
src/mnode/inc/mgmtDb.h
+6
-1
src/mnode/inc/mgmtMnode.h
src/mnode/inc/mgmtMnode.h
+13
-0
src/mnode/inc/mgmtUser.h
src/mnode/inc/mgmtUser.h
+1
-2
src/mnode/inc/mgmtVgroup.h
src/mnode/inc/mgmtVgroup.h
+5
-0
src/mnode/src/mgmtAcct.c
src/mnode/src/mgmtAcct.c
+10
-8
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+19
-40
src/mnode/src/mgmtDClient.c
src/mnode/src/mgmtDClient.c
+3
-4
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+2
-3
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+19
-20
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+78
-126
src/mnode/src/mgmtGrant.c
src/mnode/src/mgmtGrant.c
+1
-1
src/mnode/src/mgmtMain.c
src/mnode/src/mgmtMain.c
+8
-8
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+20
-2
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+7
-5
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+32
-11
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+28
-30
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+12
-16
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+26
-22
src/query/src/queryExecutor.c
src/query/src/queryExecutor.c
+0
-1
src/util/inc/tstatus.h
src/util/inc/tstatus.h
+0
-105
src/util/src/tstatus.c
src/util/src/tstatus.c
+0
-149
src/vnode/main/inc/vnodeInt.h
src/vnode/main/inc/vnodeInt.h
+1
-9
src/vnode/main/src/vnodeMain.c
src/vnode/main/src/vnodeMain.c
+6
-7
src/vnode/main/src/vnodeRead.c
src/vnode/main/src/vnodeRead.c
+1
-1
src/vnode/main/src/vnodeWrite.c
src/vnode/main/src/vnodeWrite.c
+5
-4
未找到文件。
src/dnode/src/dnodeMain.c
浏览文件 @
8568f4c4
...
...
@@ -28,7 +28,7 @@
#include "dnodeRead.h"
#include "dnodeShell.h"
#include "dnodeWrite.h"
#include "
mgmtG
rant.h"
#include "
tg
rant.h"
static
int32_t
dnodeInitSystem
();
static
int32_t
dnodeInitStorage
();
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
8568f4c4
...
...
@@ -20,7 +20,6 @@
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
...
...
src/inc/mnode.h
浏览文件 @
8568f4c4
...
...
@@ -21,7 +21,6 @@ extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taoserror.h"
...
...
@@ -40,7 +39,8 @@ extern "C" {
struct
_vg_obj
;
struct
_db_obj
;
struct
_acctObj
;
struct
_acct_obj
;
struct
_user_obj
;
typedef
struct
{
int32_t
mnodeId
;
...
...
@@ -65,7 +65,7 @@ typedef struct {
void
*
pSync
;
}
SMnodeObj
;
typedef
struct
{
typedef
struct
_dnode_obj
{
int32_t
dnodeId
;
uint32_t
privateIp
;
uint32_t
publicIp
;
...
...
@@ -79,16 +79,15 @@ typedef struct {
uint16_t
slot
;
uint16_t
numOfCores
;
// from dnode status msg
int8_t
alternativeRole
;
// from dnode status msg, 0-any, 1-mgmt, 2-dnode
int8_t
lbStatus
;
// set in balance function
float
lbScore
;
// calc in balance function
int8_t
status
;
// set in balance function
int32_t
customScore
;
// config by user
char
dnodeName
[
TSDB_DNODE_NAME_LEN
+
1
];
int8_t
reserved
[
15
];
int8_t
updateEnd
[
1
];
int32_t
refCount
;
SVnodeLoad
vload
[
TSDB_MAX_VNODES
];
int32_t
status
;
uint32_t
lastReboot
;
// time stamp for last reboot
float
score
;
// calc in balance function
float
diskAvailable
;
// from dnode status msg
int16_t
diskAvgUsage
;
// calc from sys.disk
int16_t
cpuAvgUsage
;
// calc from sys.cpu
...
...
@@ -150,7 +149,6 @@ typedef struct _vg_obj {
int32_t
lbDnodeId
;
int32_t
lbTime
;
int8_t
status
;
int8_t
reserved
[
14
];
int8_t
updateEnd
[
1
];
int32_t
refCount
;
...
...
@@ -163,7 +161,7 @@ typedef struct _vg_obj {
typedef
struct
_db_obj
{
char
name
[
TSDB_DB_NAME_LEN
+
1
];
int8_t
dirty
;
int8_t
status
;
int64_t
createdTime
;
SDbCfg
cfg
;
int8_t
reserved
[
15
];
...
...
@@ -174,7 +172,7 @@ typedef struct _db_obj {
int32_t
numOfSuperTables
;
SVgObj
*
pHead
;
SVgObj
*
pTail
;
struct
_acct
O
bj
*
pAcct
;
struct
_acct
_o
bj
*
pAcct
;
}
SDbObj
;
typedef
struct
_user_obj
{
...
...
@@ -187,7 +185,7 @@ typedef struct _user_obj {
int8_t
reserved
[
13
];
int8_t
updateEnd
[
1
];
int32_t
refCount
;
struct
_acct
O
bj
*
pAcct
;
struct
_acct
_o
bj
*
pAcct
;
SQqueryList
*
pQList
;
// query list
SStreamList
*
pSList
;
// stream list
}
SUserObj
;
...
...
@@ -210,7 +208,7 @@ typedef struct {
int8_t
accessState
;
// Checked by mgmt heartbeat message
}
SAcctInfo
;
typedef
struct
_acct
O
bj
{
typedef
struct
_acct
_o
bj
{
char
user
[
TSDB_USER_LEN
+
1
];
char
pass
[
TSDB_KEY_LEN
+
1
];
SAcctCfg
cfg
;
...
...
src/
mnode/inc/mgmtAcc
t.h
→
src/
inc/taccoun
t.h
浏览文件 @
8568f4c4
...
...
@@ -13,31 +13,34 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
MGMT_
ACCT_H
#define TDENGINE_
MGMT_
ACCT_H
#ifndef TDENGINE_ACCT_H
#define TDENGINE_ACCT_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "mnode.h"
struct
_acct_obj
;
struct
_user_obj
;
struct
_db_obj
;
typedef
enum
{
TSDB_ACCT_USER
,
TSDB_ACCT_DB
,
TSDB_ACCT_TABLE
}
EAcctGrantType
;
int32_t
acctInit
();
void
acctCleanUp
();
SAcctObj
*
acctGetAcct
(
char
*
acctName
);
void
acctIncRef
(
SAcctO
bj
*
pAcct
);
void
acctDecRef
(
SAcctO
bj
*
pAcct
);
int32_t
acctCheck
(
SAcctO
bj
*
pAcct
,
EAcctGrantType
type
);
int32_t
acctInit
();
void
acctCleanUp
();
void
*
acctGetAcct
(
char
*
acctName
);
void
acctIncRef
(
struct
_acct_o
bj
*
pAcct
);
void
acctReleaseAcct
(
struct
_acct_o
bj
*
pAcct
);
int32_t
acctCheck
(
struct
_acct_o
bj
*
pAcct
,
EAcctGrantType
type
);
void
acctAddDb
(
SAcctObj
*
pAcct
,
SDbO
bj
*
pDb
);
void
acctRemoveDb
(
SAcctObj
*
pAcct
,
SDbO
bj
*
pDb
);
void
acctAddUser
(
SAcctObj
*
pAcct
,
SUserO
bj
*
pUser
);
void
acctRemoveUser
(
SAcctObj
*
pAcct
,
SUserO
bj
*
pUser
);
void
acctAddDb
(
struct
_acct_obj
*
pAcct
,
struct
_db_o
bj
*
pDb
);
void
acctRemoveDb
(
struct
_acct_obj
*
pAcct
,
struct
_db_o
bj
*
pDb
);
void
acctAddUser
(
struct
_acct_obj
*
pAcct
,
struct
_user_o
bj
*
pUser
);
void
acctRemoveUser
(
struct
_acct_obj
*
pAcct
,
struct
_user_o
bj
*
pUser
);
#ifdef __cplusplus
}
...
...
src/
mnode/inc/mgmtBalance
.h
→
src/
inc/tadmin
.h
浏览文件 @
8568f4c4
...
...
@@ -13,18 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
MGMT_BALANCE
_H
#define TDENGINE_
MGMT_BALANCE
_H
#ifndef TDENGINE_
ADMIN
_H
#define TDENGINE_
ADMIN
_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "mnode.h"
int32_t
mgmtInitBalance
();
void
mgmtCleanupBalance
();
void
mgmtBalanceNotify
()
;
int32_t
mgmtAllocVnodes
(
SVgObj
*
pVgroup
);
#include <stdint.h>
#include <stdbool.h>
void
adminInit
();
struct
_http_server_obj_
;
extern
void
(
*
adminInitHandleFp
)(
struct
_http_server_obj_
*
pServer
);
extern
void
(
*
opInitHandleFp
)(
struct
_http_server_obj_
*
pServer
);
#ifdef __cplusplus
}
...
...
src/inc/taosmsg.h
浏览文件 @
8568f4c4
...
...
@@ -515,7 +515,7 @@ typedef struct {
int64_t
compStorage
;
int64_t
pointsWritten
;
uint8_t
status
;
uint8_t
syncStatus
;
uint8_t
role
;
uint8_t
accessState
;
uint8_t
reserved
[
5
];
}
SVnodeLoad
;
...
...
src/
mnode/inc/mgmtDnod
e.h
→
src/
inc/tbalanc
e.h
浏览文件 @
8568f4c4
...
...
@@ -13,29 +13,26 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
MGMT_DNOD
E_H
#define TDENGINE_
MGMT_DNOD
E_H
#ifndef TDENGINE_
BALANC
E_H
#define TDENGINE_
BALANC
E_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "mnode.h"
enum
_TSDB_DN_STATUS
{
TSDB_DN_STATUS_OFFLINE
,
TSDB_DN_STATUS_DROPING
,
TSDB_DN_STATUS_BALANCING
,
TSDB_DN_STATUS_READY
};
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
int32_t
mgmtInitDnodes
();
void
mgmtCleanUpDnodes
();
int32_t
mgmtGetDnodesNum
();
void
*
mgmtGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
);
void
mgmtReleaseDnode
(
SDnodeObj
*
pDnode
);
char
*
mgmtGetDnodeStatusStr
(
int32_t
dnodeStatus
);
SDnodeObj
*
mgmtGetDnode
(
int32_t
dnodeId
);
SDnodeObj
*
mgmtGetDnodeByIp
(
uint32_t
ip
);
struct
_db_obj
;
struct
_vg_obj
;
struct
_dnode_obj
;
int32_t
balanceInit
();
void
balanceCleanUp
();
void
balanceNotify
();
int32_t
balanceAllocVnodes
(
struct
_vg_obj
*
pVgroup
);
int32_t
balanceDropDnode
(
struct
_dnode_obj
*
pDnode
);
#ifdef __cplusplus
}
...
...
src/inc/tcluster.h
0 → 100644
浏览文件 @
8568f4c4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_CLUSTER_H
#define TDENGINE_CLUSTER_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdint.h>
#include <stdbool.h>
#include <pthread.h>
struct
_dnode_obj
;
enum
_TSDB_DN_STATUS
{
TSDB_DN_STATUS_OFFLINE
,
TSDB_DN_STATUS_DROPING
,
TSDB_DN_STATUS_BALANCING
,
TSDB_DN_STATUS_READY
};
int32_t
clusterInit
();
void
clusterCleanUp
();
char
*
clusterGetDnodeStatusStr
(
int32_t
dnodeStatus
);
int32_t
clusterInitDnodes
();
void
clusterCleanupDnodes
();
int32_t
clusterGetDnodesNum
();
void
*
clusterGetNextDnode
(
void
*
pNode
,
struct
_dnode_obj
**
pDnode
);
void
clusterReleaseDnode
(
struct
_dnode_obj
*
pDnode
);
void
*
clusterGetDnode
(
int32_t
dnodeId
);
void
*
clusterGetDnodeByIp
(
uint32_t
ip
);
void
clusterUpdateDnode
(
struct
_dnode_obj
*
pDnode
);
int32_t
clusterDropDnode
(
struct
_dnode_obj
*
pDnode
);
#ifdef __cplusplus
}
#endif
#endif
src/
mnode/inc/mgmtG
rant.h
→
src/
inc/tg
rant.h
浏览文件 @
8568f4c4
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
MGMT_GR
ANT_H
#define TDENGINE_
MGMT_GR
ANT_H
#ifndef TDENGINE_
GT
ANT_H
#define TDENGINE_
GT
ANT_H
#ifdef __cplusplus
"C"
{
...
...
src/inc/vnode.h
浏览文件 @
8568f4c4
...
...
@@ -20,6 +20,14 @@
extern
"C"
{
#endif
typedef
enum
_VN_STATUS
{
TAOS_VN_STATUS_INIT
,
TAOS_VN_STATUS_CREATING
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_CLOSING
,
TAOS_VN_STATUS_DELETING
,
}
EVnStatus
;
typedef
struct
{
int
len
;
int
code
;
...
...
@@ -41,7 +49,7 @@ void* vnodeGetWqueue(int32_t vgId);
void
*
vnodeGetWal
(
void
*
pVnode
);
void
*
vnodeGetTsdb
(
void
*
pVnode
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
int
qtype
,
SWalHea
d
*
pHead
,
void
*
item
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
int
qtype
,
voi
d
*
pHead
,
void
*
item
);
void
vnodeBuildStatusMsg
(
void
*
param
);
int32_t
vnodeProcessRead
(
void
*
pVnode
,
int
msgType
,
void
*
pCont
,
int32_t
contLen
,
SRspRet
*
ret
);
...
...
src/mnode/inc/mgmtDb.h
浏览文件 @
8568f4c4
...
...
@@ -22,13 +22,18 @@ extern "C" {
#include "mnode.h"
enum
_TSDB_DB_STATUS
{
TSDB_DB_STATUS_READY
,
TSDB_DB_STATUS_DROPPING
};
// api
int32_t
mgmtInitDbs
();
void
mgmtCleanUpDbs
();
SDbObj
*
mgmtGetDb
(
char
*
db
);
SDbObj
*
mgmtGetDbByTableId
(
char
*
db
);
void
mgmtIncDbRef
(
SDbObj
*
pDb
);
void
mgmt
DecDbRef
(
SDbObj
*
pDb
);
void
mgmt
ReleaseDb
(
SDbObj
*
pDb
);
bool
mgmtCheckIsMonitorDB
(
char
*
db
,
char
*
monitordb
);
void
mgmtDropAllDbs
(
SAcctObj
*
pAcct
);
...
...
src/mnode/inc/mgmtMnode.h
浏览文件 @
8568f4c4
...
...
@@ -20,6 +20,19 @@
extern
"C"
{
#endif
enum
_TSDB_MN_STATUS
{
TSDB_MN_STATUS_OFFLINE
,
TSDB_MN_STATUS_UNSYNCED
,
TSDB_MN_STATUS_SYNCING
,
TSDB_MN_STATUS_SERVING
};
enum
_TSDB_MN_ROLE
{
TSDB_MN_ROLE_UNDECIDED
,
TSDB_MN_ROLE_SLAVE
,
TSDB_MN_ROLE_MASTER
};
int32_t
mgmtInitMnodes
();
void
mgmtCleanupMnodes
();
...
...
src/mnode/inc/mgmtUser.h
浏览文件 @
8568f4c4
...
...
@@ -24,8 +24,7 @@ extern "C" {
int32_t
mgmtInitUsers
();
void
mgmtCleanUpUsers
();
SUserObj
*
mgmtGetUser
(
char
*
name
);
void
mgmtIncUserRef
(
SUserObj
*
pUser
);
void
mgmtDecUserRef
(
SUserObj
*
pUser
);
void
mgmtReleaseUser
(
SUserObj
*
pUser
);
SUserObj
*
mgmtGetUserFromConn
(
void
*
pConn
,
bool
*
usePublicIp
);
int32_t
mgmtCreateUser
(
SAcctObj
*
pAcct
,
char
*
name
,
char
*
pass
);
void
mgmtDropAllUsers
(
SAcctObj
*
pAcct
);
...
...
src/mnode/inc/mgmtVgroup.h
浏览文件 @
8568f4c4
...
...
@@ -24,6 +24,11 @@ extern "C" {
#include <stdbool.h>
#include "mnode.h"
enum
_TSDB_VG_STATUS
{
TSDB_VG_STATUS_READY
,
TSDB_VG_STATUS_UPDATE
};
int32_t
mgmtInitVgroups
();
void
mgmtCleanUpVgroups
();
SVgObj
*
mgmtGetVgroup
(
int32_t
vgId
);
...
...
src/mnode/src/mgmtAcct.c
浏览文件 @
8568f4c4
...
...
@@ -17,9 +17,10 @@
#include "os.h"
#include "taoserror.h"
#include "mnode.h"
#include "
mgmtAcc
t.h"
#include "
taccoun
t.h"
#include "mgmtDb.h"
#include "mgmtUser.h"
#ifndef _ACCOUNT
static
SAcctObj
tsAcctObj
=
{
0
};
...
...
@@ -30,11 +31,12 @@ int32_t acctInit() {
return
TSDB_CODE_SUCCESS
;
}
void
acctCleanUp
()
{}
SAcctObj
*
acctGetAcct
(
char
*
acctName
)
{
return
&
tsAcctObj
;
}
void
acctIncRef
(
SAcctObj
*
pAcct
)
{}
void
acctDecRef
(
SAcctObj
*
pAcct
)
{}
int32_t
acctCheck
(
SAcctObj
*
pAcct
,
EAcctGrantType
type
)
{
return
TSDB_CODE_SUCCESS
;
}
void
acctCleanUp
()
{}
void
*
acctGetAcct
(
char
*
acctName
)
{
return
&
tsAcctObj
;
}
void
acctIncRef
(
struct
_acct_obj
*
pAcct
)
{}
void
acctReleaseAcct
(
SAcctObj
*
pAcct
)
{}
int32_t
acctCheck
(
SAcctObj
*
pAcct
,
EAcctGrantType
type
)
{
return
TSDB_CODE_SUCCESS
;
}
#endif
void
acctAddDb
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
...
...
@@ -46,7 +48,7 @@ void acctAddDb(SAcctObj *pAcct, SDbObj *pDb) {
void
acctRemoveDb
(
SAcctObj
*
pAcct
,
SDbObj
*
pDb
)
{
atomic_sub_fetch_32
(
&
pAcct
->
acctInfo
.
numOfDbs
,
1
);
pDb
->
pAcct
=
NULL
;
acct
IncRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
}
void
acctAddUser
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
)
{
...
...
@@ -58,5 +60,5 @@ void acctAddUser(SAcctObj *pAcct, SUserObj *pUser) {
void
acctRemoveUser
(
SAcctObj
*
pAcct
,
SUserObj
*
pUser
)
{
atomic_sub_fetch_32
(
&
pAcct
->
acctInfo
.
numOfUsers
,
1
);
pUser
->
pAcct
=
NULL
;
acct
IncRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
}
\ No newline at end of file
src/mnode/src/mgmtBalance.c
浏览文件 @
8568f4c4
...
...
@@ -14,56 +14,34 @@
*/
#define _DEFAULT_SOURCE
#include "tstatus.h"
#include "mgmtBalance.h"
#include "mgmtDnode.h"
#include "tbalance.h"
#include "mnode.h"
#include "tcluster.h"
#include "mgmtVgroup.h"
extern
int32_t
balanceInit
();
extern
void
balanceCleanUp
();
extern
void
balanceNotify
();
extern
int32_t
balanceAllocVnodes
(
SVgObj
*
pVgroup
);
#ifndef _VPEER
int32_t
balanceInit
()
{
return
0
;
}
void
balanceCleanUp
()
{}
void
balanceNotify
()
{}
int32_t
mgmtInitBalance
()
{
#ifdef _VPEER
return
balanceInit
();
#else
return
0
;
#endif
}
void
mgmtCleanupBalance
()
{
#ifdef _VPEER
balanceCleanUp
();
#endif
}
void
mgmtBalanceNotify
()
{
#ifdef _VPEER
balanceNotify
();
#endif
}
int32_t
mgmtAllocVnodes
(
SVgObj
*
pVgroup
)
{
#ifdef _VPEER
return
balanceAllocVnodes
(
pVgroup
);
#else
int32_t
balanceAllocVnodes
(
SVgObj
*
pVgroup
)
{
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
SDnodeObj
*
pSelDnode
=
NULL
;
float
vnodeUsage
=
1
.
0
;
while
(
1
)
{
mgmtDecDnodeRef
(
pDnode
);
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
pNode
=
clusterGetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
pDnode
->
numOfTotalVnodes
<=
0
)
continue
;
if
(
pDnode
->
openVnodes
==
pDnode
->
numOfTotalVnodes
)
continue
;
float
usage
=
(
float
)
pDnode
->
openVnodes
/
pDnode
->
numOfTotalVnodes
;
if
(
usage
<=
vnodeUsage
)
{
pSelDnode
=
pDnode
;
vnodeUsage
=
usage
;
if
(
pDnode
->
numOfTotalVnodes
>
0
&&
pDnode
->
openVnodes
<
pDnode
->
numOfTotalVnodes
)
{
float
usage
=
(
float
)
pDnode
->
openVnodes
/
pDnode
->
numOfTotalVnodes
;
if
(
usage
<=
vnodeUsage
)
{
pSelDnode
=
pDnode
;
vnodeUsage
=
usage
;
}
}
clusterReleaseDnode
(
pDnode
);
}
if
(
pSelDnode
==
NULL
)
{
...
...
@@ -77,5 +55,6 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
mTrace
(
"dnode:%d, alloc one vnode to vgroup, openVnodes:%d"
,
pSelDnode
->
dnodeId
,
pSelDnode
->
openVnodes
);
return
TSDB_CODE_SUCCESS
;
#endif
}
#endif
src/mnode/src/mgmtDClient.c
浏览文件 @
8568f4c4
...
...
@@ -17,15 +17,14 @@
#include "os.h"
#include "taoserror.h"
#include "tsched.h"
#include "tstatus.h"
#include "tsystem.h"
#include "tutil.h"
#include "dnode.h"
#include "mnode.h"
#include "
mgmtB
alance.h"
#include "
tb
alance.h"
#include "mgmtDb.h"
#include "
mgmtDnode
.h"
#include "
mgmtG
rant.h"
#include "
tcluster
.h"
#include "
tg
rant.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
...
...
src/mnode/src/mgmtDServer.c
浏览文件 @
8568f4c4
...
...
@@ -18,15 +18,14 @@
#include "taoserror.h"
#include "trpc.h"
#include "tsched.h"
#include "tstatus.h"
#include "tsystem.h"
#include "tutil.h"
#include "dnode.h"
#include "mnode.h"
#include "
mgmtB
alance.h"
#include "
tb
alance.h"
#include "mgmtDb.h"
#include "mgmtDServer.h"
#include "
mgmtG
rant.h"
#include "
tg
rant.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
8568f4c4
...
...
@@ -16,15 +16,14 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tstatus.h"
#include "tutil.h"
#include "name.h"
#include "mnode.h"
#include "
mgmtAcc
t.h"
#include "
mgmtB
alance.h"
#include "
taccoun
t.h"
#include "
tb
alance.h"
#include "mgmtDb.h"
#include "
mgmtDnode
.h"
#include "
mgmtG
rant.h"
#include "
tcluster
.h"
#include "
tg
rant.h"
#include "mgmtShell.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
...
...
@@ -38,7 +37,7 @@ static int32_t tsDbUpdateSize;
static
int32_t
mgmtCreateDb
(
SAcctObj
*
pAcct
,
SCMCreateDbMsg
*
pCreate
);
static
void
mgmtDropDb
(
SQueuedMsg
*
newMsg
);
static
int32_t
mgmtSetDbD
irty
(
SDbObj
*
pDb
);
static
int32_t
mgmtSetDbD
ropping
(
SDbObj
*
pDb
);
static
int32_t
mgmtGetDbMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveDbs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCreateDbMsg
(
SQueuedMsg
*
pMsg
);
...
...
@@ -146,11 +145,11 @@ SDbObj *mgmtGetDb(char *db) {
return
(
SDbObj
*
)
sdbGetRow
(
tsDbSdb
,
db
);
}
void
mgmtIncDbRef
(
SDbObj
*
pDb
)
{
void
mgmtIncDbRef
(
SDbObj
*
pDb
)
{
return
sdbIncRef
(
tsDbSdb
,
pDb
);
}
void
mgmt
DecDbRef
(
SDbObj
*
pDb
)
{
void
mgmt
ReleaseDb
(
SDbObj
*
pDb
)
{
return
sdbDecRef
(
tsDbSdb
,
pDb
);
}
...
...
@@ -289,7 +288,7 @@ static int32_t mgmtCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate) {
SDbObj
*
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pDb
!=
NULL
)
{
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
TSDB_CODE_DB_ALREADY_EXIST
;
}
...
...
@@ -519,7 +518,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
numOfRows
=
pUser
->
pAcct
->
acctInfo
.
numOfDbs
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
...
...
@@ -631,15 +630,15 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
pDb
->
dirty
!=
TSDB_DB_STATUS_READY
?
"dropping"
:
"ready"
);
strcpy
(
pWrite
,
pDb
->
status
!=
TSDB_DB_STATUS_READY
?
"dropping"
:
"ready"
);
cols
++
;
numOfRows
++
;
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
}
pShow
->
numOfReads
+=
numOfRows
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
numOfRows
;
}
...
...
@@ -659,10 +658,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32
(
&
pDb
->
numOfTables
,
-
1
);
}
static
int32_t
mgmtSetDbD
irty
(
SDbObj
*
pDb
)
{
if
(
pDb
->
dirty
)
return
TSDB_CODE_SUCCESS
;
static
int32_t
mgmtSetDbD
ropping
(
SDbObj
*
pDb
)
{
if
(
pDb
->
status
)
return
TSDB_CODE_SUCCESS
;
pDb
->
dirty
=
true
;
pDb
->
status
=
true
;
SSdbOperDesc
oper
=
{
.
type
=
SDB_OPER_TYPE_GLOBAL
,
.
table
=
tsDbSdb
,
...
...
@@ -850,7 +849,7 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return
;
}
int32_t
code
=
mgmtSetDbD
irty
(
pDb
);
int32_t
code
=
mgmtSetDbD
ropping
(
pDb
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"db:%s, failed to drop, reason:%s"
,
pDrop
->
db
,
tstrerror
(
code
));
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
...
...
@@ -881,11 +880,11 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
if
(
pDb
==
NULL
)
break
;
if
(
pDb
->
pAcct
==
pAcct
)
{
mgmtSetDbD
irty
(
pDb
);
mgmtSetDbD
ropping
(
pDb
);
numOfDbs
++
;
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
}
mTrace
(
"acct:%s, all dbs is is set dirty"
,
pAcct
->
user
,
numOfDbs
);
}
\ No newline at end of file
}
src/mnode/src/mgmtDnode.c
浏览文件 @
8568f4c4
...
...
@@ -16,9 +16,9 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tmodule.h"
#include "t
status
.h"
#include "
mgmtBalance
.h"
#include "m
gmtD
node.h"
#include "t
balance
.h"
#include "
tcluster
.h"
#include "mnode.h"
#include "mgmtDClient.h"
#include "mgmtMnode.h"
#include "mgmtShell.h"
...
...
@@ -26,45 +26,23 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
static
void
mgmtProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
mgmtProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
;
static
void
mgmtProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mgmtGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
extern
int32_t
clusterInit
();
extern
void
clusterCleanUp
();
extern
int32_t
clusterGetDnodesNum
();
extern
void
*
clusterGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
);
extern
void
clusterReleaseDnode
(
SDnodeObj
*
pDnode
);
extern
SDnodeObj
*
clusterGetDnode
(
int32_t
dnodeId
);
extern
SDnodeObj
*
clusterGetDnodeByIp
(
uint32_t
ip
);
static
void
clusterProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
);
static
void
clusterProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
;
static
void
clusterProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
);
static
int32_t
clusterGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
clusterGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
clusterGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
clusterGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
clusterRetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
#ifndef _CLUSTER
static
SDnodeObj
tsDnodeObj
=
{
0
};
#endif
int32_t
mgmtInitDnodes
()
{
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONFIG_DNODE
,
mgmtProcessCfgDnodeMsg
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
,
mgmtProcessCfgDnodeMsgRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_STATUS
,
mgmtProcessDnodeStatusMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MODULE
,
mgmtGetModuleMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MODULE
,
mgmtRetrieveModules
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
mgmtGetConfigMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
mgmtRetrieveConfigs
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtGetVnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VNODES
,
mgmtRetrieveVnodes
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtGetDnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DNODE
,
mgmtRetrieveDnodes
);
#ifdef _CLUSTER
return
clusterInit
();
#else
int32_t
clusterInitDnodes
()
{
tsDnodeObj
.
dnodeId
=
1
;
tsDnodeObj
.
privateIp
=
inet_addr
(
tsPrivateIp
);
tsDnodeObj
.
publicIp
=
inet_addr
(
tsPublicIp
);
...
...
@@ -82,63 +60,47 @@ int32_t mgmtInitDnodes() {
tsDnodeObj
.
moduleStatus
|=
(
1
<<
TSDB_MOD_MONITOR
);
}
return
0
;
#endif
}
void
mgmtCleanUpDnodes
()
{
#ifdef _CLUSTER
clusterCleanUp
();
#endif
}
SDnodeObj
*
mgmtGetDnode
(
int32_t
dnodeId
)
{
#ifdef _CLUSTER
return
clusterGetDnode
(
dnodeId
);
#else
if
(
dnodeId
==
1
)
{
return
&
tsDnodeObj
;
void
*
clusterGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
)
{
if
(
*
pDnode
==
NULL
)
{
*
pDnode
=
&
tsDnodeObj
;
}
else
{
return
NULL
;
*
pDnode
=
NULL
;
}
#endif
return
*
pDnode
;
}
SDnodeObj
*
mgmtGetDnodeByIp
(
uint32_t
ip
)
{
#ifdef _CLUSTER
return
clusterGetDnodeByIp
(
ip
);
#else
return
&
tsDnodeObj
;
#endif
}
void
clusterCleanupDnodes
()
{}
int32_t
clusterGetDnodesNum
()
{
return
1
;
}
void
*
clusterGetDnode
(
int32_t
dnodeId
)
{
return
dnodeId
==
1
?
&
tsDnodeObj
:
NULL
;
}
void
*
clusterGetDnodeByIp
(
uint32_t
ip
)
{
return
&
tsDnodeObj
;
}
void
clusterReleaseDnode
(
struct
_dnode_obj
*
pDnode
)
{}
void
clusterUpdateDnode
(
struct
_dnode_obj
*
pDnode
)
{}
int32_t
mgmtGetDnodesNum
()
{
#ifdef _CLUSTER
return
clusterGetDnodesNum
();
#else
return
1
;
#endif
}
void
mgmtReleaseDnode
(
SDnodeObj
*
pDnode
)
{
#ifdef _CLUSTER
return
clusterReleaseDnode
(
pDnode
);
#endif
int32_t
clusterInit
()
{
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONFIG_DNODE
,
clusterProcessCfgDnodeMsg
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
,
clusterProcessCfgDnodeMsgRsp
);
mgmtAddDServerMsgHandle
(
TSDB_MSG_TYPE_DM_STATUS
,
clusterProcessDnodeStatusMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_MODULE
,
clusterGetModuleMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_MODULE
,
clusterRetrieveModules
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
clusterGetConfigMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_CONFIGS
,
clusterRetrieveConfigs
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VNODES
,
clusterGetVnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VNODES
,
clusterRetrieveVnodes
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DNODE
,
clusterGetDnodeMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DNODE
,
clusterRetrieveDnodes
);
return
clusterInitDnodes
();
}
void
*
mgmtGetNextDnode
(
void
*
pNode
,
SDnodeObj
**
pDnode
)
{
#ifdef _CLUSTER
return
clusterGetNextDnode
(
pNode
,
pDnode
);
#else
if
(
*
pDnode
==
NULL
)
{
*
pDnode
=
&
tsDnodeObj
;
}
else
{
*
pDnode
=
NULL
;
}
return
*
pDnode
;
#endif
void
clusterCleanUp
()
{
clusterCleanupDnodes
();
}
void
mgmt
ProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
)
{
void
cluster
ProcessCfgDnodeMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMCfgDnodeMsg
*
pCmCfgDnode
=
pMsg
->
pCont
;
...
...
@@ -174,11 +136,11 @@ void mgmtProcessCfgDnodeMsg(SQueuedMsg *pMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmt
ProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
{
static
void
cluster
ProcessCfgDnodeMsgRsp
(
SRpcMsg
*
rpcMsg
)
{
mPrint
(
"cfg vnode rsp is received, result:%s"
,
tstrerror
(
rpcMsg
->
code
));
}
void
mgmt
ProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
)
{
void
cluster
ProcessDnodeStatusMsg
(
SRpcMsg
*
rpcMsg
)
{
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
SDMStatusMsg
*
pStatus
=
rpcMsg
->
pCont
;
...
...
@@ -198,14 +160,14 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SDnodeObj
*
pDnode
=
NULL
;
if
(
pStatus
->
dnodeId
==
0
)
{
pDnode
=
mgmt
GetDnodeByIp
(
pStatus
->
privateIp
);
pDnode
=
cluster
GetDnodeByIp
(
pStatus
->
privateIp
);
if
(
pDnode
==
NULL
)
{
mTrace
(
"dnode not created, privateIp:%s"
,
taosIpStr
(
pStatus
->
privateIp
));
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_DNODE_NOT_EXIST
);
return
;
}
}
else
{
pDnode
=
mgmt
GetDnode
(
pStatus
->
dnodeId
);
pDnode
=
cluster
GetDnode
(
pStatus
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"dnode:%d, not exist, privateIp:%s"
,
pStatus
->
dnodeId
,
taosIpStr
(
pStatus
->
privateIp
));
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_DNODE_NOT_EXIST
);
...
...
@@ -238,16 +200,16 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
mPrint
(
"dnode:%d, vgroup:%d not exist in mnode, drop it"
,
pDnode
->
dnodeId
,
pDnode
->
vload
[
j
].
vgId
);
mgmtSendDropVnodeMsg
(
pDnode
->
vload
[
j
].
vgId
,
&
ipSet
,
NULL
);
}
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
}
if
(
pDnode
->
status
!=
TSDB_DN_STATUS_READY
)
{
mTrace
(
"dnode:%d, from offline to online"
,
pDnode
->
dnodeId
);
pDnode
->
status
=
TSDB_DN_STATUS_READY
;
mgmtB
alanceNotify
();
b
alanceNotify
();
}
mgmtDecDnodeRef
(
pDnode
);
clusterReleaseDnode
(
pDnode
);
int32_t
contLen
=
sizeof
(
SDMStatusRsp
)
+
TSDB_MAX_VNODES
*
sizeof
(
SVnodeAccess
);
SDMStatusRsp
*
pRsp
=
rpcMallocCont
(
contLen
);
...
...
@@ -277,7 +239,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
rpcSendResponse
(
&
rpcRsp
);
}
static
int32_t
mgmt
GetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
cluster
GetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
...
...
@@ -344,16 +306,16 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
mgmt
GetDnodesNum
();
pShow
->
numOfRows
=
cluster
GetDnodesNum
();
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
static
int32_t
mgmt
RetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
static
int32_t
cluster
RetrieveDnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
...
...
@@ -361,8 +323,8 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
char
ipstr
[
32
];
while
(
numOfRows
<
rows
)
{
mgmtDecDnodeRef
(
pDnode
);
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
clusterReleaseDnode
(
pDnode
);
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
cols
=
0
;
...
...
@@ -386,7 +348,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetDnodeStatusStr
(
pDnode
->
status
)
);
strcpy
(
pWrite
,
clusterGetDnodeStatusStr
(
pDnode
->
status
)
);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
@@ -399,7 +361,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
#ifdef _VPEER
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetDnodeLbStatusStr
(
pDnode
->
lbS
tatus
));
strcpy
(
pWrite
,
clusterGetDnodeStatusStr
(
pDnode
->
s
tatus
));
cols
++
;
#endif
...
...
@@ -415,7 +377,7 @@ static bool clusterCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
return
status
>
0
;
}
static
int32_t
mgmt
GetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
cluster
GetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
...
...
@@ -454,7 +416,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow
->
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
while
(
1
)
{
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
if
(
clusterCheckModuleInDnode
(
pDnode
,
moduleType
))
{
...
...
@@ -465,12 +427,12 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
int32_t
mgmt
RetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
cluster
RetrieveModules
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
...
...
@@ -478,8 +440,8 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
char
ipstr
[
20
];
while
(
numOfRows
<
rows
)
{
mgmtDecDnodeRef
(
pDnode
);
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
clusterReleaseDnode
(
pDnode
);
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
for
(
int32_t
moduleType
=
0
;
moduleType
<
TSDB_MOD_MAX
;
++
moduleType
)
{
...
...
@@ -499,7 +461,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetDnodeStatusStr
(
pDnode
->
status
)
);
strcpy
(
pWrite
,
clusterGetDnodeStatusStr
(
pDnode
->
status
)
);
cols
++
;
numOfRows
++
;
...
...
@@ -516,7 +478,7 @@ static bool clusterCheckConfigShow(SGlobalConfig *cfg) {
return
true
;
}
static
int32_t
mgmt
GetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
cluster
GetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
...
...
@@ -553,12 +515,12 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
static
int32_t
mgmt
RetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
static
int32_t
cluster
RetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
for
(
int32_t
i
=
tsGlobalConfigNum
-
1
;
i
>=
0
&&
numOfRows
<
rows
;
--
i
)
{
...
...
@@ -605,7 +567,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
return
numOfRows
;
}
static
int32_t
mgmt
GetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
static
int32_t
cluster
GetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
...
...
@@ -625,12 +587,6 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
12
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"sync_status"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htons
(
cols
);
pShow
->
numOfColumns
=
cols
;
...
...
@@ -640,7 +596,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
SDnodeObj
*
pDnode
=
NULL
;
if
(
pShow
->
payloadLen
>
0
)
{
uint32_t
ip
=
ip2uint
(
pShow
->
payload
);
pDnode
=
mgmt
GetDnodeByIp
(
ip
);
pDnode
=
cluster
GetDnodeByIp
(
ip
);
if
(
NULL
==
pDnode
)
{
return
TSDB_CODE_NODE_OFFLINE
;
}
...
...
@@ -657,7 +613,7 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow
->
pNode
=
pDnode
;
}
else
{
while
(
true
)
{
pShow
->
pNode
=
mgmt
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
pShow
->
pNode
=
cluster
GetNextDnode
(
pShow
->
pNode
,
(
SDnodeObj
**
)
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
pShow
->
numOfRows
+=
pDnode
->
openVnodes
;
...
...
@@ -668,13 +624,13 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
}
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmtDecDnodeRef
(
pDnode
);
mgmt
DecUserRef
(
pUser
);
clusterReleaseDnode
(
pDnode
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
static
int32_t
mgmt
RetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
static
int32_t
cluster
RetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
)
{
int32_t
numOfRows
=
0
;
SDnodeObj
*
pDnode
=
NULL
;
char
*
pWrite
;
...
...
@@ -700,11 +656,7 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetVnodeStatusStr
(
pVnode
->
status
));
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetVnodeSyncStatusStr
(
pVnode
->
syncStatus
));
strcpy
(
pWrite
,
pVnode
->
status
?
"ready"
:
"offline"
);
cols
++
;
numOfRows
++
;
...
...
@@ -719,7 +671,7 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
return
numOfRows
;
}
char
*
mgmt
GetDnodeStatusStr
(
int32_t
dnodeStatus
)
{
char
*
cluster
GetDnodeStatusStr
(
int32_t
dnodeStatus
)
{
switch
(
dnodeStatus
)
{
case
TSDB_DN_STATUS_OFFLINE
:
return
"offline"
;
case
TSDB_DN_STATUS_DROPING
:
return
"dropping"
;
...
...
src/mnode/src/mgmtGrant.c
浏览文件 @
8568f4c4
...
...
@@ -18,7 +18,7 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "
mgmtG
rant.h"
#include "
tg
rant.h"
int32_t
grantInit
()
{
return
TSDB_CODE_SUCCESS
;
}
void
grantCleanUp
()
{}
...
...
src/mnode/src/mgmtMain.c
浏览文件 @
8568f4c4
...
...
@@ -19,13 +19,13 @@
#include "tmodule.h"
#include "tsched.h"
#include "mnode.h"
#include "mgmtAcct.h"
#include "mgmtBalance.h"
#include "taccount.h"
#include "tbalance.h"
#include "tcluster.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "
mgmtG
rant.h"
#include "
tg
rant.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtVgroup.h"
...
...
@@ -89,7 +89,7 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
mgmtInitDnodes
()
<
0
)
{
if
(
clusterInit
()
<
0
)
{
mError
(
"failed to init dnodes"
);
return
-
1
;
}
...
...
@@ -122,7 +122,7 @@ int32_t mgmtStartSystem() {
return
-
1
;
}
if
(
mgmtInitBalance
()
<
0
)
{
if
(
balanceInit
()
<
0
)
{
mError
(
"failed to init dnode balance"
)
}
...
...
@@ -148,14 +148,14 @@ void mgmtCleanUpSystem() {
mPrint
(
"starting to clean up mgmt"
);
grantCleanUp
();
mgmtCleanupMnodes
();
mgmtCleanupBalance
();
balanceCleanUp
();
mgmtCleanUpShell
();
mgmtCleanupDClient
();
mgmtCleanupDServer
();
mgmtCleanUpTables
();
mgmtCleanUpVgroups
();
mgmtCleanUpDbs
();
mgmtCleanUpDnodes
();
clusterCleanUp
();
mgmtCleanUpUsers
();
acctCleanUp
();
taosTmrCleanUp
(
tsMgmtTmr
);
...
...
src/mnode/src/mgmtMnode.c
浏览文件 @
8568f4c4
...
...
@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "tstatus.h"
#include "trpc.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
...
...
@@ -64,6 +63,25 @@ static void *mgmtGetNextMnode(void *pNode, SMnodeObj **pMnode) {
return
*
pMnode
;
}
char
*
taosGetMnodeStatusStr
(
int32_t
mnodeStatus
)
{
switch
(
mnodeStatus
)
{
case
TSDB_MN_STATUS_OFFLINE
:
return
"offline"
;
case
TSDB_MN_STATUS_UNSYNCED
:
return
"unsynced"
;
case
TSDB_MN_STATUS_SYNCING
:
return
"syncing"
;
case
TSDB_MN_STATUS_SERVING
:
return
"serving"
;
default:
return
"undefined"
;
}
}
char
*
taosGetMnodeRoleStr
(
int32_t
mnodeRole
)
{
switch
(
mnodeRole
)
{
case
TSDB_MN_ROLE_UNDECIDED
:
return
"undicided"
;
case
TSDB_MN_ROLE_SLAVE
:
return
"slave"
;
case
TSDB_MN_ROLE_MASTER
:
return
"master"
;
default:
return
"undefined"
;
}
}
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
if
(
pUser
==
NULL
)
return
0
;
...
...
@@ -120,7 +138,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
pShow
->
numOfRows
=
mgmtGetMnodesNum
();
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
pShow
->
pNode
=
NULL
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
...
...
src/mnode/src/mgmtProfile.c
浏览文件 @
8568f4c4
...
...
@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taosmsg.h"
#include "taccount.h"
#include "tcluster.h"
#include "mgmtDb.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
...
...
@@ -787,12 +789,12 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
void
mgmtFreeQueuedMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
rpcFreeCont
(
pMsg
->
pCont
);
if
(
pMsg
->
pUser
)
mgmt
DecUserRef
(
pMsg
->
pUser
);
if
(
pMsg
->
pDb
)
mgmt
DecDbRef
(
pMsg
->
pDb
);
if
(
pMsg
->
pVgroup
)
mgmt
DecVgroupRef
(
pMsg
->
pVgroup
);
if
(
pMsg
->
pUser
)
mgmt
ReleaseUser
(
pMsg
->
pUser
);
if
(
pMsg
->
pDb
)
mgmt
ReleaseDb
(
pMsg
->
pDb
);
if
(
pMsg
->
pVgroup
)
mgmt
ReleaseVgroup
(
pMsg
->
pVgroup
);
if
(
pMsg
->
pTable
)
mgmtDecTableRef
(
pMsg
->
pTable
);
// if (pMsg->pAcct) acctDecRef
(pMsg->pAcct);
// if (pMsg->pDnode) mgmtDecTableRef
(pMsg->pDnode);
if
(
pMsg
->
pAcct
)
acctReleaseAcct
(
pMsg
->
pAcct
);
if
(
pMsg
->
pDnode
)
clusterReleaseDnode
(
pMsg
->
pDnode
);
free
(
pMsg
);
}
}
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
8568f4c4
...
...
@@ -19,15 +19,14 @@
#include "taoserror.h"
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
#include "tsched.h"
#include "dnode.h"
#include "mnode.h"
#include "
mgmtAcc
t.h"
#include "
mgmtB
alance.h"
#include "
taccoun
t.h"
#include "
tb
alance.h"
#include "mgmtDb.h"
#include "
mgmtDnode
.h"
#include "
mgmtG
rant.h"
#include "
tcluster
.h"
#include "
tg
rant.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
...
...
@@ -179,6 +178,28 @@ static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) {
}
}
char
*
mgmtGetShowTypeStr
(
int32_t
showType
)
{
switch
(
showType
)
{
case
TSDB_MGMT_TABLE_ACCT
:
return
"show accounts"
;
case
TSDB_MGMT_TABLE_USER
:
return
"show users"
;
case
TSDB_MGMT_TABLE_DB
:
return
"show databases"
;
case
TSDB_MGMT_TABLE_TABLE
:
return
"show tables"
;
case
TSDB_MGMT_TABLE_DNODE
:
return
"show dnodes"
;
case
TSDB_MGMT_TABLE_MNODE
:
return
"show mnodes"
;
case
TSDB_MGMT_TABLE_VGROUP
:
return
"show vgroups"
;
case
TSDB_MGMT_TABLE_METRIC
:
return
"show stables"
;
case
TSDB_MGMT_TABLE_MODULE
:
return
"show modules"
;
case
TSDB_MGMT_TABLE_QUERIES
:
return
"show queries"
;
case
TSDB_MGMT_TABLE_STREAMS
:
return
"show streams"
;
case
TSDB_MGMT_TABLE_CONFIGS
:
return
"show configs"
;
case
TSDB_MGMT_TABLE_CONNS
:
return
"show connections"
;
case
TSDB_MGMT_TABLE_SCORES
:
return
"show scores"
;
case
TSDB_MGMT_TABLE_GRANTS
:
return
"show grants"
;
case
TSDB_MGMT_TABLE_VNODES
:
return
"show vnodes"
;
default:
return
"undefined"
;
}
}
static
void
mgmtProcessShowMsg
(
SQueuedMsg
*
pMsg
)
{
SCMShowMsg
*
pShowMsg
=
pMsg
->
pCont
;
if
(
pShowMsg
->
type
>=
TSDB_MGMT_TABLE_MAX
)
{
...
...
@@ -187,7 +208,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
}
if
(
!
tsMgmtShowMetaFp
[
pShowMsg
->
type
]
||
!
tsMgmtShowRetrieveFp
[
pShowMsg
->
type
])
{
mError
(
"show type:%s is not support"
,
taos
GetShowTypeStr
(
pShowMsg
->
type
));
mError
(
"show type:%s is not support"
,
mgmt
GetShowTypeStr
(
pShowMsg
->
type
));
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_OPS_NOT_SUPPORT
);
return
;
}
...
...
@@ -209,7 +230,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
mgmtSaveQhandle
(
pShow
);
pShowRsp
->
qhandle
=
htobe64
((
uint64_t
)
pShow
);
mTrace
(
"show:%p, type:%s, start to get meta"
,
pShow
,
taos
GetShowTypeStr
(
pShowMsg
->
type
));
mTrace
(
"show:%p, type:%s, start to get meta"
,
pShow
,
mgmt
GetShowTypeStr
(
pShowMsg
->
type
));
int32_t
code
=
(
*
tsMgmtShowMetaFp
[
pShowMsg
->
type
])(
&
pShowRsp
->
tableMeta
,
pShow
,
pMsg
->
thandle
);
if
(
code
==
0
)
{
SRpcMsg
rpcRsp
=
{
...
...
@@ -220,7 +241,7 @@ static void mgmtProcessShowMsg(SQueuedMsg *pMsg) {
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
mError
(
"show:%p, type:%s, failed to get meta, reason:%s"
,
pShow
,
taos
GetShowTypeStr
(
pShowMsg
->
type
),
tstrerror
(
code
));
mError
(
"show:%p, type:%s, failed to get meta, reason:%s"
,
pShow
,
mgmt
GetShowTypeStr
(
pShowMsg
->
type
),
tstrerror
(
code
));
mgmtFreeQhandle
(
pShow
);
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
thandle
,
...
...
@@ -248,7 +269,7 @@ static void mgmtProcessRetrieveMsg(SQueuedMsg *pMsg) {
}
SShowObj
*
pShow
=
(
SShowObj
*
)
pRetrieve
->
qhandle
;
mTrace
(
"show:%p, type:%s, retrieve data"
,
pShow
,
taos
GetShowTypeStr
(
pShow
->
type
));
mTrace
(
"show:%p, type:%s, retrieve data"
,
pShow
,
mgmt
GetShowTypeStr
(
pShow
->
type
));
if
(
!
mgmtCheckQhandle
(
pRetrieve
->
qhandle
))
{
mError
(
"pShow:%p, query memory is corrupted"
,
pShow
);
...
...
@@ -338,11 +359,11 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
SUserObj
*
pUser
=
mgmtGetUser
(
user
);
if
(
pUser
==
NULL
)
{
*
secret
=
0
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
TSDB_CODE_INVALID_USER
;
}
else
{
memcpy
(
secret
,
pUser
->
pass
,
TSDB_KEY_LEN
);
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
TSDB_CODE_SUCCESS
;
}
}
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
8568f4c4
...
...
@@ -19,7 +19,6 @@
#include "tscompression.h"
#include "tskiplist.h"
#include "ttime.h"
#include "tstatus.h"
#include "tutil.h"
#include "qast.h"
#include "qextbuffer.h"
...
...
@@ -28,15 +27,14 @@
#include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstatus.h"
#include "ttime.h"
#include "name.h"
#include "
mgmtAcc
t.h"
#include "
taccoun
t.h"
#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "
mgmtDnode
.h"
#include "
tcluster
.h"
#include "mgmtDServer.h"
#include "
mgmtG
rant.h"
#include "
tg
rant.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
#include "mgmtSdb.h"
...
...
@@ -98,21 +96,21 @@ static int32_t mgmtChildTableActionInsert(SSdbOperDesc *pOper) {
mError
(
"ctable:%s, not in vgroup:%d"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
);
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, vgroup:%d not in db:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
TSDB_CODE_INVALID_DB
;
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
SAcctObj
*
pAcct
=
acctGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"ctable:%s, account:%s not exists"
,
pTable
->
info
.
tableId
,
pDb
->
cfg
.
acct
);
return
TSDB_CODE_INVALID_ACCT
;
}
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
pTable
->
superTable
=
mgmtGetSuperTable
(
pTable
->
superTableId
);
...
...
@@ -140,21 +138,21 @@ static int32_t mgmtChildTableActionDelete(SSdbOperDesc *pOper) {
if
(
pVgroup
==
NULL
)
{
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, vgroup:%d not in DB:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
TSDB_CODE_INVALID_DB
;
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
SAcctObj
*
pAcct
=
acctGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
==
NULL
)
{
mError
(
"ctable:%s, account:%s not exists"
,
pTable
->
info
.
tableId
,
pDb
->
cfg
.
acct
);
return
TSDB_CODE_INVALID_ACCT
;
}
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
grantRestore
(
TSDB_GRANT_TIMESERIES
,
pTable
->
superTable
->
numOfColumns
-
1
);
...
...
@@ -272,7 +270,7 @@ static int32_t mgmtInitChildTables() {
pNode
=
pLastNode
;
continue
;
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
...
...
@@ -286,7 +284,7 @@ static int32_t mgmtInitChildTables() {
pNode
=
pLastNode
;
continue
;
}
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
if
(
strcmp
(
pVgroup
->
dbName
,
pDb
->
name
)
!=
0
)
{
mError
(
"ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it"
,
...
...
@@ -354,7 +352,7 @@ static int32_t mgmtSuperTableActionInsert(SSdbOperDesc *pOper) {
if
(
pDb
!=
NULL
)
{
mgmtAddSuperTableIntoDb
(
pDb
);
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -366,7 +364,7 @@ static int32_t mgmtSuperTableActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveSuperTableFromDb
(
pDb
);
mgmtDropAllChildTablesInStable
((
SSuperTableObj
*
)
pStable
);
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -554,7 +552,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
}
pMsg
->
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
dirty
)
{
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
status
!=
TSDB_DB_STATUS_READY
)
{
mError
(
"table:%s, failed to create, db not selected"
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
return
;
...
...
@@ -572,7 +570,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
static
void
mgmtProcessDropTableMsg
(
SQueuedMsg
*
pMsg
)
{
SCMDropTableMsg
*
pDrop
=
pMsg
->
pCont
;
pMsg
->
pDb
=
mgmtGetDbByTableId
(
pDrop
->
tableId
);
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
dirty
)
{
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
status
!=
TSDB_DB_STATUS_READY
)
{
mError
(
"table:%s, failed to drop table, db not selected"
,
pDrop
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
return
;
...
...
@@ -611,7 +609,7 @@ static void mgmtProcessTableMetaMsg(SQueuedMsg *pMsg) {
mTrace
(
"table:%s, table meta msg is received from thandle:%p"
,
pInfo
->
tableId
,
pMsg
->
thandle
);
pMsg
->
pDb
=
mgmtGetDbByTableId
(
pInfo
->
tableId
);
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
dirty
)
{
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
status
!=
TSDB_DB_STATUS_READY
)
{
mError
(
"table:%s, failed to get table meta, db not selected"
,
pInfo
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
return
;
...
...
@@ -860,7 +858,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
SAcctObj
*
pAcct
=
acctGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
+=
(
ncols
*
pStable
->
numOfTables
);
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -897,7 +895,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
SAcctObj
*
pAcct
=
acctGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
-=
pStable
->
numOfTables
;
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -963,7 +961,7 @@ static int32_t mgmtGetShowSuperTableMeta(STableMetaMsg *pMeta, SShowObj *pShow,
pShow
->
numOfRows
=
pDb
->
numOfSuperTables
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
0
;
}
...
...
@@ -1028,7 +1026,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
}
pShow
->
numOfReads
+=
numOfRows
;
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
numOfRows
;
}
...
...
@@ -1106,7 +1104,7 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
return
;
}
SCMSTableVgroupRspMsg
*
pRsp
=
rpcMallocCont
(
sizeof
(
SCMSTableVgroupRspMsg
)
+
sizeof
(
uint32_t
)
*
mgmt
GetDnodesNum
());
SCMSTableVgroupRspMsg
*
pRsp
=
rpcMallocCont
(
sizeof
(
SCMSTableVgroupRspMsg
)
+
sizeof
(
uint32_t
)
*
cluster
GetDnodesNum
());
if
(
pRsp
==
NULL
)
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_TABLE
);
return
;
...
...
@@ -1409,7 +1407,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
SAcctObj
*
pAcct
=
acctGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
+=
ncols
;
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -1443,7 +1441,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
SAcctObj
*
pAcct
=
acctGetAcct
(
pDb
->
cfg
.
acct
);
if
(
pAcct
!=
NULL
)
{
pAcct
->
acctInfo
.
numOfTimeSeries
--
;
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
}
SSdbOperDesc
oper
=
{
...
...
@@ -1633,7 +1631,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
}
static
SChildTableObj
*
mgmtGetTableByPos
(
uint32_t
dnodeId
,
int32_t
vnode
,
int32_t
sid
)
{
SDnodeObj
*
pObj
=
mgmt
GetDnode
(
dnodeId
);
SDnodeObj
*
pObj
=
cluster
GetDnode
(
dnodeId
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
vnode
);
if
(
pObj
==
NULL
||
pVgroup
==
NULL
)
{
...
...
@@ -1642,7 +1640,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_
SChildTableObj
*
pTable
=
pVgroup
->
tableList
[
sid
];
mgmtIncTableRef
((
STableObj
*
)
pTable
);
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
return
pTable
;
}
...
...
@@ -1863,7 +1861,7 @@ static int32_t mgmtGetShowTableMeta(STableMetaMsg *pMeta, SShowObj *pShow, void
pShow
->
numOfRows
=
pDb
->
numOfTables
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
0
;
}
...
...
@@ -1940,7 +1938,7 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
const
int32_t
NUM_OF_COLUMNS
=
4
;
mgmtVacuumResult
(
data
,
NUM_OF_COLUMNS
,
numOfRows
,
rows
,
pShow
);
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
numOfRows
;
}
...
...
@@ -1950,7 +1948,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
mTrace
(
"table:%s, alter table msg is received from thandle:%p"
,
pAlter
->
tableId
,
pMsg
->
thandle
);
pMsg
->
pDb
=
mgmtGetDbByTableId
(
pAlter
->
tableId
);
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
dirty
)
{
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
status
!=
TSDB_DB_STATUS_READY
)
{
mError
(
"table:%s, failed to alter table, db not selected"
,
pAlter
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
return
;
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
8568f4c4
...
...
@@ -18,8 +18,8 @@
#include "trpc.h"
#include "ttime.h"
#include "tutil.h"
#include "
mgmtAcc
t.h"
#include "
mgmtG
rant.h"
#include "
taccoun
t.h"
#include "
tg
rant.h"
#include "mgmtMnode.h"
#include "mgmtSdb.h"
#include "mgmtShell.h"
...
...
@@ -117,7 +117,7 @@ int32_t mgmtInitUsers() {
mgmtCreateUser
(
pAcct
,
"root"
,
"taosdata"
);
mgmtCreateUser
(
pAcct
,
"monitor"
,
tsInternalPass
);
mgmtCreateUser
(
pAcct
,
"_root"
,
tsInternalPass
);
acct
DecRef
(
pAcct
);
acct
ReleaseAcct
(
pAcct
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CREATE_USER
,
mgmtProcessCreateUserMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_ALTER_USER
,
mgmtProcessAlterUserMsg
);
...
...
@@ -137,11 +137,7 @@ SUserObj *mgmtGetUser(char *name) {
return
(
SUserObj
*
)
sdbGetRow
(
tsUserSdb
,
name
);
}
void
mgmtIncUserRef
(
SUserObj
*
pUser
)
{
return
sdbIncRef
(
tsUserSdb
,
pUser
);
}
void
mgmtDecUserRef
(
SUserObj
*
pUser
)
{
void
mgmtReleaseUser
(
SUserObj
*
pUser
)
{
return
sdbDecRef
(
tsUserSdb
,
pUser
);
}
...
...
@@ -174,7 +170,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
SUserObj
*
pUser
=
mgmtGetUser
(
name
);
if
(
pUser
!=
NULL
)
{
mTrace
(
"user:%s is already there"
,
name
);
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
TSDB_CODE_USER_ALREADY_EXIST
;
}
...
...
@@ -264,7 +260,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
pShow
->
numOfRows
=
pUser
->
pAcct
->
acctInfo
.
numOfUsers
;
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
0
;
}
...
...
@@ -299,7 +295,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
cols
++
;
numOfRows
++
;
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
}
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
...
...
@@ -351,7 +347,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
if
(
strcmp
(
pUser
->
user
,
"monitor"
)
==
0
||
(
strcmp
(
pUser
->
user
+
1
,
pUser
->
acct
)
==
0
&&
pUser
->
user
[
0
]
==
'_'
))
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
;
}
...
...
@@ -427,7 +423,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
}
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
}
static
void
mgmtProcessDropUserMsg
(
SQueuedMsg
*
pMsg
)
{
...
...
@@ -446,7 +442,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
if
(
strcmp
(
pUser
->
user
,
"monitor"
)
==
0
||
strcmp
(
pUser
->
user
,
pUser
->
acct
)
==
0
||
(
strcmp
(
pUser
->
user
+
1
,
pUser
->
acct
)
==
0
&&
pUser
->
user
[
0
]
==
'_'
))
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
return
;
}
...
...
@@ -475,7 +471,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
}
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
}
void
mgmtDropAllUsers
(
SAcctObj
*
pAcct
)
{
...
...
@@ -501,7 +497,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
numOfUsers
++
;
}
mgmt
DecUserRef
(
pUser
);
mgmt
ReleaseUser
(
pUser
);
}
mTrace
(
"acct:%s, all users:%d is dropped from sdb"
,
pAcct
->
user
,
numOfUsers
);
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
8568f4c4
...
...
@@ -17,12 +17,11 @@
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
#include "tstatus.h"
#include "tbalance.h"
#include "tcluster.h"
#include "mnode.h"
#include "mgmtBalance.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtDServer.h"
#include "mgmtMnode.h"
#include "mgmtProfile.h"
...
...
@@ -54,11 +53,11 @@ static int32_t mgmtVgroupActionDestroy(SSdbOperDesc *pOper) {
}
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
)
{
atomic_sub_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
}
mgmtDecDnodeRef
(
pDnode
);
clusterReleaseDnode
(
pDnode
);
}
tfree
(
pOper
->
pObj
);
...
...
@@ -71,7 +70,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
if
(
pDb
==
NULL
)
{
return
TSDB_CODE_INVALID_DB
;
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
pVgroup
->
pDb
=
pDb
;
pVgroup
->
prev
=
NULL
;
...
...
@@ -92,12 +91,12 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
}
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
!=
NULL
)
{
pVgroup
->
vnodeGid
[
i
].
privateIp
=
pDnode
->
privateIp
;
pVgroup
->
vnodeGid
[
i
].
publicIp
=
pDnode
->
publicIp
;
atomic_add_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
mgmtDecDnodeRef
(
pDnode
);
clusterReleaseDnode
(
pDnode
);
}
}
...
...
@@ -114,7 +113,7 @@ static int32_t mgmtVgroupActionDelete(SSdbOperDesc *pOper) {
mgmtRemoveVgroupFromDb
(
pVgroup
);
}
mgmt
DecDbRef
(
pVgroup
->
pDb
);
mgmt
ReleaseDb
(
pVgroup
->
pDb
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -214,12 +213,16 @@ SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) {
return
pDb
->
pHead
;
}
void
*
mgmtGetNextVgroup
(
void
*
pNode
,
SVgObj
**
pVgroup
)
{
return
sdbFetchRow
(
tsVgroupSdb
,
pNode
,
(
void
**
)
pVgroup
);
}
void
mgmtCreateVgroup
(
SQueuedMsg
*
pMsg
,
SDbObj
*
pDb
)
{
SVgObj
*
pVgroup
=
(
SVgObj
*
)
calloc
(
1
,
sizeof
(
SVgObj
));
strcpy
(
pVgroup
->
dbName
,
pDb
->
name
);
pVgroup
->
numOfVnodes
=
pDb
->
cfg
.
replications
;
pVgroup
->
createdTime
=
taosGetTimestampMs
();
if
(
mgmt
AllocVnodes
(
pVgroup
)
!=
0
)
{
if
(
balance
AllocVnodes
(
pVgroup
)
!=
0
)
{
mError
(
"db:%s, no enough dnode to alloc %d vnodes to vgroup"
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
free
(
pVgroup
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_ENOUGH_DNODES
);
...
...
@@ -310,7 +313,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
mgmtDecTableRef
(
pTable
);
pVgroup
=
mgmtGetVgroup
(((
SChildTableObj
*
)
pTable
)
->
vgId
);
if
(
NULL
==
pVgroup
)
return
TSDB_CODE_INVALID_TABLE_ID
;
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
maxReplica
=
pVgroup
->
numOfVnodes
>
maxReplica
?
pVgroup
->
numOfVnodes
:
maxReplica
;
}
else
{
SVgObj
*
pVgroup
=
pDb
->
pHead
;
...
...
@@ -356,18 +359,18 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
pShow
->
pNode
=
pVgroup
;
}
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
0
;
}
char
*
mgmtGetVnodeStatus
(
SVgObj
*
pVgroup
,
SVnodeGid
*
pVnode
)
{
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pVnode
->
dnodeId
);
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pVnode
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"vgroup:%d, not exist in dnode:%d"
,
pVgroup
->
vgId
,
pDnode
->
dnodeId
);
return
"null"
;
}
mgmtDecDnodeRef
(
pDnode
);
clusterReleaseDnode
(
pDnode
);
if
(
pDnode
->
status
==
TSDB_DN_STATUS_OFFLINE
)
{
return
"offline"
;
...
...
@@ -375,7 +378,7 @@ char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
for
(
int
i
=
0
;
i
<
pDnode
->
openVnodes
;
++
i
)
{
if
(
pDnode
->
vload
[
i
].
vgId
==
pVgroup
->
vgId
)
{
return
(
char
*
)
taosGetVnodeStatusStr
(
pDnode
->
vload
[
i
].
status
)
;
return
pDnode
->
vload
[
i
].
status
?
"ready"
:
"offline"
;
}
}
...
...
@@ -415,7 +418,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
strcpy
(
pWrite
,
taosGetVgroupLbStatusStr
(
pVgroup
->
lbStatus
)
);
strcpy
(
pWrite
,
pVgroup
->
status
?
"updating"
:
"ready"
);
cols
++
;
for
(
int32_t
i
=
0
;
i
<
maxReplica
;
++
i
)
{
...
...
@@ -442,7 +445,7 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
}
pShow
->
numOfReads
+=
numOfRows
;
mgmt
DecDbRef
(
pDb
);
mgmt
ReleaseDb
(
pDb
);
return
numOfRows
;
}
...
...
@@ -653,13 +656,13 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
pCfg
->
dnodeId
=
htonl
(
pCfg
->
dnodeId
);
pCfg
->
vgId
=
htonl
(
pCfg
->
vgId
);
SDnodeObj
*
pDnode
=
mgmt
GetDnode
(
pCfg
->
dnodeId
);
SDnodeObj
*
pDnode
=
cluster
GetDnode
(
pCfg
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mTrace
(
"dnode:%s, invalid dnode"
,
taosIpStr
(
pCfg
->
dnodeId
),
pCfg
->
vgId
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_NOT_ACTIVE_VNODE
);
return
;
}
mgmtDecDnodeRef
(
pDnode
);
clusterReleaseDnode
(
pDnode
);
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pCfg
->
vgId
);
if
(
pVgroup
==
NULL
)
{
...
...
@@ -667,7 +670,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_NOT_ACTIVE_VNODE
);
return
;
}
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_SUCCESS
);
...
...
@@ -683,7 +686,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
SVgObj
*
pVgroup
=
NULL
;
while
(
1
)
{
mgmt
DecVgroupRef
(
pVgroup
);
mgmt
ReleaseVgroup
(
pVgroup
);
pNode
=
sdbFetchRow
(
tsVgroupSdb
,
pNode
,
(
void
**
)
&
pVgroup
);
if
(
pVgroup
==
NULL
)
break
;
...
...
@@ -712,4 +715,5 @@ void mgmtAlterVgroup(SVgObj *pVgroup, void *ahandle) {
}
else
{
mgmtAddToShellQueue
(
ahandle
);
}
}
\ No newline at end of file
}
src/query/src/queryExecutor.c
浏览文件 @
8568f4c4
...
...
@@ -20,7 +20,6 @@
#include "tlog.h"
#include "tlosertree.h"
#include "tscompression.h"
#include "tstatus.h"
#include "ttime.h"
#include "qast.h"
...
...
src/util/inc/tstatus.h
已删除
100644 → 0
浏览文件 @
2d0396e9
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSTATUS_H
#define TDENGINE_TSTATUS_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include <stdint.h>
#include <stdbool.h>
#include "taoserror.h"
enum
_TSDB_DB_STATUS
{
TSDB_DB_STATUS_READY
,
TSDB_DB_STATUS_DROPPING
,
TSDB_DB_STATUS_DROP_FROM_SDB
};
typedef
enum
_TSDB_VN_STATUS
{
TSDB_VN_STATUS_NOT_READY
,
TSDB_VN_STATUS_UNSYNCED
,
TSDB_VN_STATUS_SLAVE
,
TSDB_VN_STATUS_MASTER
,
TSDB_VN_STATUS_CREATING
,
TSDB_VN_STATUS_CLOSING
,
TSDB_VN_STATUS_DELETING
,
}
EVnodeStatus
;
enum
_TSDB_VN_SYNC_STATUS
{
TSDB_VN_SYNC_STATUS_INIT
,
TSDB_VN_SYNC_STATUS_SYNCING
,
TSDB_VN_SYNC_STATUS_SYNC_CACHE
,
TSDB_VN_SYNC_STATUS_SYNC_FILE
};
enum
_TSDB_VN_DROP_STATUS
{
TSDB_VN_DROP_STATUS_READY
,
TSDB_VN_DROP_STATUS_DROPPING
};
enum
_TSDB_MN_STATUS
{
TSDB_MN_STATUS_OFFLINE
,
TSDB_MN_STATUS_UNSYNCED
,
TSDB_MN_STATUS_SYNCING
,
TSDB_MN_STATUS_SERVING
};
enum
_TSDB_MN_ROLE
{
TSDB_MN_ROLE_UNDECIDED
,
TSDB_MN_ROLE_SLAVE
,
TSDB_MN_ROLE_MASTER
};
enum
_TSDB_VG_STATUS
{
TSDB_VG_STATUS_READY
,
TSDB_VG_STATUS_UPDATE
};
enum
_TSDB_VN_STREAM_STATUS
{
TSDB_VN_STREAM_STATUS_STOP
,
TSDB_VN_STREAM_STATUS_START
};
enum
TSDB_TABLE_STATUS
{
TSDB_METER_STATE_READY
=
0x00
,
TSDB_METER_STATE_INSERTING
=
0x01
,
TSDB_METER_STATE_IMPORTING
=
0x02
,
TSDB_METER_STATE_UPDATING
=
0x04
,
TSDB_METER_STATE_DROPPING
=
0x10
,
TSDB_METER_STATE_DROPPED
=
0x18
,
};
char
*
taosGetVgroupStatusStr
(
int32_t
vgroupStatus
);
char
*
taosGetDbStatusStr
(
int32_t
dbStatus
);
char
*
taosGetVnodeStatusStr
(
int32_t
vnodeStatus
);
char
*
taosGetVnodeSyncStatusStr
(
int32_t
vnodeSyncStatus
);
char
*
taosGetVnodeDropStatusStr
(
int32_t
dropping
);
char
*
taosGetDnodeLbStatusStr
(
int32_t
dnodeBalanceStatus
);
char
*
taosGetVgroupLbStatusStr
(
int32_t
vglbStatus
);
char
*
taosGetVnodeStreamStatusStr
(
int32_t
vnodeStreamStatus
);
char
*
taosGetTableStatusStr
(
int32_t
tableStatus
);
char
*
taosGetShowTypeStr
(
int32_t
showType
);
char
*
taosGetMnodeStatusStr
(
int32_t
mnodeStatus
);
char
*
taosGetMnodeRoleStr
(
int32_t
mnodeRole
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSTATUS_H
src/util/src/tstatus.c
已删除
100644 → 0
浏览文件 @
2d0396e9
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "taosmsg.h"
#include "tstatus.h"
char
*
taosGetVgroupStatusStr
(
int32_t
vgroupStatus
)
{
switch
(
vgroupStatus
)
{
case
TSDB_VG_STATUS_READY
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_IN_PROGRESS
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_NO_DISK_PERMISSIONS
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_SERVER_NO_PACE
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_SERV_OUT_OF_MEMORY
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_INIT_FAILED
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
case
TSDB_VG_STATUS_FULL
:
return
(
char
*
)
tstrerror
(
vgroupStatus
);
default:
return
"undefined"
;
}
}
char
*
taosGetDbStatusStr
(
int32_t
dbStatus
)
{
switch
(
dbStatus
)
{
case
TSDB_DB_STATUS_READY
:
return
"ready"
;
case
TSDB_DB_STATUS_DROPPING
:
return
"dropping"
;
case
TSDB_DB_STATUS_DROP_FROM_SDB
:
return
"drop_from_sdb"
;
default:
return
"undefined"
;
}
}
char
*
taosGetVnodeStatusStr
(
int32_t
vnodeStatus
)
{
switch
(
vnodeStatus
)
{
case
TSDB_VN_STATUS_NOT_READY
:
return
"not_ready"
;
case
TSDB_VN_STATUS_UNSYNCED
:
return
"unsynced"
;
case
TSDB_VN_STATUS_SLAVE
:
return
"slave"
;
case
TSDB_VN_STATUS_MASTER
:
return
"master"
;
case
TSDB_VN_STATUS_CREATING
:
return
"creating"
;
case
TSDB_VN_STATUS_CLOSING
:
return
"closing"
;
case
TSDB_VN_STATUS_DELETING
:
return
"deleting"
;
default:
return
"undefined"
;
}
}
char
*
taosGetVnodeSyncStatusStr
(
int32_t
vnodeSyncStatus
)
{
switch
(
vnodeSyncStatus
)
{
case
TSDB_VN_SYNC_STATUS_INIT
:
return
"ready"
;
case
TSDB_VN_SYNC_STATUS_SYNCING
:
return
"syncing"
;
case
TSDB_VN_SYNC_STATUS_SYNC_CACHE
:
return
"sync_cache"
;
case
TSDB_VN_SYNC_STATUS_SYNC_FILE
:
return
"sync_file"
;
default:
return
"undefined"
;
}
}
char
*
taosGetVnodeDropStatusStr
(
int32_t
dropping
)
{
switch
(
dropping
)
{
case
TSDB_VN_DROP_STATUS_READY
:
return
"ready"
;
case
TSDB_VN_DROP_STATUS_DROPPING
:
return
"dropping"
;
default:
return
"undefined"
;
}
}
char
*
taosGetDnodeLbStatusStr
(
int32_t
dnodeBalanceStatus
)
{
switch
(
dnodeBalanceStatus
)
{
case
TSDB_DN_LB_STATUS_BALANCED
:
return
"balanced"
;
case
TSDB_DN_LB_STATUS_BALANCING
:
return
"balancing"
;
case
TSDB_DN_LB_STATUS_OFFLINE_REMOVING
:
return
"offline removing"
;
case
TSDB_DN_LB_STATE_SHELL_REMOVING
:
return
"removing"
;
default:
return
"undefined"
;
}
}
char
*
taosGetVgroupLbStatusStr
(
int32_t
vglbStatus
)
{
switch
(
vglbStatus
)
{
case
TSDB_VG_LB_STATUS_READY
:
return
"ready"
;
case
TSDB_VG_LB_STATUS_UPDATE
:
return
"updating"
;
default:
return
"undefined"
;
}
}
char
*
taosGetVnodeStreamStatusStr
(
int32_t
vnodeStreamStatus
)
{
switch
(
vnodeStreamStatus
)
{
case
TSDB_VN_STREAM_STATUS_START
:
return
"start"
;
case
TSDB_VN_STREAM_STATUS_STOP
:
return
"stop"
;
default:
return
"undefined"
;
}
}
char
*
taosGetTableStatusStr
(
int32_t
tableStatus
)
{
switch
(
tableStatus
)
{
case
TSDB_METER_STATE_INSERTING
:
return
"inserting"
;
case
TSDB_METER_STATE_IMPORTING
:
return
"importing"
;
case
TSDB_METER_STATE_UPDATING
:
return
"updating"
;
case
TSDB_METER_STATE_DROPPING
:
return
"deleting"
;
case
TSDB_METER_STATE_DROPPED
:
return
"dropped"
;
case
TSDB_METER_STATE_READY
:
return
"ready"
;
default:
return
"undefined"
;
}
}
char
*
taosGetShowTypeStr
(
int32_t
showType
)
{
switch
(
showType
)
{
case
TSDB_MGMT_TABLE_ACCT
:
return
"show accounts"
;
case
TSDB_MGMT_TABLE_USER
:
return
"show users"
;
case
TSDB_MGMT_TABLE_DB
:
return
"show databases"
;
case
TSDB_MGMT_TABLE_TABLE
:
return
"show tables"
;
case
TSDB_MGMT_TABLE_DNODE
:
return
"show dnodes"
;
case
TSDB_MGMT_TABLE_MNODE
:
return
"show mnodes"
;
case
TSDB_MGMT_TABLE_VGROUP
:
return
"show vgroups"
;
case
TSDB_MGMT_TABLE_METRIC
:
return
"show stables"
;
case
TSDB_MGMT_TABLE_MODULE
:
return
"show modules"
;
case
TSDB_MGMT_TABLE_QUERIES
:
return
"show queries"
;
case
TSDB_MGMT_TABLE_STREAMS
:
return
"show streams"
;
case
TSDB_MGMT_TABLE_CONFIGS
:
return
"show configs"
;
case
TSDB_MGMT_TABLE_CONNS
:
return
"show connections"
;
case
TSDB_MGMT_TABLE_SCORES
:
return
"show scores"
;
case
TSDB_MGMT_TABLE_GRANTS
:
return
"show grants"
;
case
TSDB_MGMT_TABLE_VNODES
:
return
"show vnodes"
;
default:
return
"undefined"
;
}
}
char
*
taosGetMnodeStatusStr
(
int32_t
mnodeStatus
)
{
switch
(
mnodeStatus
)
{
case
TSDB_MN_STATUS_OFFLINE
:
return
"offline"
;
case
TSDB_MN_STATUS_UNSYNCED
:
return
"unsynced"
;
case
TSDB_MN_STATUS_SYNCING
:
return
"syncing"
;
case
TSDB_MN_STATUS_SERVING
:
return
"serving"
;
default:
return
"undefined"
;
}
}
char
*
taosGetMnodeRoleStr
(
int32_t
mnodeRole
)
{
switch
(
mnodeRole
)
{
case
TSDB_MN_ROLE_UNDECIDED
:
return
"undicided"
;
case
TSDB_MN_ROLE_SLAVE
:
return
"slave"
;
case
TSDB_MN_ROLE_MASTER
:
return
"master"
;
default:
return
"undefined"
;
}
}
src/vnode/main/inc/vnodeInt.h
浏览文件 @
8568f4c4
...
...
@@ -23,18 +23,10 @@ extern "C" {
#include "tsync.h"
#include "twal.h"
typedef
enum
_VN_STATUS
{
VN_STATUS_INIT
,
VN_STATUS_CREATING
,
VN_STATUS_READY
,
VN_STATUS_CLOSING
,
VN_STATUS_DELETING
,
}
EVnStatus
;
typedef
struct
{
int32_t
vgId
;
// global vnode group ID
int32_t
refCount
;
// reference count
EVnStatus
status
;
int
status
;
int
role
;
int64_t
version
;
void
*
wqueue
;
...
...
src/vnode/main/src/vnodeMain.c
浏览文件 @
8568f4c4
...
...
@@ -20,7 +20,6 @@
#include "taosmsg.h"
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
#include "tsdb.h"
#include "ttime.h"
#include "ttimer.h"
...
...
@@ -112,7 +111,7 @@ int32_t vnodeDrop(int32_t vgId) {
}
dTrace
(
"pVnode:%p vgId:%d, vnode will be dropped"
,
pVnode
,
pVnode
->
vgId
);
pVnode
->
status
=
VN_STATUS_DELETING
;
pVnode
->
status
=
TAOS_
VN_STATUS_DELETING
;
vnodeCleanUp
(
pVnode
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -124,7 +123,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
SVnodeObj
*
pVnode
=
calloc
(
sizeof
(
SVnodeObj
),
1
);
pVnode
->
vgId
=
vnode
;
pVnode
->
status
=
VN_STATUS_INIT
;
pVnode
->
status
=
TAOS_
VN_STATUS_INIT
;
pVnode
->
refCount
=
1
;
pVnode
->
version
=
0
;
taosAddIntHash
(
tsDnodeVnodesHash
,
pVnode
->
vgId
,
(
char
*
)(
&
pVnode
));
...
...
@@ -161,7 +160,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
walRestore
(
pVnode
->
wal
,
pVnode
,
vnodeWriteToQueue
);
pVnode
->
status
=
VN_STATUS_READY
;
pVnode
->
status
=
TAOS_
VN_STATUS_READY
;
dTrace
(
"pVnode:%p vgId:%d, vnode is opened in %s"
,
pVnode
,
pVnode
->
vgId
,
rootDir
);
tsOpennedVnodes
++
;
...
...
@@ -174,7 +173,7 @@ int32_t vnodeClose(int32_t vgId) {
if
(
pVnode
==
NULL
)
return
0
;
dTrace
(
"pVnode:%p vgId:%d, vnode will be closed"
,
pVnode
,
pVnode
->
vgId
);
pVnode
->
status
=
VN_STATUS_CLOSING
;
pVnode
->
status
=
TAOS_
VN_STATUS_CLOSING
;
vnodeCleanUp
(
pVnode
);
return
0
;
...
...
@@ -198,7 +197,7 @@ void vnodeRelease(void *pVnodeRaw) {
dnodeFreeWqueue
(
pVnode
->
wqueue
);
pVnode
->
wqueue
=
NULL
;
if
(
pVnode
->
status
==
VN_STATUS_DELETING
)
{
if
(
pVnode
->
status
==
TAOS_
VN_STATUS_DELETING
)
{
// remove the whole directory
}
...
...
@@ -258,7 +257,7 @@ void vnodeBuildStatusMsg(void *param) {
static
void
vnodeBuildVloadMsg
(
char
*
pNode
,
void
*
param
)
{
SVnodeObj
*
pVnode
=
*
(
SVnodeObj
**
)
pNode
;
if
(
pVnode
->
status
==
VN_STATUS_DELETING
)
return
;
if
(
pVnode
->
status
==
TAOS_
VN_STATUS_DELETING
)
return
;
SDMStatusMsg
*
pStatus
=
param
;
if
(
pStatus
->
openVnodes
>=
TSDB_MAX_VNODES
)
return
;
...
...
src/vnode/main/src/vnodeRead.c
浏览文件 @
8568f4c4
...
...
@@ -42,7 +42,7 @@ int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen,
if
(
vnodeProcessReadMsgFp
[
msgType
]
==
NULL
)
return
TSDB_CODE_MSG_NOT_PROCESSED
;
if
(
pVnode
->
status
==
VN_STATUS_DELETING
||
pVnode
->
status
==
VN_STATUS_CLOSING
)
if
(
pVnode
->
status
==
TAOS_VN_STATUS_DELETING
||
pVnode
->
status
==
TAOS_
VN_STATUS_CLOSING
)
return
TSDB_CODE_NOT_ACTIVE_VNODE
;
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pCont
,
contLen
,
ret
);
...
...
src/vnode/main/src/vnodeWrite.c
浏览文件 @
8568f4c4
...
...
@@ -41,18 +41,19 @@ void vnodeInitWriteFp(void) {
vnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
vnodeProcessDropStableMsg
;
}
int32_t
vnodeProcessWrite
(
void
*
param
,
int
qtype
,
SWalHead
*
pHead
,
void
*
item
)
{
int32_t
vnodeProcessWrite
(
void
*
param
1
,
int
qtype
,
void
*
param2
,
void
*
item
)
{
int32_t
code
=
0
;
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param
;
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
param1
;
SWalHead
*
pHead
=
param2
;
if
(
vnodeProcessWriteMsgFp
[
pHead
->
msgType
]
==
NULL
)
return
TSDB_CODE_MSG_NOT_PROCESSED
;
if
(
pVnode
->
status
==
VN_STATUS_DELETING
||
pVnode
->
status
==
VN_STATUS_CLOSING
)
if
(
pVnode
->
status
==
TAOS_VN_STATUS_DELETING
||
pVnode
->
status
==
TAOS_
VN_STATUS_CLOSING
)
return
TSDB_CODE_NOT_ACTIVE_VNODE
;
if
(
pHead
->
version
==
0
)
{
// from client
if
(
pVnode
->
status
!=
VN_STATUS_READY
)
if
(
pVnode
->
status
!=
TAOS_
VN_STATUS_READY
)
return
TSDB_CODE_NOT_ACTIVE_VNODE
;
// if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录