Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
59a39792
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
59a39792
编写于
4月 30, 2020
作者:
S
slguan
提交者:
GitHub
4月 30, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #1763 from taosdata/feature/drop
Feature/drop
上级
ed71f9a1
a60ec8f6
变更
17
隐藏空白更改
内联
并排
Showing
17 changed file
with
264 addition
and
213 deletion
+264
-213
src/mnode/inc/mgmtDb.h
src/mnode/inc/mgmtDb.h
+1
-0
src/mnode/inc/mgmtDef.h
src/mnode/inc/mgmtDef.h
+0
-1
src/mnode/inc/mgmtMnode.h
src/mnode/inc/mgmtMnode.h
+4
-1
src/mnode/inc/mgmtTable.h
src/mnode/inc/mgmtTable.h
+9
-7
src/mnode/inc/mgmtUser.h
src/mnode/inc/mgmtUser.h
+1
-1
src/mnode/inc/mgmtVgroup.h
src/mnode/inc/mgmtVgroup.h
+2
-1
src/mnode/src/mgmtAcct.c
src/mnode/src/mgmtAcct.c
+2
-2
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+1
-1
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+46
-22
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+17
-41
src/mnode/src/mgmtMnode.c
src/mnode/src/mgmtMnode.c
+22
-9
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+3
-3
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+2
-2
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+5
-8
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+79
-74
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+5
-9
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+65
-31
未找到文件。
src/mnode/inc/mgmtDb.h
浏览文件 @
59a39792
...
...
@@ -32,6 +32,7 @@ int32_t mgmtInitDbs();
void
mgmtCleanUpDbs
();
SDbObj
*
mgmtGetDb
(
char
*
db
);
SDbObj
*
mgmtGetDbByTableId
(
char
*
db
);
void
*
mgmtGetNextDb
(
void
*
pNode
,
SDbObj
**
pDb
);
void
mgmtIncDbRef
(
SDbObj
*
pDb
);
void
mgmtDecDbRef
(
SDbObj
*
pDb
);
bool
mgmtCheckIsMonitorDB
(
char
*
db
,
char
*
monitordb
);
...
...
src/mnode/inc/mgmtDef.h
浏览文件 @
59a39792
...
...
@@ -237,7 +237,6 @@ typedef struct {
typedef
struct
{
uint8_t
msgType
;
int8_t
usePublicIp
;
int8_t
received
;
int8_t
successed
;
int8_t
expected
;
...
...
src/mnode/inc/mgmtMnode.h
浏览文件 @
59a39792
...
...
@@ -33,16 +33,19 @@ void mgmtCleanupMnodes();
int32_t
mgmtAddMnode
(
int32_t
dnodeId
);
int32_t
mgmtDropMnode
(
int32_t
dnodeId
);
void
mgmtDropMnodeLocal
(
int32_t
dnodeId
);
void
*
mgmtGetMnode
(
int32_t
mnodeId
);
int32_t
mgmtGetMnodesNum
();
void
*
mgmtGetNextMnode
(
void
*
pNode
,
struct
SMnodeObj
**
pMnode
);
void
mgmtReleaseMnode
(
struct
SMnodeObj
*
pMnode
);
void
mgmtIncMnodeRef
(
struct
SMnodeObj
*
pMnode
);
void
mgmtDecMnodeRef
(
struct
SMnodeObj
*
pMnode
);
char
*
mgmtGetMnodeRoleStr
();
void
mgmtGetMnodeIpSet
(
SRpcIpSet
*
ipSet
);
void
mgmtGetMnodeInfos
(
void
*
mnodes
);
#ifdef __cplusplus
}
#endif
...
...
src/mnode/inc/mgmtTable.h
浏览文件 @
59a39792
...
...
@@ -22,13 +22,15 @@ extern "C" {
#include "mgmtDef.h"
int32_t
mgmtInitTables
();
void
mgmtCleanUpTables
();
STableObj
*
mgmtGetTable
(
char
*
tableId
);
void
mgmtIncTableRef
(
void
*
pTable
);
void
mgmtDecTableRef
(
void
*
pTable
);
void
mgmtDropAllChildTables
(
SDbObj
*
pDropDb
);
void
mgmtDropAllSuperTables
(
SDbObj
*
pDropDb
);
int32_t
mgmtInitTables
();
void
mgmtCleanUpTables
();
void
*
mgmtGetTable
(
char
*
tableId
);
void
mgmtIncTableRef
(
void
*
pTable
);
void
mgmtDecTableRef
(
void
*
pTable
);
void
*
mgmtGetNextChildTable
(
void
*
pNode
,
SChildTableObj
**
pTable
);
void
*
mgmtGetNextSuperTable
(
void
*
pNode
,
SSuperTableObj
**
pTable
);
void
mgmtDropAllChildTables
(
SDbObj
*
pDropDb
);
void
mgmtDropAllSuperTables
(
SDbObj
*
pDropDb
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mgmtUser.h
浏览文件 @
59a39792
...
...
@@ -27,7 +27,7 @@ SUserObj *mgmtGetUser(char *name);
void
*
mgmtGetNextUser
(
void
*
pNode
,
SUserObj
**
pUser
);
void
mgmtIncUserRef
(
SUserObj
*
pUser
);
void
mgmtDecUserRef
(
SUserObj
*
pUser
);
SUserObj
*
mgmtGetUserFromConn
(
void
*
pConn
,
bool
*
usePublicIp
);
SUserObj
*
mgmtGetUserFromConn
(
void
*
pConn
);
int32_t
mgmtCreateUser
(
SAcctObj
*
pAcct
,
char
*
name
,
char
*
pass
);
void
mgmtDropAllUsers
(
SAcctObj
*
pAcct
);
...
...
src/mnode/inc/mgmtVgroup.h
浏览文件 @
59a39792
...
...
@@ -32,7 +32,8 @@ void mgmtCleanUpVgroups();
SVgObj
*
mgmtGetVgroup
(
int32_t
vgId
);
void
mgmtIncVgroupRef
(
SVgObj
*
pVgroup
);
void
mgmtDecVgroupRef
(
SVgObj
*
pVgroup
);
void
mgmtDropAllVgroups
(
SDbObj
*
pDropDb
);
void
mgmtDropAllDbVgroups
(
SDbObj
*
pDropDb
);
void
mgmtDropAllDnodeVgroups
(
SDnodeObj
*
pDropDnode
);
void
*
mgmtGetNextVgroup
(
void
*
pNode
,
SVgObj
**
pVgroup
);
void
mgmtUpdateVgroup
(
SVgObj
*
pVgroup
);
...
...
src/mnode/src/mgmtAcct.c
浏览文件 @
59a39792
...
...
@@ -27,8 +27,8 @@
#include "mgmtUser.h"
void
*
tsAcctSdb
=
NULL
;
int32_t
tsAcctUpdateSize
;
static
void
mgmtCreateRootAcct
();
static
int32_t
tsAcctUpdateSize
;
static
void
mgmtCreateRootAcct
();
static
int32_t
mgmtActionAcctDestroy
(
SSdbOper
*
pOper
)
{
SAcctObj
*
pAcct
=
pOper
->
pObj
;
...
...
src/mnode/src/mgmtBalance.c
浏览文件 @
59a39792
...
...
@@ -35,7 +35,7 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
void
*
pNode
=
NULL
;
SDnodeObj
*
pDnode
=
NULL
;
SDnodeObj
*
pSelDnode
=
NULL
;
float
vnodeUsage
=
1
.
0
;
float
vnodeUsage
=
1
000
.
0
;
while
(
1
)
{
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
59a39792
...
...
@@ -36,7 +36,7 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
void
*
tsDbSdb
=
NULL
;
static
void
*
tsDbSdb
=
NULL
;
static
int32_t
tsDbUpdateSize
;
static
int32_t
mgmtCreateDb
(
SAcctObj
*
pAcct
,
SCMCreateDbMsg
*
pCreate
);
...
...
@@ -82,7 +82,7 @@ static int32_t mgmtDbActionDelete(SSdbOper *pOper) {
mgmtDropDbFromAcct
(
pAcct
,
pDb
);
mgmtDropAllChildTables
(
pDb
);
mgmtDropAllSuperTables
(
pDb
);
mgmtDropAllVgroups
(
pDb
);
mgmtDropAll
Db
Vgroups
(
pDb
);
mgmtDecAcctRef
(
pAcct
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -95,6 +95,7 @@ static int32_t mgmtDbActionUpdate(SSdbOper *pOper) {
memcpy
(
pSaved
,
pDb
,
pOper
->
rowSize
);
free
(
pDb
);
}
mgmtDecDbRef
(
pSaved
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -154,6 +155,10 @@ int32_t mgmtInitDbs() {
return
0
;
}
void
*
mgmtGetNextDb
(
void
*
pNode
,
SDbObj
**
pDb
)
{
return
sdbFetchRow
(
tsDbSdb
,
pNode
,
(
void
**
)
pDb
);
}
SDbObj
*
mgmtGetDb
(
char
*
db
)
{
return
(
SDbObj
*
)
sdbGetRow
(
tsDbSdb
,
db
);
}
...
...
@@ -174,7 +179,7 @@ SDbObj *mgmtGetDbByTableId(char *tableId) {
memset
(
db
,
0
,
sizeof
(
db
));
strncpy
(
db
,
tableId
,
pos
-
tableId
);
return
(
SDbObj
*
)
sdbGetRow
(
tsDbSdb
,
db
);
return
mgmtGetDb
(
db
);
}
static
int32_t
mgmtCheckDbCfg
(
SDbCfg
*
pCfg
)
{
...
...
@@ -346,8 +351,27 @@ bool mgmtCheckIsMonitorDB(char *db, char *monitordb) {
return
(
strncasecmp
(
dbName
,
monitordb
,
len
)
==
0
&&
len
==
strlen
(
monitordb
));
}
#if 0
void mgmtPrintVgroups(SDbObj *pDb, char *oper) {
mPrint("db:%s, vgroup link from head, oper:%s", pDb->name, oper);
SVgObj *pVgroup = pDb->pHead;
while (pVgroup != NULL) {
mPrint("vgId:%d", pVgroup->vgId);
pVgroup = pVgroup->next;
}
mPrint("db:%s, vgroup link from tail", pDb->name, pDb->numOfVgroups);
pVgroup = pDb->pTail;
while (pVgroup != NULL) {
mPrint("vgId:%d", pVgroup->vgId);
pVgroup = pVgroup->prev;
}
}
#endif
void
mgmtAddVgroupIntoDb
(
SVgObj
*
pVgroup
)
{
SDbObj
*
pDb
=
pVgroup
->
pDb
;
pVgroup
->
next
=
pDb
->
pHead
;
pVgroup
->
prev
=
NULL
;
...
...
@@ -397,7 +421,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
int32_t
cols
=
0
;
SSchema
*
pSchema
=
pMeta
->
schema
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
pShow
->
bytes
[
cols
]
=
TSDB_DB_NAME_LEN
;
...
...
@@ -545,11 +569,11 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
SDbObj
*
pDb
=
NULL
;
char
*
pWrite
;
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
sdbFetchRow
(
tsDbSdb
,
pShow
->
pNode
,
(
void
**
)
&
pDb
);
pShow
->
pNode
=
mgmtGetNextDb
(
pShow
->
pNode
,
&
pDb
);
if
(
pDb
==
NULL
)
break
;
cols
=
0
;
...
...
@@ -674,8 +698,7 @@ static int32_t mgmtSetDbDropping(SDbObj *pDb) {
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
,
.
rowSize
=
tsDbUpdateSize
.
pObj
=
pDb
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -803,8 +826,7 @@ static int32_t mgmtAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter) {
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDbSdb
,
.
pObj
=
pDb
,
.
rowSize
=
tsDbUpdateSize
.
pObj
=
pDb
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -839,21 +861,21 @@ static void mgmtProcessAlterDbMsg(SQueuedMsg *pMsg) {
return
;
}
SDbObj
*
pDb
=
pMsg
->
pDb
=
mgmtGetDb
(
pAlter
->
db
);
if
(
pDb
==
NULL
)
{
if
(
pMsg
->
pDb
==
NULL
)
pMsg
->
pDb
=
mgmtGetDb
(
pAlter
->
db
);
if
(
p
Msg
->
p
Db
==
NULL
)
{
mError
(
"db:%s, failed to alter, invalid db"
,
pAlter
->
db
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_DB
);
return
;
}
int32_t
code
=
mgmtAlterDb
(
pDb
,
pAlter
);
int32_t
code
=
mgmtAlterDb
(
p
Msg
->
p
Db
,
pAlter
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"db:%s, failed to alter, invalid db option"
,
pAlter
->
db
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
mTrace
(
"db:%s, all vgroups is altered"
,
pDb
->
name
);
mTrace
(
"db:%s, all vgroups is altered"
,
p
Msg
->
p
Db
->
name
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SUCCESS
);
}
...
...
@@ -884,8 +906,8 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
return
;
}
SDbObj
*
pDb
=
pMsg
->
pDb
=
mgmtGetDb
(
pDrop
->
db
);
if
(
pDb
==
NULL
)
{
if
(
pMsg
->
pDb
==
NULL
)
pMsg
->
pDb
=
mgmtGetDb
(
pDrop
->
db
);
if
(
p
Msg
->
p
Db
==
NULL
)
{
if
(
pDrop
->
ignoreNotExists
)
{
mTrace
(
"db:%s, db is not exist, think drop success"
,
pDrop
->
db
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SUCCESS
);
...
...
@@ -897,30 +919,32 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
}
}
if
(
mgmtCheckIsMonitorDB
(
pDb
->
name
,
tsMonitorDbName
))
{
if
(
mgmtCheckIsMonitorDB
(
p
Msg
->
p
Db
->
name
,
tsMonitorDbName
))
{
mError
(
"db:%s, can't drop monitor database"
,
pDrop
->
db
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_MONITOR_DB_FORBIDDEN
);
return
;
}
int32_t
code
=
mgmtSetDbDropping
(
pDb
);
int32_t
code
=
mgmtSetDbDropping
(
p
Msg
->
p
Db
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"db:%s, failed to drop, reason:%s"
,
pDrop
->
db
,
tstrerror
(
code
));
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
SVgObj
*
pVgroup
=
pDb
->
pHead
;
#if 0
SVgObj *pVgroup = pMsg->pDb->pHead;
if (pVgroup != NULL) {
mPrint
(
"vg
roup
:%d, will be dropped"
,
pVgroup
->
vgId
);
mPrint("vg
Id
:%d, will be dropped", pVgroup->vgId);
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
newMsg->ahandle = pVgroup;
newMsg->expected = pVgroup->numOfVnodes;
mgmtDropVgroup(pVgroup, newMsg);
return;
}
#endif
mTrace
(
"db:%s, all vgroups is dropped"
,
pDb
->
name
);
mTrace
(
"db:%s, all vgroups is dropped"
,
p
Msg
->
p
Db
->
name
);
mgmtDropDb
(
pMsg
);
}
...
...
@@ -932,7 +956,7 @@ void mgmtDropAllDbs(SAcctObj *pAcct) {
mPrint
(
"acct:%s, all dbs will be dropped from sdb"
,
pAcct
->
user
);
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsDbSdb
,
pNode
,
(
void
**
)
&
pDb
);
pNode
=
mgmtGetNextDb
(
pNode
,
&
pDb
);
if
(
pDb
==
NULL
)
break
;
if
(
pDb
->
pAcct
==
pAcct
)
{
...
...
src/mnode/src/mgmtDnode.c
浏览文件 @
59a39792
...
...
@@ -36,9 +36,9 @@
#include "mgmtUser.h"
#include "mgmtVgroup.h"
void
*
tsDnodeSdb
=
NULL
;
int32_t
tsDnodeUpdateSize
=
0
;
int32_t
tsAccessSquence
=
0
;
static
void
*
tsDnodeSdb
=
NULL
;
static
int32_t
tsDnodeUpdateSize
=
0
;
extern
void
*
tsMnodeSdb
;
extern
void
*
tsVgroupSdb
;
...
...
@@ -73,39 +73,12 @@ static int32_t mgmtDnodeActionInsert(SSdbOper *pOper) {
static
int32_t
mgmtDnodeActionDelete
(
SSdbOper
*
pOper
)
{
SDnodeObj
*
pDnode
=
pOper
->
pObj
;
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
int32_t
numOfVgroups
=
0
;
while
(
1
)
{
pLastNode
=
pNode
;
pNode
=
sdbFetchRow
(
tsVgroupSdb
,
pNode
,
(
void
**
)
&
pVgroup
);
if
(
pVgroup
==
NULL
)
break
;
if
(
pVgroup
->
vnodeGid
[
0
].
dnodeId
==
pDnode
->
dnodeId
)
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
};
sdbDeleteRow
(
&
oper
);
pNode
=
pLastNode
;
numOfVgroups
++
;
continue
;
}
}
SMnodeObj
*
pMnode
=
mgmtGetMnode
(
pDnode
->
dnodeId
);
if
(
pMnode
!=
NULL
)
{
SSdbOper
oper
=
{.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsMnodeSdb
,
.
pObj
=
pMnode
};
sdbDeleteRow
(
&
oper
);
mgmtReleaseMnode
(
pMnode
);
}
mgmtDropAllDnodeVgroups
(
pDnode
);
mgmtDropMnodeLocal
(
pDnode
->
dnodeId
);
balanceNotify
();
mTrace
(
"dnode:%d, all vgroups
:%d is dropped from sdb"
,
pDnode
->
dnodeId
,
numOfVgroups
);
mTrace
(
"dnode:%d, all vgroups
is dropped from sdb"
,
pDnode
->
dnodeId
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -116,6 +89,7 @@ static int32_t mgmtDnodeActionUpdate(SSdbOper *pOper) {
memcpy
(
pSaved
,
pDnode
,
pOper
->
rowSize
);
free
(
pDnode
);
}
mgmtDecDnodeRef
(
pSaved
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -212,7 +186,7 @@ void *mgmtGetDnodeByIp(char *ep) {
void
*
pNode
=
NULL
;
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsDnodeSdb
,
pNode
,
(
void
**
)
&
pDnode
);
pNode
=
mgmtGetNextDnode
(
pNode
,
&
pDnode
);
if
(
pDnode
==
NULL
)
break
;
if
(
strcmp
(
ep
,
pDnode
->
dnodeEp
)
==
0
)
{
return
pDnode
;
...
...
@@ -235,8 +209,7 @@ void mgmtUpdateDnode(SDnodeObj *pDnode) {
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsDnodeSdb
,
.
pObj
=
pDnode
,
.
rowSize
=
tsDnodeUpdateSize
.
pObj
=
pDnode
};
sdbUpdateRow
(
&
oper
);
...
...
@@ -336,7 +309,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pVload
->
vgId
);
if
(
pVgroup
==
NULL
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
dnodeEp
);
mPrint
(
"dnode:%d, vg
roup
:%d not exist in mnode, drop it"
,
pDnode
->
dnodeId
,
pVload
->
vgId
);
mPrint
(
"dnode:%d, vg
Id
:%d not exist in mnode, drop it"
,
pDnode
->
dnodeId
,
pVload
->
vgId
);
mgmtSendDropVnodeMsg
(
pVload
->
vgId
,
&
ipSet
,
NULL
);
}
else
{
mgmtUpdateVgroupStatus
(
pVgroup
,
pDnode
,
pVload
);
...
...
@@ -387,6 +360,7 @@ static int32_t mgmtCreateDnode(char *ep) {
SDnodeObj
*
pDnode
=
mgmtGetDnodeByIp
(
ep
);
if
(
pDnode
!=
NULL
)
{
mgmtDecDnodeRef
(
pDnode
);
mError
(
"dnode:%d is alredy exist, %s:%d"
,
pDnode
->
dnodeId
,
pDnode
->
dnodeFqdn
,
pDnode
->
dnodePort
);
return
TSDB_CODE_DNODE_ALREADY_EXIST
;
}
...
...
@@ -440,6 +414,7 @@ static int32_t mgmtDropDnodeByIp(char *ep) {
return
TSDB_CODE_DNODE_NOT_EXIST
;
}
mgmtDecDnodeRef
(
pDnode
);
if
(
strcmp
(
pDnode
->
dnodeEp
,
dnodeGetMnodeMasterEp
())
==
0
)
{
mError
(
"dnode:%d, can't drop dnode:%s which is master"
,
pDnode
->
dnodeId
,
ep
);
return
TSDB_CODE_NO_REMOVE_MASTER
;
...
...
@@ -464,6 +439,7 @@ static void mgmtProcessCreateDnodeMsg(SQueuedMsg *pMsg) {
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnodeByIp
(
pCreate
->
ep
);
mLPrint
(
"dnode:%d, %s is created by %s"
,
pDnode
->
dnodeId
,
pCreate
->
ep
,
pMsg
->
pUser
->
user
);
mgmtDecDnodeRef
(
pDnode
);
}
else
{
mError
(
"failed to create dnode:%s, reason:%s"
,
pCreate
->
ep
,
tstrerror
(
rpcRsp
.
code
));
}
...
...
@@ -492,7 +468,7 @@ static void mgmtProcessDropDnodeMsg(SQueuedMsg *pMsg) {
}
static
int32_t
mgmtGetDnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
{
...
...
@@ -609,7 +585,7 @@ static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
static
int32_t
mgmtGetModuleMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
...
...
@@ -719,7 +695,7 @@ static bool mgmtCheckConfigShow(SGlobalCfg *cfg) {
static
int32_t
mgmtGetConfigMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
...
...
@@ -806,7 +782,7 @@ static int32_t mgmtRetrieveConfigs(SShowObj *pShow, char *data, int32_t rows, vo
static
int32_t
mgmtGetVnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
int32_t
cols
=
0
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
user
,
"root"
)
!=
0
)
{
...
...
src/mnode/src/mgmtMnode.c
浏览文件 @
59a39792
...
...
@@ -30,7 +30,7 @@
#include "mgmtShell.h"
#include "mgmtUser.h"
void
*
tsMnodeSdb
=
NULL
;
static
void
*
tsMnodeSdb
=
NULL
;
static
int32_t
tsMnodeUpdateSize
=
0
;
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
...
...
@@ -71,7 +71,7 @@ static int32_t mgmtMnodeActionUpdate(SSdbOper *pOper) {
memcpy
(
pSaved
,
pMnode
,
pOper
->
rowSize
);
free
(
pMnode
);
}
mgmtDecMnodeRef
(
pSaved
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -97,7 +97,7 @@ static int32_t mgmtMnodeActionRestored() {
mgmtGetNextMnode
(
NULL
,
&
pMnode
);
if
(
pMnode
!=
NULL
)
{
pMnode
->
role
=
TAOS_SYNC_ROLE_MASTER
;
mgmt
ReleaseMnode
(
pMnode
);
mgmt
DecMnodeRef
(
pMnode
);
}
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -148,7 +148,11 @@ void *mgmtGetMnode(int32_t mnodeId) {
return
sdbGetRow
(
tsMnodeSdb
,
&
mnodeId
);
}
void
mgmtReleaseMnode
(
SMnodeObj
*
pMnode
)
{
void
mgmtIncMnodeRef
(
SMnodeObj
*
pMnode
)
{
sdbIncRef
(
tsMnodeSdb
,
pMnode
);
}
void
mgmtDecMnodeRef
(
SMnodeObj
*
pMnode
)
{
sdbDecRef
(
tsMnodeSdb
,
pMnode
);
}
...
...
@@ -187,7 +191,7 @@ void mgmtGetMnodeIpSet(SRpcIpSet *ipSet) {
ipSet
->
numOfIps
++
;
mgmt
ReleaseMnode
(
pMnode
);
mgmt
DecMnodeRef
(
pMnode
);
}
}
...
...
@@ -209,7 +213,7 @@ void mgmtGetMnodeInfos(void *param) {
}
index
++
;
mgmt
ReleaseMnode
(
pMnode
);
mgmt
DecMnodeRef
(
pMnode
);
}
mnodes
->
nodeNum
=
index
;
...
...
@@ -235,8 +239,17 @@ int32_t mgmtAddMnode(int32_t dnodeId) {
return
code
;
}
void
mgmtDropMnodeLocal
(
int32_t
dnodeId
)
{
SMnodeObj
*
pMnode
=
mgmtGetMnode
(
dnodeId
);
if
(
pMnode
!=
NULL
)
{
SSdbOper
oper
=
{.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsMnodeSdb
,
.
pObj
=
pMnode
};
sdbDeleteRow
(
&
oper
);
mgmtDecMnodeRef
(
pMnode
);
}
}
int32_t
mgmtDropMnode
(
int32_t
dnodeId
)
{
SMnodeObj
*
pMnode
=
sdbGetRow
(
tsMnodeSdb
,
&
dnodeId
);
SMnodeObj
*
pMnode
=
mgmtGetMnode
(
dnodeId
);
if
(
pMnode
==
NULL
)
{
return
TSDB_CODE_DNODE_NOT_EXIST
;
}
...
...
@@ -258,7 +271,7 @@ int32_t mgmtDropMnode(int32_t dnodeId) {
static
int32_t
mgmtGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
sdbUpdateMnodeRoles
();
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
return
0
;
if
(
strcmp
(
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
{
...
...
@@ -339,7 +352,7 @@ static int32_t mgmtRetrieveMnodes(SShowObj *pShow, char *data, int32_t rows, voi
numOfRows
++
;
mgmt
ReleaseMnode
(
pMnode
);
mgmt
DecMnodeRef
(
pMnode
);
}
pShow
->
numOfReads
+=
numOfRows
;
...
...
src/mnode/src/mgmtProfile.c
浏览文件 @
59a39792
...
...
@@ -675,7 +675,7 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
void
mgmtProcessKillQueryMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
thandle
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
thandle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -699,7 +699,7 @@ void mgmtProcessKillQueryMsg(SQueuedMsg *pMsg) {
void
mgmtProcessKillStreamMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
thandle
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
thandle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
...
...
@@ -723,7 +723,7 @@ void mgmtProcessKillStreamMsg(SQueuedMsg *pMsg) {
void
mgmtProcessKillConnectionMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
thandle
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
thandle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
59a39792
...
...
@@ -184,7 +184,7 @@ void sdbUpdateMnodeRoles() {
if
(
pMnode
!=
NULL
)
{
pMnode
->
role
=
roles
.
role
[
i
];
sdbPrint
(
"mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
mgmtGetMnodeRoleStr
(
pMnode
->
role
));
mgmt
ReleaseMnode
(
pMnode
);
mgmt
DecMnodeRef
(
pMnode
);
}
}
}
...
...
@@ -252,7 +252,7 @@ void sdbUpdateSync() {
strcpy
(
syncCfg
.
nodeInfo
[
index
].
nodeFqdn
,
pMnode
->
pDnode
->
dnodeEp
);
index
++
;
mgmt
ReleaseMnode
(
pMnode
);
mgmt
DecMnodeRef
(
pMnode
);
}
}
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
59a39792
...
...
@@ -421,6 +421,7 @@ static void mgmtProcessConnectMsg(SQueuedMsg *pMsg) {
code
=
TSDB_CODE_INVALID_DB
;
goto
connect_over
;
}
mgmtDecDbRef
(
pDb
);
}
SCMConnectRsp
*
pConnectRsp
=
rpcMallocCont
(
sizeof
(
SCMConnectRsp
));
...
...
@@ -454,9 +455,8 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
SCMUseDbMsg
*
pUseDbMsg
=
pMsg
->
pCont
;
// todo check for priority of current user
pMsg
->
pDb
=
mgmtGetDb
(
pUseDbMsg
->
db
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pMsg
->
pDb
==
NULL
)
pMsg
->
pDb
=
mgmtGetDb
(
pUseDbMsg
->
db
);
if
(
pMsg
->
pDb
==
NULL
)
{
code
=
TSDB_CODE_INVALID_DB
;
}
...
...
@@ -470,7 +470,7 @@ static void mgmtProcessUseMsg(SQueuedMsg *pMsg) {
*/
static
bool
mgmtCheckTableMetaMsgReadOnly
(
SQueuedMsg
*
pMsg
)
{
SCMTableInfoMsg
*
pInfo
=
pMsg
->
pCont
;
pMsg
->
pTable
=
mgmtGetTable
(
pInfo
->
tableId
);
if
(
pMsg
->
pTable
==
NULL
)
pMsg
->
pTable
=
mgmtGetTable
(
pInfo
->
tableId
);
if
(
pMsg
->
pTable
!=
NULL
)
return
true
;
// If table does not exists and autoCreate flag is set, we add the handler into task queue
...
...
@@ -551,8 +551,7 @@ void mgmtFreeQhandle(void *qhandle, bool forceRemove) {
}
void
*
mgmtMallocQueuedMsg
(
SRpcMsg
*
rpcMsg
)
{
bool
usePublicIp
=
false
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
,
&
usePublicIp
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
return
NULL
;
}
...
...
@@ -563,7 +562,6 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
pMsg
->
contLen
=
rpcMsg
->
contLen
;
pMsg
->
pCont
=
rpcMsg
->
pCont
;
pMsg
->
pUser
=
pUser
;
pMsg
->
usePublicIp
=
usePublicIp
;
return
pMsg
;
}
...
...
@@ -591,8 +589,7 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) {
pDestMsg
->
retry
=
pSrcMsg
->
retry
;
pDestMsg
->
maxRetry
=
pSrcMsg
->
maxRetry
;
pDestMsg
->
pUser
=
pSrcMsg
->
pUser
;
pDestMsg
->
usePublicIp
=
pSrcMsg
->
usePublicIp
;
pSrcMsg
->
pCont
=
NULL
;
pSrcMsg
->
pUser
=
NULL
;
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
59a39792
...
...
@@ -41,8 +41,8 @@
#include "mgmtVgroup.h"
#include "tcompare.h"
void
*
tsChildTableSdb
;
void
*
tsSuperTableSdb
;
static
void
*
tsChildTableSdb
;
static
void
*
tsSuperTableSdb
;
static
int32_t
tsChildTableUpdateSize
;
static
int32_t
tsSuperTableUpdateSize
;
static
void
*
mgmtGetChildTable
(
char
*
tableId
);
...
...
@@ -97,14 +97,14 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) {
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"ctable:%s, not in vg
roup
:%d"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
);
mError
(
"ctable:%s, not in vg
Id
:%d"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
);
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
mgmtDecVgroupRef
(
pVgroup
);
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, vg
roup
:%d not in db:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
mError
(
"ctable:%s, vg
Id
:%d not in db:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
TSDB_CODE_INVALID_DB
;
}
mgmtDecDbRef
(
pDb
);
...
...
@@ -117,6 +117,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) {
mgmtDecAcctRef
(
pAcct
);
if
(
pTable
->
info
.
type
==
TSDB_CHILD_TABLE
)
{
// add ref
pTable
->
superTable
=
mgmtGetSuperTable
(
pTable
->
superTableId
);
mgmtAddTableIntoStable
(
pTable
->
superTable
,
pTable
);
grantAdd
(
TSDB_GRANT_TIMESERIES
,
pTable
->
superTable
->
numOfColumns
-
1
);
...
...
@@ -146,7 +147,7 @@ static int32_t mgmtChildTableActionDelete(SSdbOper *pOper) {
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
mError
(
"ctable:%s, vg
roup
:%d not in DB:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
mError
(
"ctable:%s, vg
Id
:%d not in DB:%s"
,
pTable
->
info
.
tableId
,
pVgroup
->
vgId
,
pVgroup
->
dbName
);
return
TSDB_CODE_INVALID_DB
;
}
mgmtDecDbRef
(
pDb
);
...
...
@@ -186,6 +187,7 @@ static int32_t mgmtChildTableActionUpdate(SSdbOper *pOper) {
free
(
oldSql
);
free
(
oldSchema
);
}
mgmtDecTableRef
(
pTable
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -250,7 +252,7 @@ static int32_t mgmtChildTableActionRestored() {
while
(
1
)
{
pLastNode
=
pNode
;
mgmtDecTableRef
(
pTable
);
pNode
=
sdbFetchRow
(
tsChildTableSdb
,
pNode
,
(
void
**
)
&
pTable
);
pNode
=
mgmtGetNextChildTable
(
pNode
,
&
pTable
);
if
(
pTable
==
NULL
)
break
;
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pTable
->
info
.
tableId
);
...
...
@@ -268,7 +270,7 @@ static int32_t mgmtChildTableActionRestored() {
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
mError
(
"ctable:%s, failed to get vg
roup
:%d sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
sid
);
mError
(
"ctable:%s, failed to get vg
Id
:%d sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
,
pTable
->
sid
);
pTable
->
vgId
=
0
;
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
...
...
@@ -281,7 +283,7 @@ static int32_t mgmtChildTableActionRestored() {
mgmtDecVgroupRef
(
pVgroup
);
if
(
strcmp
(
pVgroup
->
dbName
,
pDb
->
name
)
!=
0
)
{
mError
(
"ctable:%s, db:%s not match with vg
roup
:%d db:%s sid:%d, discard it"
,
mError
(
"ctable:%s, db:%s not match with vg
Id
:%d db:%s sid:%d, discard it"
,
pTable
->
info
.
tableId
,
pDb
->
name
,
pTable
->
vgId
,
pVgroup
->
dbName
,
pTable
->
sid
);
pTable
->
vgId
=
0
;
SSdbOper
desc
=
{
0
};
...
...
@@ -294,7 +296,7 @@ static int32_t mgmtChildTableActionRestored() {
}
if
(
pVgroup
->
tableList
==
NULL
)
{
mError
(
"ctable:%s, vg
roup
:%d tableList is null"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
);
mError
(
"ctable:%s, vg
Id
:%d tableList is null"
,
pTable
->
info
.
tableId
,
pTable
->
vgId
);
pTable
->
vgId
=
0
;
SSdbOper
desc
=
{
0
};
desc
.
type
=
SDB_OPER_LOCAL
;
...
...
@@ -435,7 +437,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
free
(
pNew
);
free
(
oldSchema
);
}
mgmtDecTableRef
(
pTable
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -558,20 +560,28 @@ static void *mgmtGetSuperTable(char *tableId) {
return
sdbGetRow
(
tsSuperTableSdb
,
tableId
);
}
STableObj
*
mgmtGetTable
(
char
*
tableId
)
{
STableObj
*
tableInfo
=
sdbGetRow
(
tsSuperTableSdb
,
tableId
);
if
(
tableInfo
!=
NULL
)
{
return
tableInfo
;
void
*
mgmtGetTable
(
char
*
tableId
)
{
void
*
pTable
=
mgmtGetSuperTable
(
tableId
);
if
(
pTable
!=
NULL
)
{
return
pTable
;
}
tableInfo
=
sdbGetRow
(
tsChildTableSdb
,
tableId
);
if
(
tableInfo
!=
NULL
)
{
return
tableInfo
;
pTable
=
mgmtGetChildTable
(
tableId
);
if
(
pTable
!=
NULL
)
{
return
pTable
;
}
return
NULL
;
}
void
*
mgmtGetNextChildTable
(
void
*
pNode
,
SChildTableObj
**
pTable
)
{
return
sdbFetchRow
(
tsChildTableSdb
,
pNode
,
(
void
**
)
pTable
);
}
void
*
mgmtGetNextSuperTable
(
void
*
pNode
,
SSuperTableObj
**
pTable
)
{
return
sdbFetchRow
(
tsSuperTableSdb
,
pNode
,
(
void
**
)
pTable
);
}
void
mgmtIncTableRef
(
void
*
p1
)
{
STableObj
*
pTable
=
(
STableObj
*
)
p1
;
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
...
...
@@ -787,8 +797,6 @@ static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
mgmtDecVgroupRef
(
pVgroup
);
}
}
//mError("stable:%s, numOfTables:%d not 0", pStable->info.tableId, pStable->numOfTables);
//mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_OTHERS);
}
else
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
...
...
@@ -846,8 +854,7 @@ static int32_t mgmtAddSuperTableTag(SSuperTableObj *pStable, SSchema schema[], i
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
.
rowSize
=
tsSuperTableUpdateSize
.
pObj
=
pStable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -874,8 +881,7 @@ static int32_t mgmtDropSuperTableTag(SSuperTableObj *pStable, char *tagName) {
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
.
rowSize
=
tsSuperTableUpdateSize
.
pObj
=
pStable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -911,8 +917,7 @@ static int32_t mgmtModifySuperTableTagName(SSuperTableObj *pStable, char *oldTag
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
.
rowSize
=
tsSuperTableUpdateSize
.
pObj
=
pStable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -977,8 +982,7 @@ static int32_t mgmtAddSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, SSc
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
.
rowSize
=
tsSuperTableUpdateSize
.
pObj
=
pStable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -1015,8 +1019,7 @@ static int32_t mgmtDropSuperTableColumn(SDbObj *pDb, SSuperTableObj *pStable, ch
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsSuperTableSdb
,
.
pObj
=
pStable
,
.
rowSize
=
tsSuperTableUpdateSize
.
pObj
=
pStable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -1099,7 +1102,8 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
char
stableName
[
TSDB_TABLE_NAME_LEN
]
=
{
0
};
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
sdbFetchRow
(
tsSuperTableSdb
,
pShow
->
pNode
,
(
void
**
)
&
pTable
);
mgmtDecTableRef
(
pTable
);
pShow
->
pNode
=
mgmtGetNextSuperTable
(
pShow
->
pNode
,
&
pTable
);
if
(
pTable
==
NULL
)
break
;
if
(
strncmp
(
pTable
->
info
.
tableId
,
prefix
,
prefixLen
))
{
continue
;
...
...
@@ -1135,8 +1139,6 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
cols
++
;
numOfRows
++
;
mgmtDecTableRef
(
pTable
);
}
pShow
->
numOfReads
+=
numOfRows
;
...
...
@@ -1155,7 +1157,8 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
mPrint
(
"db:%s, all super tables will be dropped from sdb"
,
pDropDb
->
name
);
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsSuperTableSdb
,
pNode
,
(
void
**
)
&
pTable
);
pLastNode
=
pNode
;
pNode
=
mgmtGetNextSuperTable
(
pNode
,
&
pTable
);
if
(
pTable
==
NULL
)
break
;
if
(
strncmp
(
pDropDb
->
name
,
pTable
->
info
.
tableId
,
dbNameLen
)
==
0
)
{
...
...
@@ -1213,14 +1216,13 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
static
void
mgmtProcessSuperTableVgroupMsg
(
SQueuedMsg
*
pMsg
)
{
SCMSTableVgroupMsg
*
pInfo
=
pMsg
->
pCont
;
SSuperTableObj
*
pTable
=
mgmtGetSuperTable
(
pInfo
->
tableId
);
pMsg
->
pTable
=
(
STableObj
*
)
pTable
;
if
(
pMsg
->
pTable
==
NULL
)
pMsg
->
pTable
=
mgmtGetSuperTable
(
pInfo
->
tableId
);
if
(
pMsg
->
pTable
==
NULL
)
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_TABLE
);
return
;
}
SSuperTableObj
*
pTable
=
(
SSuperTableObj
*
)
pMsg
->
pTable
;
int32_t
contLen
=
sizeof
(
SCMSTableVgroupRspMsg
)
+
sizeof
(
SCMVgroupInfo
)
*
pTable
->
vgLen
;
SCMSTableVgroupRspMsg
*
pRsp
=
rpcMallocCont
(
contLen
);
if
(
pRsp
==
NULL
)
{
...
...
@@ -1430,17 +1432,21 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
}
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
<
0
)
{
mTrace
(
"tables:%s, no enough sid in vg
roup:%d"
,
pVgroup
->
vgId
);
if
(
sid
<
=
0
)
{
mTrace
(
"tables:%s, no enough sid in vg
Id:%d"
,
pCreate
->
tableId
,
pVgroup
->
vgId
);
mgmtCreateVgroup
(
mgmtCloneQueuedMsg
(
pMsg
),
pMsg
->
pDb
);
return
;
}
if
(
pMsg
->
retry
==
0
)
{
pMsg
->
pTable
=
(
STableObj
*
)
mgmtDoCreateChildTable
(
pCreate
,
pVgroup
,
sid
);
if
(
pMsg
->
pTable
==
NULL
)
{
pMsg
->
pTable
=
(
STableObj
*
)
mgmtDoCreateChildTable
(
pCreate
,
pVgroup
,
sid
);
mgmtIncTableRef
(
pMsg
->
pTable
);
}
}
else
{
pMsg
->
pTable
=
mgmtGetTable
(
pCreate
->
tableId
);
if
(
pMsg
->
pTable
==
NULL
)
pMsg
->
pTable
=
mgmtGetTable
(
pCreate
->
tableId
);
}
if
(
pMsg
->
pTable
==
NULL
)
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
terrno
);
return
;
...
...
@@ -1456,7 +1462,6 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
SQueuedMsg
*
newMsg
=
mgmtCloneQueuedMsg
(
pMsg
);
newMsg
->
ahandle
=
pMsg
->
pTable
;
newMsg
->
maxRetry
=
5
;
mgmtIncTableRef
(
pMsg
->
pTable
);
SRpcMsg
rpcMsg
=
{
.
handle
=
newMsg
,
.
pCont
=
pMDCreate
,
...
...
@@ -1470,8 +1475,8 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
static
void
mgmtProcessDropChildTableMsg
(
SQueuedMsg
*
pMsg
)
{
SChildTableObj
*
pTable
=
(
SChildTableObj
*
)
pMsg
->
pTable
;
SVgObj
*
pVgroup
=
pMsg
->
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
if
(
pMsg
->
pVgroup
==
NULL
)
pMsg
->
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
p
Msg
->
p
Vgroup
==
NULL
)
{
mError
(
"table:%s, failed to drop ctable, vgroup not exist"
,
pTable
->
info
.
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_OTHERS
);
return
;
...
...
@@ -1490,7 +1495,7 @@ static void mgmtProcessDropChildTableMsg(SQueuedMsg *pMsg) {
pDrop
->
sid
=
htonl
(
pTable
->
sid
);
pDrop
->
uid
=
htobe64
(
pTable
->
uid
);
SRpcIpSet
ipSet
=
mgmtGetIpSetFromVgroup
(
pVgroup
);
SRpcIpSet
ipSet
=
mgmtGetIpSetFromVgroup
(
p
Msg
->
p
Vgroup
);
mTrace
(
"table:%s, send drop ctable msg"
,
pDrop
->
tableId
);
SQueuedMsg
*
newMsg
=
mgmtCloneQueuedMsg
(
pMsg
);
...
...
@@ -1556,8 +1561,7 @@ static int32_t mgmtAddNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, SSc
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
,
.
rowSize
=
tsChildTableUpdateSize
.
pObj
=
pTable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -1589,8 +1593,7 @@ static int32_t mgmtDropNormalTableColumn(SDbObj *pDb, SChildTableObj *pTable, ch
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsChildTableSdb
,
.
pObj
=
pTable
,
.
rowSize
=
tsChildTableUpdateSize
.
pObj
=
pTable
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -1638,21 +1641,21 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
pMeta
->
contLen
=
sizeof
(
STableMetaMsg
)
+
mgmtSetSchemaFromNormalTable
(
pMeta
->
schema
,
pTable
);
}
SVgObj
*
pVgroup
=
pMsg
->
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
if
(
pMsg
->
pVgroup
==
NULL
)
pMsg
->
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
p
Msg
->
p
Vgroup
==
NULL
)
{
mError
(
"table:%s, failed to get table meta, db not selected"
,
pTable
->
info
.
tableId
);
return
TSDB_CODE_INVALID_VGROUP_ID
;
}
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
for
(
int32_t
i
=
0
;
i
<
p
Msg
->
p
Vgroup
->
numOfVnodes
;
++
i
)
{
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
p
Msg
->
p
Vgroup
->
vnodeGid
[
i
].
dnodeId
);
if
(
pDnode
==
NULL
)
break
;
strcpy
(
pMeta
->
vgroup
.
ipAddr
[
i
].
fqdn
,
pDnode
->
dnodeFqdn
);
pMeta
->
vgroup
.
ipAddr
[
i
].
port
=
htons
(
pDnode
->
dnodePort
+
TSDB_PORT_DNODESHELL
);
pMeta
->
vgroup
.
numOfIps
++
;
mgmtDecDnodeRef
(
pDnode
);
}
pMeta
->
vgroup
.
vgId
=
htonl
(
pVgroup
->
vgId
);
pMeta
->
vgroup
.
vgId
=
htonl
(
p
Msg
->
p
Vgroup
->
vgId
);
mTrace
(
"table:%s, uid:%"
PRIu64
" table meta is retrieved"
,
pTable
->
info
.
tableId
,
pTable
->
uid
);
...
...
@@ -1714,7 +1717,8 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
mPrint
(
"db:%s, all child tables will be dropped from sdb"
,
pDropDb
->
name
);
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsChildTableSdb
,
pNode
,
(
void
**
)
&
pTable
);
pLastNode
=
pNode
;
pNode
=
mgmtGetNextChildTable
(
pNode
,
&
pTable
);
if
(
pTable
==
NULL
)
break
;
if
(
strncmp
(
pDropDb
->
name
,
pTable
->
info
.
tableId
,
dbNameLen
)
==
0
)
{
...
...
@@ -1742,7 +1746,8 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
mPrint
(
"stable:%s, all child tables will dropped from sdb"
,
pStable
->
info
.
tableId
,
numOfTables
);
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsChildTableSdb
,
pNode
,
(
void
**
)
&
pTable
);
pLastNode
=
pNode
;
pNode
=
mgmtGetNextChildTable
(
pNode
,
&
pTable
);
if
(
pTable
==
NULL
)
break
;
if
(
pTable
->
superTable
==
pStable
)
{
...
...
@@ -1762,16 +1767,13 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
mPrint
(
"stable:%s, all child tables:%d is dropped from sdb"
,
pStable
->
info
.
tableId
,
numOfTables
);
}
static
SChildTableObj
*
mgmtGetTableByPos
(
uint32_t
dnodeId
,
int32_t
vnode
,
int32_t
sid
)
{
SDnodeObj
*
pObj
=
mgmtGetDnode
(
dnodeId
);
static
SChildTableObj
*
mgmtGetTableByPos
(
int32_t
vnode
,
int32_t
sid
)
{
SVgObj
*
pVgroup
=
mgmtGetVgroup
(
vnode
);
if
(
pVgroup
==
NULL
)
return
NULL
;
if
(
pObj
==
NULL
||
pVgroup
==
NULL
)
{
return
NULL
;
}
SChildTableObj
*
pTable
=
pVgroup
->
tableList
[
sid
];
SChildTableObj
*
pTable
=
pVgroup
->
tableList
[
sid
-
1
];
mgmtIncTableRef
((
STableObj
*
)
pTable
);
mgmtDecVgroupRef
(
pVgroup
);
return
pTable
;
}
...
...
@@ -1783,7 +1785,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
pCfg
->
sid
=
htonl
(
pCfg
->
sid
);
mTrace
(
"dnode:%s, vnode:%d, sid:%d, receive table config msg"
,
taosIpStr
(
pCfg
->
dnode
),
pCfg
->
vnode
,
pCfg
->
sid
);
SChildTableObj
*
pTable
=
mgmtGetTableByPos
(
pCfg
->
dnode
,
pCfg
->
vnode
,
pCfg
->
sid
);
SChildTableObj
*
pTable
=
mgmtGetTableByPos
(
pCfg
->
vnode
,
pCfg
->
sid
);
if
(
pTable
==
NULL
)
{
mError
(
"dnode:%s, vnode:%d, sid:%d, table not found"
,
taosIpStr
(
pCfg
->
dnode
),
pCfg
->
vnode
,
pCfg
->
sid
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_NOT_ACTIVE_TABLE
);
...
...
@@ -1798,6 +1800,7 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
mgmtDecTableRef
(
pTable
);
return
;
}
SDnodeObj
*
pDnode
=
mgmtGetDnode
(
pCfg
->
dnode
);
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
dnodeEp
);
SRpcMsg
rpcRsp
=
{
...
...
@@ -1808,7 +1811,9 @@ static void mgmtProcessTableCfgMsg(SRpcMsg *rpcMsg) {
.
msgType
=
TSDB_MSG_TYPE_MD_CREATE_TABLE
};
mgmtSendMsgToDnode
(
&
ipSet
,
&
rpcRsp
);
mgmtDecTableRef
(
pTable
);
mgmtDecDnodeRef
(
pDnode
);
}
// handle drop child response
...
...
@@ -1829,8 +1834,8 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
return
;
}
SVgObj
*
pVgroup
=
queueMsg
->
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
pVgroup
==
NULL
)
{
if
(
queueMsg
->
pVgroup
==
NULL
)
queueMsg
->
pVgroup
=
mgmtGetVgroup
(
pTable
->
vgId
);
if
(
queueMsg
->
pVgroup
==
NULL
)
{
mError
(
"table:%s, failed to get vgroup"
,
pTable
->
info
.
tableId
);
mgmtSendSimpleResp
(
queueMsg
->
thandle
,
TSDB_CODE_INVALID_VGROUP_ID
);
return
;
...
...
@@ -1849,9 +1854,9 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
return
;
}
if
(
pVgroup
->
numOfTables
<=
0
)
{
mPrint
(
"vg
roup:%d, all tables is dropped, drop vgroup"
,
pVgroup
->
vgId
);
mgmtDropVgroup
(
pVgroup
,
NULL
);
if
(
queueMsg
->
pVgroup
->
numOfTables
<=
0
)
{
mPrint
(
"vg
Id:%d, all tables is dropped, drop vgroup"
,
queueMsg
->
pVgroup
->
vgId
);
mgmtDropVgroup
(
queueMsg
->
pVgroup
,
NULL
);
}
mgmtSendSimpleResp
(
queueMsg
->
thandle
,
TSDB_CODE_SUCCESS
);
...
...
@@ -1928,8 +1933,8 @@ static void mgmtProcessMultiTableMetaMsg(SQueuedMsg *pMsg) {
SChildTableObj
*
pTable
=
mgmtGetChildTable
(
tableId
);
if
(
pTable
==
NULL
)
continue
;
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
tableId
);
if
(
pDb
==
NULL
)
continue
;
if
(
pMsg
->
pDb
==
NULL
)
pMsg
->
pDb
=
mgmtGetDbByTableId
(
tableId
);
if
(
p
Msg
->
p
Db
==
NULL
)
continue
;
int
availLen
=
totalMallocLen
-
pMultiMeta
->
contLen
;
if
(
availLen
<=
sizeof
(
STableMetaMsg
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
)
{
...
...
@@ -2028,7 +2033,8 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
int32_t
prefixLen
=
strlen
(
prefix
);
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
sdbFetchRow
(
tsChildTableSdb
,
pShow
->
pNode
,
(
void
**
)
&
pTable
);
mgmtDecTableRef
(
pTable
);
pShow
->
pNode
=
mgmtGetNextChildTable
(
pShow
->
pNode
,
&
pTable
);
if
(
pTable
==
NULL
)
break
;
// not belong to current db
...
...
@@ -2072,7 +2078,6 @@ static int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows,
cols
++
;
numOfRows
++
;
mgmtDecTableRef
(
pTable
);
}
pShow
->
numOfReads
+=
numOfRows
;
...
...
@@ -2088,7 +2093,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
SCMAlterTableMsg
*
pAlter
=
pMsg
->
pCont
;
mTrace
(
"table:%s, alter table msg is received from thandle:%p"
,
pAlter
->
tableId
,
pMsg
->
thandle
);
pMsg
->
pDb
=
mgmtGetDbByTableId
(
pAlter
->
tableId
);
if
(
pMsg
->
pDb
==
NULL
)
pMsg
->
pDb
=
mgmtGetDbByTableId
(
pAlter
->
tableId
);
if
(
pMsg
->
pDb
==
NULL
||
pMsg
->
pDb
->
status
!=
TSDB_DB_STATUS_READY
)
{
mError
(
"table:%s, failed to alter table, db not selected"
,
pAlter
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
...
...
@@ -2101,7 +2106,7 @@ static void mgmtProcessAlterTableMsg(SQueuedMsg *pMsg) {
return
;
}
pMsg
->
pTable
=
mgmtGetTable
(
pAlter
->
tableId
);
if
(
pMsg
->
pTable
==
NULL
)
pMsg
->
pTable
=
mgmtGetTable
(
pAlter
->
tableId
);
if
(
pMsg
->
pTable
==
NULL
)
{
mError
(
"table:%s, failed to alter table, table not exist"
,
pMsg
->
pTable
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_TABLE
);
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
59a39792
...
...
@@ -168,8 +168,7 @@ static int32_t mgmtUpdateUser(SUserObj *pUser) {
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsUserSdb
,
.
pObj
=
pUser
,
.
rowSize
=
tsUserUpdateSize
.
pObj
=
pUser
};
int32_t
code
=
sdbUpdateRow
(
&
oper
);
...
...
@@ -249,7 +248,7 @@ static int32_t mgmtDropUser(SUserObj *pUser) {
}
static
int32_t
mgmtGetUserMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
)
{
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
,
NULL
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pConn
);
if
(
pUser
==
NULL
)
{
return
TSDB_CODE_NO_USER_FROM_CONN
;
}
...
...
@@ -298,7 +297,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
char
*
pWrite
;
while
(
numOfRows
<
rows
)
{
pShow
->
pNode
=
sdbFetchRow
(
tsUserSdb
,
pShow
->
pNode
,
(
void
**
)
&
pUser
);
pShow
->
pNode
=
mgmtGetNextUser
(
pShow
->
pNode
,
&
pUser
);
if
(
pUser
==
NULL
)
break
;
cols
=
0
;
...
...
@@ -329,12 +328,9 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
return
numOfRows
;
}
SUserObj
*
mgmtGetUserFromConn
(
void
*
pConn
,
bool
*
usePublicIp
)
{
SUserObj
*
mgmtGetUserFromConn
(
void
*
pConn
)
{
SRpcConnInfo
connInfo
;
if
(
rpcGetConnInfo
(
pConn
,
&
connInfo
)
==
0
)
{
if
(
usePublicIp
)
{
*
usePublicIp
=
(
connInfo
.
serverIp
==
tsPublicIpInt
);
}
return
mgmtGetUser
(
connInfo
.
user
);
}
else
{
mError
(
"can not get user from conn:%p"
,
pConn
);
...
...
@@ -510,7 +506,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
while
(
1
)
{
pLastNode
=
pNode
;
pNode
=
sdbFetchRow
(
tsUserSdb
,
pNode
,
(
void
**
)
&
pUser
);
pNode
=
mgmtGetNextUser
(
pNode
,
&
pUser
);
if
(
pUser
==
NULL
)
break
;
if
(
strncmp
(
pUser
->
acct
,
pAcct
->
user
,
acctNameLen
)
==
0
)
{
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
59a39792
...
...
@@ -36,8 +36,8 @@
#include "mgmtTable.h"
#include "mgmtVgroup.h"
void
*
tsVgroupSdb
=
NULL
;
int32_t
tsVgUpdateSize
=
0
;
static
void
*
tsVgroupSdb
=
NULL
;
static
int32_t
tsVgUpdateSize
=
0
;
static
int32_t
mgmtGetVgroupMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVgroups
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
...
...
@@ -62,6 +62,8 @@ static int32_t mgmtVgroupActionDestroy(SSdbOper *pOper) {
static
int32_t
mgmtVgroupActionInsert
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
pOper
->
pObj
;
// refer to db
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
{
return
TSDB_CODE_INVALID_DB
;
...
...
@@ -74,13 +76,13 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
int32_t
size
=
sizeof
(
SChildTableObj
*
)
*
pDb
->
cfg
.
maxTables
;
pVgroup
->
tableList
=
calloc
(
pDb
->
cfg
.
maxTables
,
sizeof
(
SChildTableObj
*
));
if
(
pVgroup
->
tableList
==
NULL
)
{
mError
(
"vg
roup
:%d, failed to malloc(size:%d) for the tableList of vgroups"
,
pVgroup
->
vgId
,
size
);
mError
(
"vg
Id
:%d, failed to malloc(size:%d) for the tableList of vgroups"
,
pVgroup
->
vgId
,
size
);
return
-
1
;
}
pVgroup
->
idPool
=
taosInitIdPool
(
pDb
->
cfg
.
maxTables
);
if
(
pVgroup
->
idPool
==
NULL
)
{
mError
(
"vg
roup
:%d, failed to taosInitIdPool for vgroups"
,
pVgroup
->
vgId
);
mError
(
"vg
Id
:%d, failed to taosInitIdPool for vgroups"
,
pVgroup
->
vgId
);
tfree
(
pVgroup
->
tableList
);
return
-
1
;
}
...
...
@@ -101,7 +103,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
static
int32_t
mgmtVgroupActionDelete
(
SSdbOper
*
pOper
)
{
SVgObj
*
pVgroup
=
pOper
->
pObj
;
if
(
pVgroup
->
pDb
!=
NULL
)
{
mgmtRemoveVgroupFromDb
(
pVgroup
);
}
...
...
@@ -140,6 +142,7 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
if
(
pDnode
!=
NULL
)
{
atomic_add_fetch_32
(
&
pDnode
->
openVnodes
,
1
);
}
mgmtDecDnodeRef
(
pDnode
);
}
}
...
...
@@ -147,14 +150,15 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
SDbObj
*
pDb
=
pVgroup
->
pDb
;
if
(
pDb
!=
NULL
)
{
if
(
pDb
->
cfg
.
maxTables
!=
oldTables
)
{
mPrint
(
"vg
roup
:%d tables change from %d to %d"
,
pVgroup
->
vgId
,
oldTables
,
pDb
->
cfg
.
maxTables
);
mPrint
(
"vg
Id
:%d tables change from %d to %d"
,
pVgroup
->
vgId
,
oldTables
,
pDb
->
cfg
.
maxTables
);
taosUpdateIdPool
(
pVgroup
->
idPool
,
pDb
->
cfg
.
maxTables
);
int32_t
size
=
sizeof
(
SChildTableObj
*
)
*
pDb
->
cfg
.
maxTables
;
pVgroup
->
tableList
=
(
SChildTableObj
**
)
realloc
(
pVgroup
->
tableList
,
size
);
}
}
mTrace
(
"vgroup:%d, is updated, tables:%d numOfVnode:%d"
,
pVgroup
->
vgId
,
pDb
->
cfg
.
maxTables
,
pVgroup
->
numOfVnodes
);
mgmtDecVgroupRef
(
pVgroup
);
mTrace
(
"vgId:%d, is updated, tables:%d numOfVnode:%d"
,
pVgroup
->
vgId
,
pDb
->
cfg
.
maxTables
,
pVgroup
->
numOfVnodes
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -237,8 +241,7 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) {
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
.
rowSize
=
tsVgUpdateSize
.
pObj
=
pVgroup
};
sdbUpdateRow
(
&
oper
);
...
...
@@ -261,7 +264,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
if
(
!
dnodeExist
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pDnode
->
dnodeEp
);
mError
(
"vg
roup
:%d, dnode:%d not exist in mnode, drop it"
,
pVload
->
vgId
,
pDnode
->
dnodeId
);
mError
(
"vg
Id
:%d, dnode:%d not exist in mnode, drop it"
,
pVload
->
vgId
,
pDnode
->
dnodeId
);
mgmtSendDropVnodeMsg
(
pVload
->
vgId
,
&
ipSet
,
NULL
);
return
;
}
...
...
@@ -273,7 +276,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
}
if
(
pVload
->
cfgVersion
!=
pVgroup
->
pDb
->
cfgVersion
||
pVload
->
replica
!=
pVgroup
->
numOfVnodes
)
{
mError
(
"dnode:%d, vg
roup
:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d"
,
mError
(
"dnode:%d, vg
Id
:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d"
,
pDnode
->
dnodeId
,
pVload
->
vgId
,
pVload
->
cfgVersion
,
pVload
->
replica
,
pVgroup
->
pDb
->
cfgVersion
,
pVgroup
->
numOfVnodes
);
mgmtSendCreateVgroupMsg
(
pVgroup
,
NULL
);
...
...
@@ -317,9 +320,9 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
return
;
}
mPrint
(
"vg
roup
:%d, is created in mnode, db:%s replica:%d"
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
mPrint
(
"vg
Id
:%d, is created in mnode, db:%s replica:%d"
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
mPrint
(
"vg
roup
:%d, index:%d, dnode:%d"
,
pVgroup
->
vgId
,
i
,
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
mPrint
(
"vg
Id
:%d, index:%d, dnode:%d"
,
pVgroup
->
vgId
,
i
,
pVgroup
->
vnodeGid
[
i
].
dnodeId
);
}
pMsg
->
ahandle
=
pVgroup
;
...
...
@@ -331,7 +334,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
if
(
ahandle
!=
NULL
)
{
mgmtSendDropVgroupMsg
(
pVgroup
,
ahandle
);
}
else
{
mTrace
(
"vg
roup
:%d, replica:%d is deleting from sdb"
,
pVgroup
->
vgId
,
pVgroup
->
numOfVnodes
);
mTrace
(
"vg
Id
:%d, replica:%d is deleting from sdb"
,
pVgroup
->
vgId
,
pVgroup
->
numOfVnodes
);
mgmtSendDropVgroupMsg
(
pVgroup
,
NULL
);
SSdbOper
oper
=
{
.
type
=
SDB_OPER_GLOBAL
,
...
...
@@ -379,6 +382,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
if
(
pShow
->
payloadLen
>
0
)
{
pTable
=
mgmtGetTable
(
pShow
->
payload
);
if
(
NULL
==
pTable
||
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
mgmtDecTableRef
(
pTable
);
return
TSDB_CODE_INVALID_TABLE_ID
;
}
mgmtDecTableRef
(
pTable
);
...
...
@@ -505,25 +509,28 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
}
void
mgmtAddTableIntoVgroup
(
SVgObj
*
pVgroup
,
SChildTableObj
*
pTable
)
{
if
(
pTable
->
sid
>=
0
&&
pVgroup
->
tableList
[
pTable
->
sid
]
==
NULL
)
{
pVgroup
->
tableList
[
pTable
->
sid
]
=
pTable
;
if
(
pTable
->
sid
>=
1
&&
pVgroup
->
tableList
[
pTable
->
sid
-
1
]
==
NULL
)
{
pVgroup
->
tableList
[
pTable
->
sid
-
1
]
=
pTable
;
taosIdPoolMarkStatus
(
pVgroup
->
idPool
,
pTable
->
sid
);
pVgroup
->
numOfTables
++
;
}
if
(
pVgroup
->
numOfTables
>=
pVgroup
->
pDb
->
cfg
.
maxTables
)
mgmtAddVgroupIntoDbTail
(
pVgroup
);
if
(
pVgroup
->
numOfTables
>=
pVgroup
->
pDb
->
cfg
.
maxTables
)
{
mgmtMoveVgroupToTail
(
pVgroup
);
}
mgmtIncVgroupRef
(
pVgroup
);
}
void
mgmtRemoveTableFromVgroup
(
SVgObj
*
pVgroup
,
SChildTableObj
*
pTable
)
{
if
(
pTable
->
sid
>=
0
&&
pVgroup
->
tableList
[
pTable
->
sid
]
!=
NULL
)
{
pVgroup
->
tableList
[
pTable
->
sid
]
=
NULL
;
if
(
pTable
->
sid
>=
1
&&
pVgroup
->
tableList
[
pTable
->
sid
-
1
]
!=
NULL
)
{
pVgroup
->
tableList
[
pTable
->
sid
-
1
]
=
NULL
;
taosFreeId
(
pVgroup
->
idPool
,
pTable
->
sid
);
pVgroup
->
numOfTables
--
;
}
if
(
pVgroup
->
numOfTables
>=
pVgroup
->
pDb
->
cfg
.
maxTables
)
mgmtAddVgroupIntoDbTail
(
pVgroup
);
mgmtMoveVgroupToHead
(
pVgroup
);
mgmtDecVgroupRef
(
pVgroup
);
}
SMDCreateVnodeMsg
*
mgmtBuildCreateVnodeMsg
(
SVgObj
*
pVgroup
)
{
...
...
@@ -588,7 +595,7 @@ SRpcIpSet mgmtGetIpSetFromIp(char *ep) {
}
void
mgmtSendCreateVnodeMsg
(
SVgObj
*
pVgroup
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vg
roup
:%d, send create vnode:%d msg, ahandle:%p"
,
pVgroup
->
vgId
,
pVgroup
->
vgId
,
ahandle
);
mTrace
(
"vg
Id
:%d, send create vnode:%d msg, ahandle:%p"
,
pVgroup
->
vgId
,
pVgroup
->
vgId
,
ahandle
);
SMDCreateVnodeMsg
*
pCreate
=
mgmtBuildCreateVnodeMsg
(
pVgroup
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
...
...
@@ -601,7 +608,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
}
void
mgmtSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
)
{
mTrace
(
"vg
roup
:%d, send create all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
mTrace
(
"vg
Id
:%d, send create all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
pDnode
->
dnodeEp
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
&
ipSet
,
ahandle
);
...
...
@@ -619,7 +626,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
}
SVgObj
*
pVgroup
=
queueMsg
->
ahandle
;
mTrace
(
"vg
roup
:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p"
,
mTrace
(
"vg
Id
:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p"
,
pVgroup
->
vgId
,
tstrerror
(
rpcMsg
->
code
),
queueMsg
->
received
,
queueMsg
->
successed
,
queueMsg
->
expected
,
queueMsg
->
thandle
,
rpcMsg
->
handle
);
...
...
@@ -654,7 +661,7 @@ static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) {
}
void
mgmtSendDropVnodeMsg
(
int32_t
vgId
,
SRpcIpSet
*
ipSet
,
void
*
ahandle
)
{
mTrace
(
"vg
roup
:%d, send drop vnode msg, ahandle:%p"
,
vgId
,
ahandle
);
mTrace
(
"vg
Id
:%d, send drop vnode msg, ahandle:%p"
,
vgId
,
ahandle
);
SMDDropVnodeMsg
*
pDrop
=
mgmtBuildDropVnodeMsg
(
vgId
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
...
...
@@ -667,7 +674,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
}
static
void
mgmtSendDropVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
)
{
mTrace
(
"vg
roup
:%d, send drop all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
mTrace
(
"vg
Id
:%d, send drop all vnodes msg, ahandle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
pDnode
->
dnodeEp
);
mgmtSendDropVnodeMsg
(
pVgroup
->
vgId
,
&
ipSet
,
ahandle
);
...
...
@@ -675,7 +682,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
}
static
void
mgmtProcessDropVnodeRsp
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"drop vnode rsp is received
"
);
mTrace
(
"drop vnode rsp is received
, handle:%p"
,
rpcMsg
->
handle
);
if
(
rpcMsg
->
handle
==
NULL
)
return
;
SQueuedMsg
*
queueMsg
=
rpcMsg
->
handle
;
...
...
@@ -686,7 +693,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
}
SVgObj
*
pVgroup
=
queueMsg
->
ahandle
;
mTrace
(
"vg
roup
:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p"
,
mTrace
(
"vg
Id
:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p"
,
pVgroup
->
vgId
,
tstrerror
(
rpcMsg
->
code
),
queueMsg
->
received
,
queueMsg
->
successed
,
queueMsg
->
expected
,
queueMsg
->
thandle
,
rpcMsg
->
handle
);
...
...
@@ -736,7 +743,33 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
mgmtSendCreateVnodeMsg
(
pVgroup
,
&
ipSet
,
NULL
);
}
void
mgmtDropAllVgroups
(
SDbObj
*
pDropDb
)
{
void
mgmtDropAllDnodeVgroups
(
SDnodeObj
*
pDropDnode
)
{
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
SVgObj
*
pVgroup
=
NULL
;
int32_t
numOfVgroups
=
0
;
while
(
1
)
{
pLastNode
=
pNode
;
pNode
=
mgmtGetNextVgroup
(
pNode
,
&
pVgroup
);
if
(
pVgroup
==
NULL
)
break
;
if
(
pVgroup
->
vnodeGid
[
0
].
dnodeId
==
pDropDnode
->
dnodeId
)
{
SSdbOper
oper
=
{
.
type
=
SDB_OPER_LOCAL
,
.
table
=
tsVgroupSdb
,
.
pObj
=
pVgroup
,
};
sdbDeleteRow
(
&
oper
);
pNode
=
pLastNode
;
numOfVgroups
++
;
continue
;
}
mgmtDecVgroupRef
(
pVgroup
);
}
}
void
mgmtDropAllDbVgroups
(
SDbObj
*
pDropDb
)
{
void
*
pNode
=
NULL
;
void
*
pLastNode
=
NULL
;
int32_t
numOfVgroups
=
0
;
...
...
@@ -744,7 +777,8 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
mPrint
(
"db:%s, all vgroups will be dropped from sdb"
,
pDropDb
->
name
);
while
(
1
)
{
pNode
=
sdbFetchRow
(
tsVgroupSdb
,
pNode
,
(
void
**
)
&
pVgroup
);
pLastNode
=
pNode
;
pNode
=
mgmtGetNextVgroup
(
pNode
,
&
pVgroup
);
if
(
pVgroup
==
NULL
)
break
;
if
(
pVgroup
->
pDb
==
pDropDb
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录