Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c1169467
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c1169467
编写于
12月 29, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
create bnode and snode
上级
f38b1b21
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
1488 addition
and
39 deletion
+1488
-39
include/common/tmsg.h
include/common/tmsg.h
+6
-3
include/common/tmsgdef.h
include/common/tmsgdef.h
+20
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+15
-12
include/util/taoserror.h
include/util/taoserror.h
+7
-2
source/dnode/mgmt/impl/src/dndBnode.c
source/dnode/mgmt/impl/src/dndBnode.c
+2
-2
source/dnode/mgmt/impl/src/dndQnode.c
source/dnode/mgmt/impl/src/dndQnode.c
+2
-2
source/dnode/mgmt/impl/src/dndSnode.c
source/dnode/mgmt/impl/src/dndSnode.c
+2
-2
source/dnode/mnode/impl/inc/mndBnode.h
source/dnode/mnode/impl/inc/mndBnode.h
+5
-5
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+21
-0
source/dnode/mnode/impl/inc/mndQnode.h
source/dnode/mnode/impl/inc/mndQnode.h
+15
-4
source/dnode/mnode/impl/inc/mndSnode.h
source/dnode/mnode/impl/inc/mndSnode.h
+32
-0
source/dnode/mnode/impl/src/mndBnode.c
source/dnode/mnode/impl/src/mndBnode.c
+446
-0
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+2
-2
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+446
-0
source/dnode/mnode/impl/src/mndSnode.c
source/dnode/mnode/impl/src/mndSnode.c
+446
-0
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+6
-2
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+8
-1
source/util/src/terror.c
source/util/src/terror.c
+7
-2
未找到文件。
include/common/tmsg.h
浏览文件 @
c1169467
...
@@ -71,6 +71,9 @@ typedef enum _mgmt_table {
...
@@ -71,6 +71,9 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_TABLE
,
TSDB_MGMT_TABLE_TABLE
,
TSDB_MGMT_TABLE_DNODE
,
TSDB_MGMT_TABLE_DNODE
,
TSDB_MGMT_TABLE_MNODE
,
TSDB_MGMT_TABLE_MNODE
,
TSDB_MGMT_TABLE_QNODE
,
TSDB_MGMT_TABLE_SNODE
,
TSDB_MGMT_TABLE_BNODE
,
TSDB_MGMT_TABLE_VGROUP
,
TSDB_MGMT_TABLE_VGROUP
,
TSDB_MGMT_TABLE_STB
,
TSDB_MGMT_TABLE_STB
,
TSDB_MGMT_TABLE_MODULE
,
TSDB_MGMT_TABLE_MODULE
,
...
@@ -866,15 +869,15 @@ typedef struct {
...
@@ -866,15 +869,15 @@ typedef struct {
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
}
S
CreateQnodeInMsg
,
SDropQnodeIn
Msg
;
}
S
MCreateQnodeMsg
,
SMCreateQnodeMsg
,
SDCreateQnodeMsg
,
SDDropQnode
Msg
;
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
}
S
CreateSnodeInMsg
,
SDropSnodeIn
Msg
;
}
S
MCreateSnodeMsg
,
SMCreateSnodeMsg
,
SDCreateSnodeMsg
,
SDDropSnode
Msg
;
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
}
S
CreateBnodeInMsg
,
SDropBnodeIn
Msg
;
}
S
MCreateBnodeMsg
,
SMCreateBnodeMsg
,
SDCreateBnodeMsg
,
SDDropBnode
Msg
;
typedef
struct
{
typedef
struct
{
int32_t
dnodeId
;
int32_t
dnodeId
;
...
...
include/common/tmsgdef.h
浏览文件 @
c1169467
...
@@ -70,6 +70,15 @@ enum {
...
@@ -70,6 +70,15 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_MNODE
,
"dnode-create-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_MNODE
,
"dnode-create-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_MNODE
,
"dnode-alter-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_MNODE
,
"dnode-alter-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_MNODE
,
"dnode-drop-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_MNODE
,
"dnode-drop-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_QNODE
,
"dnode-create-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_QNODE
,
"dnode-alter-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_QNODE
,
"dnode-drop-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_SNODE
,
"dnode-create-snode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_SNODE
,
"dnode-alter-snode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_SNODE
,
"dnode-drop-snode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_BNODE
,
"dnode-create-bnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_BNODE
,
"dnode-alter-bnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_BNODE
,
"dnode-drop-bnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_VNODE
,
"dnode-create-vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_CREATE_VNODE
,
"dnode-create-vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_VNODE
,
"dnode-alter-vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_ALTER_VNODE
,
"dnode-alter-vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_VNODE
,
"dnode-drop-vnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_DND_DROP_VNODE
,
"dnode-drop-vnode"
,
NULL
,
NULL
)
...
@@ -90,9 +99,20 @@ enum {
...
@@ -90,9 +99,20 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_USER
,
"mnode-drop-user"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_USER
,
"mnode-drop-user"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_DNODE
,
"mnode-create-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_DNODE
,
"mnode-create-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CONFIG_DNODE
,
"mnode-config-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CONFIG_DNODE
,
"mnode-config-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_DNODE
,
"mnode-alter-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_DNODE
,
"mnode-drop-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_DNODE
,
"mnode-drop-dnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_MNODE
,
"mnode-create-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_MNODE
,
"mnode-create-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_MNODE
,
"mnode-alter-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_MNODE
,
"mnode-drop-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_MNODE
,
"mnode-drop-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_QNODE
,
"mnode-create-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_QNODE
,
"mnode-alter-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_QNODE
,
"mnode-drop-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_SNODE
,
"mnode-create-snode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_SNODE
,
"mnode-alter-snode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_SNODE
,
"mnode-drop-snode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_BNODE
,
"mnode-create-bnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_BNODE
,
"mnode-alter-bnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_BNODE
,
"mnode-drop-bnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_DB
,
"mnode-create-db"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_DB
,
"mnode-create-db"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_DB
,
"mnode-drop-db"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_DB
,
"mnode-drop-db"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_USE_DB
,
"mnode-use-db"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_USE_DB
,
"mnode-use-db"
,
NULL
,
NULL
)
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
c1169467
...
@@ -157,18 +157,21 @@ typedef enum {
...
@@ -157,18 +157,21 @@ typedef enum {
SDB_TRANS
=
1
,
SDB_TRANS
=
1
,
SDB_CLUSTER
=
2
,
SDB_CLUSTER
=
2
,
SDB_MNODE
=
3
,
SDB_MNODE
=
3
,
SDB_DNODE
=
4
,
SDB_QNODE
=
4
,
SDB_USER
=
5
,
SDB_SNODE
=
5
,
SDB_AUTH
=
6
,
SDB_BNODE
=
6
,
SDB_ACCT
=
7
,
SDB_DNODE
=
7
,
SDB_CONSUMER
=
8
,
SDB_USER
=
8
,
SDB_CGROUP
=
9
,
SDB_AUTH
=
9
,
SDB_TOPIC
=
10
,
SDB_ACCT
=
10
,
SDB_VGROUP
=
11
,
SDB_CONSUMER
=
11
,
SDB_STB
=
12
,
SDB_CGROUP
=
12
,
SDB_DB
=
13
,
SDB_TOPIC
=
13
,
SDB_FUNC
=
14
,
SDB_VGROUP
=
14
,
SDB_MAX
=
15
SDB_STB
=
15
,
SDB_DB
=
16
,
SDB_FUNC
=
17
,
SDB_MAX
=
18
}
ESdbType
;
}
ESdbType
;
typedef
struct
SSdb
SSdb
;
typedef
struct
SSdb
SSdb
;
...
...
include/util/taoserror.h
浏览文件 @
c1169467
...
@@ -172,10 +172,15 @@ int32_t* taosGetErrno();
...
@@ -172,10 +172,15 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_DNODE_EP TAOS_DEF_ERROR_CODE(0, 0x0347)
#define TSDB_CODE_MND_INVALID_DNODE_EP TAOS_DEF_ERROR_CODE(0, 0x0347)
#define TSDB_CODE_MND_INVALID_DNODE_ID TAOS_DEF_ERROR_CODE(0, 0x0348)
#define TSDB_CODE_MND_INVALID_DNODE_ID TAOS_DEF_ERROR_CODE(0, 0x0348)
// mnode-
m
node
// mnode-node
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0350)
#define TSDB_CODE_MND_MNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0350)
#define TSDB_CODE_MND_MNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0351)
#define TSDB_CODE_MND_MNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0351)
#define TSDB_CODE_MND_TOO_MANY_MNODES TAOS_DEF_ERROR_CODE(0, 0x0352)
#define TSDB_CODE_MND_QNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0352)
#define TSDB_CODE_MND_QNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0353)
#define TSDB_CODE_MND_SNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0354)
#define TSDB_CODE_MND_SNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0355)
#define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356)
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
// mnode-acct
// mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)
...
...
source/dnode/mgmt/impl/src/dndBnode.c
浏览文件 @
c1169467
...
@@ -256,7 +256,7 @@ static int32_t dndDropBnode(SDnode *pDnode) {
...
@@ -256,7 +256,7 @@ static int32_t dndDropBnode(SDnode *pDnode) {
}
}
int32_t
dndProcessCreateBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
dndProcessCreateBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
S
CreateBnodeIn
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
S
DCreateBnode
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
...
@@ -268,7 +268,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
...
@@ -268,7 +268,7 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t
dndProcessDropBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
dndProcessDropBnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SD
ropBnodeIn
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
SD
DropBnode
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
...
...
source/dnode/mgmt/impl/src/dndQnode.c
浏览文件 @
c1169467
...
@@ -261,7 +261,7 @@ static int32_t dndDropQnode(SDnode *pDnode) {
...
@@ -261,7 +261,7 @@ static int32_t dndDropQnode(SDnode *pDnode) {
}
}
int32_t
dndProcessCreateQnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
dndProcessCreateQnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
S
CreateQnodeIn
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
S
DCreateQnode
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
...
@@ -273,7 +273,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
...
@@ -273,7 +273,7 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t
dndProcessDropQnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
dndProcessDropQnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SD
ropQnodeIn
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
SD
DropQnode
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
...
...
source/dnode/mgmt/impl/src/dndSnode.c
浏览文件 @
c1169467
...
@@ -256,7 +256,7 @@ static int32_t dndDropSnode(SDnode *pDnode) {
...
@@ -256,7 +256,7 @@ static int32_t dndDropSnode(SDnode *pDnode) {
}
}
int32_t
dndProcessCreateSnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
dndProcessCreateSnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
S
CreateSnodeIn
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
S
DCreateSnode
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
...
@@ -268,7 +268,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
...
@@ -268,7 +268,7 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t
dndProcessDropSnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
dndProcessDropSnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
SD
ropSnodeIn
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
SD
DropSnode
Msg
*
pMsg
=
pRpcMsg
->
pCont
;
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
if
(
pMsg
->
dnodeId
!=
dndGetDnodeId
(
pDnode
))
{
...
...
source/dnode/mnode/impl/inc/mndB
alanc
e.h
→
source/dnode/mnode/impl/inc/mndB
nod
e.h
浏览文件 @
c1169467
...
@@ -13,8 +13,8 @@
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef _TD_MND_B
ALANC
E_H_
#ifndef _TD_MND_B
NOD
E_H_
#define _TD_MND_B
ALANC
E_H_
#define _TD_MND_B
NOD
E_H_
#include "mndInt.h"
#include "mndInt.h"
...
@@ -22,11 +22,11 @@
...
@@ -22,11 +22,11 @@
extern
"C"
{
extern
"C"
{
#endif
#endif
int32_t
mndInitB
alanc
e
(
SMnode
*
pMnode
);
int32_t
mndInitB
nod
e
(
SMnode
*
pMnode
);
void
mndCleanupB
alanc
e
(
SMnode
*
pMnode
);
void
mndCleanupB
nod
e
(
SMnode
*
pMnode
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
#endif
/*_TD_MND_B
ALANC
E_H_*/
#endif
/*_TD_MND_B
NOD
E_H_*/
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
c1169467
...
@@ -144,6 +144,27 @@ typedef struct {
...
@@ -144,6 +144,27 @@ typedef struct {
SDnodeObj
*
pDnode
;
SDnodeObj
*
pDnode
;
}
SMnodeObj
;
}
SMnodeObj
;
typedef
struct
{
int32_t
id
;
int64_t
createdTime
;
int64_t
updateTime
;
SDnodeObj
*
pDnode
;
}
SQnodeObj
;
typedef
struct
{
int32_t
id
;
int64_t
createdTime
;
int64_t
updateTime
;
SDnodeObj
*
pDnode
;
}
SSnodeObj
;
typedef
struct
{
int32_t
id
;
int64_t
createdTime
;
int64_t
updateTime
;
SDnodeObj
*
pDnode
;
}
SBnodeObj
;
typedef
struct
{
typedef
struct
{
int32_t
maxUsers
;
int32_t
maxUsers
;
int32_t
maxDbs
;
int32_t
maxDbs
;
...
...
source/dnode/mnode/impl/
src/mndBalance.c
→
source/dnode/mnode/impl/
inc/mndQnode.h
浏览文件 @
c1169467
...
@@ -13,9 +13,20 @@
...
@@ -13,9 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#define _DEFAULT_SOURCE
#ifndef _TD_MND_QNODE_H_
#include "os.h"
#define _TD_MND_QNODE_H_
#include "mndInt.h"
#include "mndInt.h"
int32_t
mndInitBalance
(
SMnode
*
pMnode
)
{
return
0
;
}
#ifdef __cplusplus
void
mndCleanupBalance
(
SMnode
*
pMnode
)
{}
extern
"C"
{
\ No newline at end of file
#endif
int32_t
mndInitQnode
(
SMnode
*
pMnode
);
void
mndCleanupQnode
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_QNODE_H_*/
source/dnode/mnode/impl/inc/mndSnode.h
0 → 100644
浏览文件 @
c1169467
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_SNODE_H_
#define _TD_MND_SNODE_H_
#include "mndInt.h"
#ifdef __cplusplus
extern
"C"
{
#endif
int32_t
mndInitSnode
(
SMnode
*
pMnode
);
void
mndCleanupSnode
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_MND_SNODE_H_*/
source/dnode/mnode/impl/src/mndBnode.c
0 → 100644
浏览文件 @
c1169467
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndBnode.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#define TSDB_BNODE_VER_NUMBER 1
#define TSDB_BNODE_RESERVE_SIZE 64
static
SSdbRaw
*
mndBnodeActionEncode
(
SBnodeObj
*
pObj
);
static
SSdbRow
*
mndBnodeActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndBnodeActionInsert
(
SSdb
*
pSdb
,
SBnodeObj
*
pObj
);
static
int32_t
mndBnodeActionDelete
(
SSdb
*
pSdb
,
SBnodeObj
*
pObj
);
static
int32_t
mndBnodeActionUpdate
(
SSdb
*
pSdb
,
SBnodeObj
*
pOldBnode
,
SBnodeObj
*
pNewBnode
);
static
int32_t
mndProcessCreateBnodeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropBnodeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateBnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropBnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetBnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveBnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextBnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitBnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_BNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndBnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndBnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndBnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndBnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndBnodeActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_BNODE
,
mndProcessCreateBnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_BNODE
,
mndProcessDropBnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_BNODE_RSP
,
mndProcessCreateBnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_BNODE_RSP
,
mndProcessDropBnodeRsp
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_BNODE
,
mndGetBnodeMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_BNODE
,
mndRetrieveBnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_BNODE
,
mndCancelGetNextBnode
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupBnode
(
SMnode
*
pMnode
)
{}
static
SBnodeObj
*
mndAcquireBnode
(
SMnode
*
pMnode
,
int32_t
snodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SBnodeObj
*
pObj
=
sdbAcquire
(
pSdb
,
SDB_BNODE
,
&
snodeId
);
if
(
pObj
==
NULL
)
{
terrno
=
TSDB_CODE_MND_BNODE_NOT_EXIST
;
}
return
pObj
;
}
static
void
mndReleaseBnode
(
SMnode
*
pMnode
,
SBnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pObj
);
}
static
SSdbRaw
*
mndBnodeActionEncode
(
SBnodeObj
*
pObj
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_BNODE
,
TSDB_BNODE_VER_NUMBER
,
sizeof
(
SBnodeObj
)
+
TSDB_BNODE_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pObj
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
updateTime
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_BNODE_RESERVE_SIZE
)
return
pRaw
;
}
static
SSdbRow
*
mndBnodeActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_BNODE_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to decode snode since %s"
,
terrstr
());
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SBnodeObj
));
SBnodeObj
*
pObj
=
sdbGetRowObj
(
pRow
);
if
(
pObj
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
updateTime
)
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_BNODE_RESERVE_SIZE
)
return
pRow
;
}
static
int32_t
mndBnodeActionInsert
(
SSdb
*
pSdb
,
SBnodeObj
*
pObj
)
{
mTrace
(
"snode:%d, perform insert action"
,
pObj
->
id
);
pObj
->
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
pObj
->
id
);
if
(
pObj
->
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
mError
(
"snode:%d, failed to perform insert action since %s"
,
pObj
->
id
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
int32_t
mndBnodeActionDelete
(
SSdb
*
pSdb
,
SBnodeObj
*
pObj
)
{
mTrace
(
"snode:%d, perform delete action"
,
pObj
->
id
);
if
(
pObj
->
pDnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pObj
->
pDnode
);
pObj
->
pDnode
=
NULL
;
}
return
0
;
}
static
int32_t
mndBnodeActionUpdate
(
SSdb
*
pSdb
,
SBnodeObj
*
pOldBnode
,
SBnodeObj
*
pNewBnode
)
{
mTrace
(
"snode:%d, perform update action"
,
pOldBnode
->
id
);
pOldBnode
->
updateTime
=
pNewBnode
->
updateTime
;
return
0
;
}
static
int32_t
mndSetCreateBnodeRedoLogs
(
STrans
*
pTrans
,
SBnodeObj
*
pObj
)
{
SSdbRaw
*
pRedoRaw
=
mndBnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateBnodeCommitLogs
(
STrans
*
pTrans
,
SBnodeObj
*
pObj
)
{
SSdbRaw
*
pCommitRaw
=
mndBnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateBnodeRedoActions
(
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SBnodeObj
*
pObj
)
{
SDCreateBnodeMsg
*
pMsg
=
malloc
(
sizeof
(
SDCreateBnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDCreateBnodeMsg
);
action
.
msgType
=
TDMT_DND_CREATE_BNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndCreateBnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDnodeObj
*
pDnode
,
SMCreateBnodeMsg
*
pCreate
)
{
SBnodeObj
snodeObj
=
{
0
};
snodeObj
.
id
=
pDnode
->
id
;
snodeObj
.
createdTime
=
taosGetTimestampMs
();
snodeObj
.
updateTime
=
snodeObj
.
createdTime
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"snode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
goto
CREATE_BNODE_OVER
;
}
mDebug
(
"trans:%d, used to create snode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
if
(
mndSetCreateBnodeRedoLogs
(
pTrans
,
&
snodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_BNODE_OVER
;
}
if
(
mndSetCreateBnodeCommitLogs
(
pTrans
,
&
snodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_BNODE_OVER
;
}
if
(
mndSetCreateBnodeRedoActions
(
pTrans
,
pDnode
,
&
snodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_BNODE_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_BNODE_OVER
;
}
code
=
0
;
CREATE_BNODE_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessCreateBnodeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMCreateBnodeMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
pCreate
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
mDebug
(
"snode:%d, start to create"
,
pCreate
->
dnodeId
);
SBnodeObj
*
pObj
=
mndAcquireBnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pObj
!=
NULL
)
{
mError
(
"snode:%d, snode already exist"
,
pObj
->
id
);
mndReleaseBnode
(
pMnode
,
pObj
);
return
-
1
;
}
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"snode:%d, dnode not exist"
,
pCreate
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndCreateBnode
(
pMnode
,
pMsg
,
pDnode
,
pCreate
);
mndReleaseDnode
(
pMnode
,
pDnode
);
if
(
code
!=
0
)
{
mError
(
"snode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndSetDropBnodeRedoLogs
(
STrans
*
pTrans
,
SBnodeObj
*
pObj
)
{
SSdbRaw
*
pRedoRaw
=
mndBnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropBnodeCommitLogs
(
STrans
*
pTrans
,
SBnodeObj
*
pObj
)
{
SSdbRaw
*
pCommitRaw
=
mndBnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropBnodeRedoActions
(
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SBnodeObj
*
pObj
)
{
SDDropBnodeMsg
*
pMsg
=
malloc
(
sizeof
(
SDDropBnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDDropBnodeMsg
);
action
.
msgType
=
TDMT_DND_DROP_BNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndDropBnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SBnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"snode:%d, failed to drop since %s"
,
pObj
->
id
,
terrstr
());
goto
DROP_BNODE_OVER
;
}
mDebug
(
"trans:%d, used to drop snode:%d"
,
pTrans
->
id
,
pObj
->
id
);
if
(
mndSetDropBnodeRedoLogs
(
pTrans
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_BNODE_OVER
;
}
if
(
mndSetDropBnodeCommitLogs
(
pTrans
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_BNODE_OVER
;
}
if
(
mndSetDropBnodeRedoActions
(
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_BNODE_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_BNODE_OVER
;
}
code
=
0
;
DROP_BNODE_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessDropBnodeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMCreateBnodeMsg
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
pDrop
->
dnodeId
=
htonl
(
pDrop
->
dnodeId
);
mDebug
(
"snode:%d, start to drop"
,
pDrop
->
dnodeId
);
if
(
pDrop
->
dnodeId
<=
0
)
{
terrno
=
TSDB_CODE_SDB_APP_ERROR
;
mError
(
"snode:%d, failed to drop since %s"
,
pDrop
->
dnodeId
,
terrstr
());
return
-
1
;
}
SBnodeObj
*
pObj
=
mndAcquireBnode
(
pMnode
,
pDrop
->
dnodeId
);
if
(
pObj
==
NULL
)
{
mError
(
"snode:%d, not exist"
,
pDrop
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndDropBnode
(
pMnode
,
pMsg
,
pObj
);
if
(
code
!=
0
)
{
mError
(
"snode:%d, failed to drop since %s"
,
pMnode
->
dnodeId
,
terrstr
());
return
-
1
;
}
sdbRelease
(
pMnode
->
pSdb
,
pMnode
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessCreateBnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransProcessRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessDropBnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransProcessRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndGetBnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchema
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"id"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"endpoint"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htonl
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_BNODE
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
int32_t
mndRetrieveBnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SBnodeObj
*
pObj
=
NULL
;
char
*
pWrite
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_BNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
);
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pObj
->
id
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pObj
->
pDnode
->
ep
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pObj
->
createdTime
;
cols
++
;
numOfRows
++
;
sdbRelease
(
pSdb
,
pObj
);
}
mndVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
void
mndCancelGetNextBnode
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
c1169467
...
@@ -329,7 +329,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
...
@@ -329,7 +329,7 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
static
int32_t
mndCreateMnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDnodeObj
*
pDnode
,
SCreateMnodeMsg
*
pCreate
)
{
static
int32_t
mndCreateMnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDnodeObj
*
pDnode
,
SCreateMnodeMsg
*
pCreate
)
{
SMnodeObj
mnodeObj
=
{
0
};
SMnodeObj
mnodeObj
=
{
0
};
mnodeObj
.
id
=
sdbGetMaxId
(
pMnode
->
pSdb
,
SDB_MNODE
)
;
mnodeObj
.
id
=
pDnode
->
id
;
mnodeObj
.
createdTime
=
taosGetTimestampMs
();
mnodeObj
.
createdTime
=
taosGetTimestampMs
();
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
...
@@ -562,7 +562,7 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
...
@@ -562,7 +562,7 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) {
return
-
1
;
return
-
1
;
}
}
sdbRelease
(
pMnode
->
pSdb
,
p
Mnode
);
sdbRelease
(
pMnode
->
pSdb
,
p
Obj
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
}
...
...
source/dnode/mnode/impl/src/mndQnode.c
0 → 100644
浏览文件 @
c1169467
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndQnode.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#define TSDB_QNODE_VER_NUMBER 1
#define TSDB_QNODE_RESERVE_SIZE 64
static
SSdbRaw
*
mndQnodeActionEncode
(
SQnodeObj
*
pObj
);
static
SSdbRow
*
mndQnodeActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndQnodeActionInsert
(
SSdb
*
pSdb
,
SQnodeObj
*
pObj
);
static
int32_t
mndQnodeActionDelete
(
SSdb
*
pSdb
,
SQnodeObj
*
pObj
);
static
int32_t
mndQnodeActionUpdate
(
SSdb
*
pSdb
,
SQnodeObj
*
pOldQnode
,
SQnodeObj
*
pNewQnode
);
static
int32_t
mndProcessCreateQnodeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropQnodeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateQnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropQnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetQnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveQnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextQnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitQnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_QNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndQnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndQnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndQnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndQnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndQnodeActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_QNODE
,
mndProcessCreateQnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_QNODE
,
mndProcessDropQnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_QNODE_RSP
,
mndProcessCreateQnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_QNODE_RSP
,
mndProcessDropQnodeRsp
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_QNODE
,
mndGetQnodeMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_QNODE
,
mndRetrieveQnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_QNODE
,
mndCancelGetNextQnode
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupQnode
(
SMnode
*
pMnode
)
{}
static
SQnodeObj
*
mndAcquireQnode
(
SMnode
*
pMnode
,
int32_t
qnodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SQnodeObj
*
pObj
=
sdbAcquire
(
pSdb
,
SDB_QNODE
,
&
qnodeId
);
if
(
pObj
==
NULL
)
{
terrno
=
TSDB_CODE_MND_QNODE_NOT_EXIST
;
}
return
pObj
;
}
static
void
mndReleaseQnode
(
SMnode
*
pMnode
,
SQnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pObj
);
}
static
SSdbRaw
*
mndQnodeActionEncode
(
SQnodeObj
*
pObj
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_QNODE
,
TSDB_QNODE_VER_NUMBER
,
sizeof
(
SQnodeObj
)
+
TSDB_QNODE_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pObj
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
updateTime
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_QNODE_RESERVE_SIZE
)
return
pRaw
;
}
static
SSdbRow
*
mndQnodeActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_QNODE_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to decode qnode since %s"
,
terrstr
());
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SQnodeObj
));
SQnodeObj
*
pObj
=
sdbGetRowObj
(
pRow
);
if
(
pObj
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
updateTime
)
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_QNODE_RESERVE_SIZE
)
return
pRow
;
}
static
int32_t
mndQnodeActionInsert
(
SSdb
*
pSdb
,
SQnodeObj
*
pObj
)
{
mTrace
(
"qnode:%d, perform insert action"
,
pObj
->
id
);
pObj
->
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
pObj
->
id
);
if
(
pObj
->
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
mError
(
"qnode:%d, failed to perform insert action since %s"
,
pObj
->
id
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
int32_t
mndQnodeActionDelete
(
SSdb
*
pSdb
,
SQnodeObj
*
pObj
)
{
mTrace
(
"qnode:%d, perform delete action"
,
pObj
->
id
);
if
(
pObj
->
pDnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pObj
->
pDnode
);
pObj
->
pDnode
=
NULL
;
}
return
0
;
}
static
int32_t
mndQnodeActionUpdate
(
SSdb
*
pSdb
,
SQnodeObj
*
pOldQnode
,
SQnodeObj
*
pNewQnode
)
{
mTrace
(
"qnode:%d, perform update action"
,
pOldQnode
->
id
);
pOldQnode
->
updateTime
=
pNewQnode
->
updateTime
;
return
0
;
}
static
int32_t
mndSetCreateQnodeRedoLogs
(
STrans
*
pTrans
,
SQnodeObj
*
pObj
)
{
SSdbRaw
*
pRedoRaw
=
mndQnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateQnodeCommitLogs
(
STrans
*
pTrans
,
SQnodeObj
*
pObj
)
{
SSdbRaw
*
pCommitRaw
=
mndQnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateQnodeRedoActions
(
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SQnodeObj
*
pObj
)
{
SDCreateQnodeMsg
*
pMsg
=
malloc
(
sizeof
(
SDCreateQnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDCreateQnodeMsg
);
action
.
msgType
=
TDMT_DND_CREATE_QNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndCreateQnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDnodeObj
*
pDnode
,
SMCreateQnodeMsg
*
pCreate
)
{
SQnodeObj
qnodeObj
=
{
0
};
qnodeObj
.
id
=
pDnode
->
id
;
qnodeObj
.
createdTime
=
taosGetTimestampMs
();
qnodeObj
.
updateTime
=
qnodeObj
.
createdTime
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"qnode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
goto
CREATE_QNODE_OVER
;
}
mDebug
(
"trans:%d, used to create qnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
if
(
mndSetCreateQnodeRedoLogs
(
pTrans
,
&
qnodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_QNODE_OVER
;
}
if
(
mndSetCreateQnodeCommitLogs
(
pTrans
,
&
qnodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_QNODE_OVER
;
}
if
(
mndSetCreateQnodeRedoActions
(
pTrans
,
pDnode
,
&
qnodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_QNODE_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_QNODE_OVER
;
}
code
=
0
;
CREATE_QNODE_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessCreateQnodeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMCreateQnodeMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
pCreate
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
mDebug
(
"qnode:%d, start to create"
,
pCreate
->
dnodeId
);
SQnodeObj
*
pObj
=
mndAcquireQnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pObj
!=
NULL
)
{
mError
(
"qnode:%d, qnode already exist"
,
pObj
->
id
);
mndReleaseQnode
(
pMnode
,
pObj
);
return
-
1
;
}
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"qnode:%d, dnode not exist"
,
pCreate
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndCreateQnode
(
pMnode
,
pMsg
,
pDnode
,
pCreate
);
mndReleaseDnode
(
pMnode
,
pDnode
);
if
(
code
!=
0
)
{
mError
(
"qnode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndSetDropQnodeRedoLogs
(
STrans
*
pTrans
,
SQnodeObj
*
pObj
)
{
SSdbRaw
*
pRedoRaw
=
mndQnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropQnodeCommitLogs
(
STrans
*
pTrans
,
SQnodeObj
*
pObj
)
{
SSdbRaw
*
pCommitRaw
=
mndQnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropQnodeRedoActions
(
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SQnodeObj
*
pObj
)
{
SDDropQnodeMsg
*
pMsg
=
malloc
(
sizeof
(
SDDropQnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDDropQnodeMsg
);
action
.
msgType
=
TDMT_DND_DROP_QNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndDropQnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SQnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"qnode:%d, failed to drop since %s"
,
pObj
->
id
,
terrstr
());
goto
DROP_QNODE_OVER
;
}
mDebug
(
"trans:%d, used to drop qnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
if
(
mndSetDropQnodeRedoLogs
(
pTrans
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_QNODE_OVER
;
}
if
(
mndSetDropQnodeCommitLogs
(
pTrans
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_QNODE_OVER
;
}
if
(
mndSetDropQnodeRedoActions
(
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_QNODE_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_QNODE_OVER
;
}
code
=
0
;
DROP_QNODE_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessDropQnodeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMCreateQnodeMsg
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
pDrop
->
dnodeId
=
htonl
(
pDrop
->
dnodeId
);
mDebug
(
"qnode:%d, start to drop"
,
pDrop
->
dnodeId
);
if
(
pDrop
->
dnodeId
<=
0
)
{
terrno
=
TSDB_CODE_SDB_APP_ERROR
;
mError
(
"qnode:%d, failed to drop since %s"
,
pDrop
->
dnodeId
,
terrstr
());
return
-
1
;
}
SQnodeObj
*
pObj
=
mndAcquireQnode
(
pMnode
,
pDrop
->
dnodeId
);
if
(
pObj
==
NULL
)
{
mError
(
"qnode:%d, not exist"
,
pDrop
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndDropQnode
(
pMnode
,
pMsg
,
pObj
);
if
(
code
!=
0
)
{
mError
(
"qnode:%d, failed to drop since %s"
,
pMnode
->
dnodeId
,
terrstr
());
return
-
1
;
}
sdbRelease
(
pMnode
->
pSdb
,
pMnode
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessCreateQnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransProcessRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessDropQnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransProcessRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndGetQnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchema
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"id"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"endpoint"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htonl
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_QNODE
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
int32_t
mndRetrieveQnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SQnodeObj
*
pObj
=
NULL
;
char
*
pWrite
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_QNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
);
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pObj
->
id
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pObj
->
pDnode
->
ep
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pObj
->
createdTime
;
cols
++
;
numOfRows
++
;
sdbRelease
(
pSdb
,
pObj
);
}
mndVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
void
mndCancelGetNextQnode
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mndSnode.c
0 → 100644
浏览文件 @
c1169467
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mndSnode.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndTrans.h"
#define TSDB_SNODE_VER_NUMBER 1
#define TSDB_SNODE_RESERVE_SIZE 64
static
SSdbRaw
*
mndSnodeActionEncode
(
SSnodeObj
*
pObj
);
static
SSdbRow
*
mndSnodeActionDecode
(
SSdbRaw
*
pRaw
);
static
int32_t
mndSnodeActionInsert
(
SSdb
*
pSdb
,
SSnodeObj
*
pObj
);
static
int32_t
mndSnodeActionDelete
(
SSdb
*
pSdb
,
SSnodeObj
*
pObj
);
static
int32_t
mndSnodeActionUpdate
(
SSdb
*
pSdb
,
SSnodeObj
*
pOldSnode
,
SSnodeObj
*
pNewSnode
);
static
int32_t
mndProcessCreateSnodeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropSnodeReq
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessCreateSnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndProcessDropSnodeRsp
(
SMnodeMsg
*
pMsg
);
static
int32_t
mndGetSnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
);
static
int32_t
mndRetrieveSnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
);
static
void
mndCancelGetNextSnode
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitSnode
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SNODE
,
.
keyType
=
SDB_KEY_INT32
,
.
encodeFp
=
(
SdbEncodeFp
)
mndSnodeActionEncode
,
.
decodeFp
=
(
SdbDecodeFp
)
mndSnodeActionDecode
,
.
insertFp
=
(
SdbInsertFp
)
mndSnodeActionInsert
,
.
updateFp
=
(
SdbUpdateFp
)
mndSnodeActionUpdate
,
.
deleteFp
=
(
SdbDeleteFp
)
mndSnodeActionDelete
};
mndSetMsgHandle
(
pMnode
,
TDMT_MND_CREATE_SNODE
,
mndProcessCreateSnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_SNODE
,
mndProcessDropSnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_CREATE_SNODE_RSP
,
mndProcessCreateSnodeRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_SNODE_RSP
,
mndProcessDropSnodeRsp
);
mndAddShowMetaHandle
(
pMnode
,
TSDB_MGMT_TABLE_SNODE
,
mndGetSnodeMeta
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_SNODE
,
mndRetrieveSnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_SNODE
,
mndCancelGetNextSnode
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
void
mndCleanupSnode
(
SMnode
*
pMnode
)
{}
static
SSnodeObj
*
mndAcquireSnode
(
SMnode
*
pMnode
,
int32_t
snodeId
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSnodeObj
*
pObj
=
sdbAcquire
(
pSdb
,
SDB_SNODE
,
&
snodeId
);
if
(
pObj
==
NULL
)
{
terrno
=
TSDB_CODE_MND_SNODE_NOT_EXIST
;
}
return
pObj
;
}
static
void
mndReleaseSnode
(
SMnode
*
pMnode
,
SSnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pObj
);
}
static
SSdbRaw
*
mndSnodeActionEncode
(
SSnodeObj
*
pObj
)
{
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_SNODE
,
TSDB_SNODE_VER_NUMBER
,
sizeof
(
SSnodeObj
)
+
TSDB_SNODE_RESERVE_SIZE
);
if
(
pRaw
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_SET_INT32
(
pRaw
,
dataPos
,
pObj
->
id
);
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
createdTime
)
SDB_SET_INT64
(
pRaw
,
dataPos
,
pObj
->
updateTime
)
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
TSDB_SNODE_RESERVE_SIZE
)
return
pRaw
;
}
static
SSdbRow
*
mndSnodeActionDecode
(
SSdbRaw
*
pRaw
)
{
int8_t
sver
=
0
;
if
(
sdbGetRawSoftVer
(
pRaw
,
&
sver
)
!=
0
)
return
NULL
;
if
(
sver
!=
TSDB_SNODE_VER_NUMBER
)
{
terrno
=
TSDB_CODE_SDB_INVALID_DATA_VER
;
mError
(
"failed to decode snode since %s"
,
terrstr
());
return
NULL
;
}
SSdbRow
*
pRow
=
sdbAllocRow
(
sizeof
(
SSnodeObj
));
SSnodeObj
*
pObj
=
sdbGetRowObj
(
pRow
);
if
(
pObj
==
NULL
)
return
NULL
;
int32_t
dataPos
=
0
;
SDB_GET_INT32
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
id
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
createdTime
)
SDB_GET_INT64
(
pRaw
,
pRow
,
dataPos
,
&
pObj
->
updateTime
)
SDB_GET_RESERVE
(
pRaw
,
pRow
,
dataPos
,
TSDB_SNODE_RESERVE_SIZE
)
return
pRow
;
}
static
int32_t
mndSnodeActionInsert
(
SSdb
*
pSdb
,
SSnodeObj
*
pObj
)
{
mTrace
(
"snode:%d, perform insert action"
,
pObj
->
id
);
pObj
->
pDnode
=
sdbAcquire
(
pSdb
,
SDB_DNODE
,
&
pObj
->
id
);
if
(
pObj
->
pDnode
==
NULL
)
{
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
mError
(
"snode:%d, failed to perform insert action since %s"
,
pObj
->
id
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
int32_t
mndSnodeActionDelete
(
SSdb
*
pSdb
,
SSnodeObj
*
pObj
)
{
mTrace
(
"snode:%d, perform delete action"
,
pObj
->
id
);
if
(
pObj
->
pDnode
!=
NULL
)
{
sdbRelease
(
pSdb
,
pObj
->
pDnode
);
pObj
->
pDnode
=
NULL
;
}
return
0
;
}
static
int32_t
mndSnodeActionUpdate
(
SSdb
*
pSdb
,
SSnodeObj
*
pOldSnode
,
SSnodeObj
*
pNewSnode
)
{
mTrace
(
"snode:%d, perform update action"
,
pOldSnode
->
id
);
pOldSnode
->
updateTime
=
pNewSnode
->
updateTime
;
return
0
;
}
static
int32_t
mndSetCreateSnodeRedoLogs
(
STrans
*
pTrans
,
SSnodeObj
*
pObj
)
{
SSdbRaw
*
pRedoRaw
=
mndSnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_CREATING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSnodeCommitLogs
(
STrans
*
pTrans
,
SSnodeObj
*
pObj
)
{
SSdbRaw
*
pCommitRaw
=
mndSnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_READY
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetCreateSnodeRedoActions
(
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SSnodeObj
*
pObj
)
{
SDCreateSnodeMsg
*
pMsg
=
malloc
(
sizeof
(
SDCreateSnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDCreateSnodeMsg
);
action
.
msgType
=
TDMT_DND_CREATE_SNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndCreateSnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SDnodeObj
*
pDnode
,
SMCreateSnodeMsg
*
pCreate
)
{
SSnodeObj
snodeObj
=
{
0
};
snodeObj
.
id
=
pDnode
->
id
;
snodeObj
.
createdTime
=
taosGetTimestampMs
();
snodeObj
.
updateTime
=
snodeObj
.
createdTime
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"snode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
goto
CREATE_SNODE_OVER
;
}
mDebug
(
"trans:%d, used to create snode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
if
(
mndSetCreateSnodeRedoLogs
(
pTrans
,
&
snodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_SNODE_OVER
;
}
if
(
mndSetCreateSnodeCommitLogs
(
pTrans
,
&
snodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_SNODE_OVER
;
}
if
(
mndSetCreateSnodeRedoActions
(
pTrans
,
pDnode
,
&
snodeObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_SNODE_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
CREATE_SNODE_OVER
;
}
code
=
0
;
CREATE_SNODE_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessCreateSnodeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMCreateSnodeMsg
*
pCreate
=
pMsg
->
rpcMsg
.
pCont
;
pCreate
->
dnodeId
=
htonl
(
pCreate
->
dnodeId
);
mDebug
(
"snode:%d, start to create"
,
pCreate
->
dnodeId
);
SSnodeObj
*
pObj
=
mndAcquireSnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pObj
!=
NULL
)
{
mError
(
"snode:%d, snode already exist"
,
pObj
->
id
);
mndReleaseSnode
(
pMnode
,
pObj
);
return
-
1
;
}
SDnodeObj
*
pDnode
=
mndAcquireDnode
(
pMnode
,
pCreate
->
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"snode:%d, dnode not exist"
,
pCreate
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndCreateSnode
(
pMnode
,
pMsg
,
pDnode
,
pCreate
);
mndReleaseDnode
(
pMnode
,
pDnode
);
if
(
code
!=
0
)
{
mError
(
"snode:%d, failed to create since %s"
,
pCreate
->
dnodeId
,
terrstr
());
return
-
1
;
}
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndSetDropSnodeRedoLogs
(
STrans
*
pTrans
,
SSnodeObj
*
pObj
)
{
SSdbRaw
*
pRedoRaw
=
mndSnodeActionEncode
(
pObj
);
if
(
pRedoRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendRedolog
(
pTrans
,
pRedoRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pRedoRaw
,
SDB_STATUS_DROPPING
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropSnodeCommitLogs
(
STrans
*
pTrans
,
SSnodeObj
*
pObj
)
{
SSdbRaw
*
pCommitRaw
=
mndSnodeActionEncode
(
pObj
);
if
(
pCommitRaw
==
NULL
)
return
-
1
;
if
(
mndTransAppendCommitlog
(
pTrans
,
pCommitRaw
)
!=
0
)
return
-
1
;
if
(
sdbSetRawStatus
(
pCommitRaw
,
SDB_STATUS_DROPPED
)
!=
0
)
return
-
1
;
return
0
;
}
static
int32_t
mndSetDropSnodeRedoActions
(
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SSnodeObj
*
pObj
)
{
SDDropSnodeMsg
*
pMsg
=
malloc
(
sizeof
(
SDDropSnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMsg
->
dnodeId
=
htonl
(
pMsg
->
dnodeId
);
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pMsg
;
action
.
contLen
=
sizeof
(
SDDropSnodeMsg
);
action
.
msgType
=
TDMT_DND_DROP_SNODE
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
free
(
pMsg
);
return
-
1
;
}
return
0
;
}
static
int32_t
mndDropSnode
(
SMnode
*
pMnode
,
SMnodeMsg
*
pMsg
,
SSnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"snode:%d, failed to drop since %s"
,
pObj
->
id
,
terrstr
());
goto
DROP_SNODE_OVER
;
}
mDebug
(
"trans:%d, used to drop snode:%d"
,
pTrans
->
id
,
pObj
->
id
);
if
(
mndSetDropSnodeRedoLogs
(
pTrans
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_SNODE_OVER
;
}
if
(
mndSetDropSnodeCommitLogs
(
pTrans
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set commit log since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_SNODE_OVER
;
}
if
(
mndSetDropSnodeRedoActions
(
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
{
mError
(
"trans:%d, failed to set redo actions since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_SNODE_OVER
;
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
goto
DROP_SNODE_OVER
;
}
code
=
0
;
DROP_SNODE_OVER:
mndTransDrop
(
pTrans
);
return
code
;
}
static
int32_t
mndProcessDropSnodeReq
(
SMnodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SMCreateSnodeMsg
*
pDrop
=
pMsg
->
rpcMsg
.
pCont
;
pDrop
->
dnodeId
=
htonl
(
pDrop
->
dnodeId
);
mDebug
(
"snode:%d, start to drop"
,
pDrop
->
dnodeId
);
if
(
pDrop
->
dnodeId
<=
0
)
{
terrno
=
TSDB_CODE_SDB_APP_ERROR
;
mError
(
"snode:%d, failed to drop since %s"
,
pDrop
->
dnodeId
,
terrstr
());
return
-
1
;
}
SSnodeObj
*
pObj
=
mndAcquireSnode
(
pMnode
,
pDrop
->
dnodeId
);
if
(
pObj
==
NULL
)
{
mError
(
"snode:%d, not exist"
,
pDrop
->
dnodeId
);
terrno
=
TSDB_CODE_MND_DNODE_NOT_EXIST
;
return
-
1
;
}
int32_t
code
=
mndDropSnode
(
pMnode
,
pMsg
,
pObj
);
if
(
code
!=
0
)
{
mError
(
"snode:%d, failed to drop since %s"
,
pMnode
->
dnodeId
,
terrstr
());
return
-
1
;
}
sdbRelease
(
pMnode
->
pSdb
,
pMnode
);
return
TSDB_CODE_MND_ACTION_IN_PROGRESS
;
}
static
int32_t
mndProcessCreateSnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransProcessRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndProcessDropSnodeRsp
(
SMnodeMsg
*
pMsg
)
{
mndTransProcessRsp
(
pMsg
);
return
0
;
}
static
int32_t
mndGetSnodeMeta
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
STableMetaMsg
*
pMeta
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
pSchema
;
pShow
->
bytes
[
cols
]
=
2
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_SMALLINT
;
strcpy
(
pSchema
[
cols
].
name
,
"id"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"endpoint"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
8
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
pSchema
[
cols
].
name
,
"create_time"
);
pSchema
[
cols
].
bytes
=
htonl
(
pShow
->
bytes
[
cols
]);
cols
++
;
pMeta
->
numOfColumns
=
htonl
(
cols
);
pShow
->
numOfColumns
=
cols
;
pShow
->
offset
[
0
]
=
0
;
for
(
int32_t
i
=
1
;
i
<
cols
;
++
i
)
{
pShow
->
offset
[
i
]
=
pShow
->
offset
[
i
-
1
]
+
pShow
->
bytes
[
i
-
1
];
}
pShow
->
numOfRows
=
sdbGetSize
(
pSdb
,
SDB_SNODE
);
pShow
->
rowSize
=
pShow
->
offset
[
cols
-
1
]
+
pShow
->
bytes
[
cols
-
1
];
strcpy
(
pMeta
->
tbFname
,
mndShowStr
(
pShow
->
type
));
return
0
;
}
static
int32_t
mndRetrieveSnodes
(
SMnodeMsg
*
pMsg
,
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
)
{
SMnode
*
pMnode
=
pMsg
->
pMnode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
int32_t
cols
=
0
;
SSnodeObj
*
pObj
=
NULL
;
char
*
pWrite
;
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_SNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
);
if
(
pShow
->
pIter
==
NULL
)
break
;
cols
=
0
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int16_t
*
)
pWrite
=
pObj
->
id
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pObj
->
pDnode
->
ep
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
pObj
->
createdTime
;
cols
++
;
numOfRows
++
;
sdbRelease
(
pSdb
,
pObj
);
}
mndVacuumResult
(
data
,
pShow
->
numOfColumns
,
numOfRows
,
rows
,
pShow
);
pShow
->
numOfReads
+=
numOfRows
;
return
numOfRows
;
}
static
void
mndCancelGetNextSnode
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
c1169467
...
@@ -16,14 +16,16 @@
...
@@ -16,14 +16,16 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mndAcct.h"
#include "mndAcct.h"
#include "mndAuth.h"
#include "mndAuth.h"
#include "mndB
alanc
e.h"
#include "mndB
nod
e.h"
#include "mndCluster.h"
#include "mndCluster.h"
#include "mndDb.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndFunc.h"
#include "mndMnode.h"
#include "mndMnode.h"
#include "mndProfile.h"
#include "mndProfile.h"
#include "mndQnode.h"
#include "mndShow.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndStb.h"
#include "mndSync.h"
#include "mndSync.h"
#include "mndTelem.h"
#include "mndTelem.h"
...
@@ -147,6 +149,9 @@ static int32_t mndInitSteps(SMnode *pMnode) {
...
@@ -147,6 +149,9 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-trans"
,
mndInitTrans
,
mndCleanupTrans
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-trans"
,
mndInitTrans
,
mndCleanupTrans
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-cluster"
,
mndInitCluster
,
mndCleanupCluster
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-cluster"
,
mndInitCluster
,
mndCleanupCluster
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-mnode"
,
mndInitMnode
,
mndCleanupMnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-mnode"
,
mndInitMnode
,
mndCleanupMnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-qnode"
,
mndInitQnode
,
mndCleanupQnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-qnode"
,
mndInitSnode
,
mndCleanupSnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-qnode"
,
mndInitBnode
,
mndCleanupBnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-dnode"
,
mndInitDnode
,
mndCleanupDnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-dnode"
,
mndInitDnode
,
mndCleanupDnode
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-user"
,
mndInitUser
,
mndCleanupUser
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-user"
,
mndInitUser
,
mndCleanupUser
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-auth"
,
mndInitAuth
,
mndCleanupAuth
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-auth"
,
mndInitAuth
,
mndCleanupAuth
)
!=
0
)
return
-
1
;
...
@@ -162,7 +167,6 @@ static int32_t mndInitSteps(SMnode *pMnode) {
...
@@ -162,7 +167,6 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-read"
,
mndReadSdb
,
NULL
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-sdb-read"
,
mndReadSdb
,
NULL
)
!=
0
)
return
-
1
;
}
}
if
(
mndAllocStep
(
pMnode
,
"mnode-timer"
,
mndInitTimer
,
NULL
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-timer"
,
mndInitTimer
,
NULL
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-balance"
,
mndInitBalance
,
mndCleanupBalance
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-profile"
,
mndInitProfile
,
mndCleanupProfile
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-profile"
,
mndInitProfile
,
mndCleanupProfile
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-show"
,
mndInitShow
,
mndCleanupShow
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-show"
,
mndInitShow
,
mndCleanupShow
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-sync"
,
mndInitSync
,
mndCleanupSync
)
!=
0
)
return
-
1
;
if
(
mndAllocStep
(
pMnode
,
"mnode-sync"
,
mndInitSync
,
mndCleanupSync
)
!=
0
)
return
-
1
;
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
c1169467
...
@@ -24,6 +24,12 @@ static const char *sdbTableName(ESdbType type) {
...
@@ -24,6 +24,12 @@ static const char *sdbTableName(ESdbType type) {
return
"cluster"
;
return
"cluster"
;
case
SDB_MNODE
:
case
SDB_MNODE
:
return
"mnode"
;
return
"mnode"
;
case
SDB_QNODE
:
return
"qnode"
;
case
SDB_SNODE
:
return
"snode"
;
case
SDB_BNODE
:
return
"bnode"
;
case
SDB_DNODE
:
case
SDB_DNODE
:
return
"dnode"
;
return
"dnode"
;
case
SDB_USER
:
case
SDB_USER
:
...
@@ -55,7 +61,8 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
...
@@ -55,7 +61,8 @@ void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper) {
}
else
if
(
keyType
==
SDB_KEY_INT32
)
{
}
else
if
(
keyType
==
SDB_KEY_INT32
)
{
mTrace
(
"%s:%d, refCount:%d oper:%s"
,
sdbTableName
(
pRow
->
type
),
*
(
int32_t
*
)
pRow
->
pObj
,
pRow
->
refCount
,
oper
);
mTrace
(
"%s:%d, refCount:%d oper:%s"
,
sdbTableName
(
pRow
->
type
),
*
(
int32_t
*
)
pRow
->
pObj
,
pRow
->
refCount
,
oper
);
}
else
if
(
keyType
==
SDB_KEY_INT64
)
{
}
else
if
(
keyType
==
SDB_KEY_INT64
)
{
mTrace
(
"%s:%"
PRId64
", refCount:%d oper:%s"
,
sdbTableName
(
pRow
->
type
),
*
(
int64_t
*
)
pRow
->
pObj
,
pRow
->
refCount
,
oper
);
mTrace
(
"%s:%"
PRId64
", refCount:%d oper:%s"
,
sdbTableName
(
pRow
->
type
),
*
(
int64_t
*
)
pRow
->
pObj
,
pRow
->
refCount
,
oper
);
}
else
{
}
else
{
}
}
}
}
...
...
source/util/src/terror.c
浏览文件 @
c1169467
...
@@ -182,10 +182,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_CFG, "Invalid dnode cfg")
...
@@ -182,10 +182,15 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_DNODE_CFG, "Invalid dnode cfg")
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_DNODE_EP
,
"Invalid dnode end point"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_DNODE_EP
,
"Invalid dnode end point"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_DNODE_ID
,
"Invalid dnode id"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_INVALID_DNODE_ID
,
"Invalid dnode id"
)
// mnode-
m
node
// mnode-node
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MNODE_ALREADY_EXIST
,
"Mnode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MNODE_ALREADY_EXIST
,
"Mnode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MNODE_NOT_EXIST
,
"Mnode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_MNODE_NOT_EXIST
,
"Mnode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TOO_MANY_MNODES
,
"Too many mnodes"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_QNODE_ALREADY_EXIST
,
"Qnode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_QNODE_NOT_EXIST
,
"Qnode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SNODE_ALREADY_EXIST
,
"Snode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SNODE_NOT_EXIST
,
"Snode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_BNODE_ALREADY_EXIST
,
"Bnode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_BNODE_NOT_EXIST
,
"Bnode not there"
)
// mnode-acct
// mnode-acct
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACCT_ALREADY_EXIST
,
"Account already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACCT_ALREADY_EXIST
,
"Account already exists"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录