Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
66e5cd34
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
66e5cd34
编写于
3月 12, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
modify the code for synchronously creating tables
上级
c1d33817
变更
18
隐藏空白更改
内联
并排
Showing
18 changed file
with
557 addition
and
657 deletion
+557
-657
src/dnode/src/dnodeMClient.c
src/dnode/src/dnodeMClient.c
+0
-1
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+12
-15
src/inc/mnode.h
src/inc/mnode.h
+20
-14
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-0
src/mnode/inc/mgmtProfile.h
src/mnode/inc/mgmtProfile.h
+0
-16
src/mnode/inc/mgmtShell.h
src/mnode/inc/mgmtShell.h
+4
-1
src/mnode/inc/mgmtTable.h
src/mnode/inc/mgmtTable.h
+0
-5
src/mnode/inc/mgmtVgroup.h
src/mnode/inc/mgmtVgroup.h
+1
-1
src/mnode/src/mgmtBalance.c
src/mnode/src/mgmtBalance.c
+2
-2
src/mnode/src/mgmtDClient.c
src/mnode/src/mgmtDClient.c
+0
-54
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+0
-16
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+40
-58
src/mnode/src/mgmtDnode.c
src/mnode/src/mgmtDnode.c
+8
-15
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+20
-18
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+141
-101
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+182
-238
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+41
-61
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+85
-41
未找到文件。
src/dnode/src/dnodeMClient.c
浏览文件 @
66e5cd34
...
...
@@ -57,7 +57,6 @@ void dnodeCleanupMClient() {
}
static
void
dnodeProcessRspFromMnode
(
SRpcMsg
*
pMsg
)
{
if
(
dnodeProcessMgmtRspFp
[
pMsg
->
msgType
])
{
(
*
dnodeProcessMgmtRspFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
66e5cd34
...
...
@@ -98,14 +98,13 @@ void dnodeMgmt(SRpcMsg *pMsg) {
if
(
dnodeProcessMgmtMsgFp
[
pMsg
->
msgType
])
{
(
*
dnodeProcessMgmtMsgFp
[
pMsg
->
msgType
])(
pMsg
);
}
else
{
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
SRpcMsg
rsp
;
rsp
.
handle
=
pMsg
->
handle
;
rsp
.
code
=
TSDB_CODE_MSG_NOT_PROCESSED
;
rsp
.
pCont
=
NULL
;
rpcSendResponse
(
&
rsp
);
}
SRpcMsg
rsp
;
rsp
.
handle
=
pMsg
->
handle
;
rsp
.
code
=
terrno
;
rsp
.
pCont
=
NULL
;
rpcSendResponse
(
&
rsp
);
rpcFreeCont
(
pMsg
->
pCont
);
// free the received message
}
...
...
@@ -275,15 +274,15 @@ static void dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
pCreate
->
cfg
.
maxSessions
=
htonl
(
pCreate
->
cfg
.
maxSessions
);
pCreate
->
cfg
.
daysPerFile
=
htonl
(
pCreate
->
cfg
.
daysPerFile
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
}
else
{
rpcRsp
.
code
=
dnodeCreateVnode
(
pCreate
);
}
// SVnodeObj *pVnodeObj = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pCreate->cfg.vgId);
// if (pVnodeObj != NULL) {
// rpcRsp.code = TSDB_CODE_SUCCESS;
// } else {
// rpcRsp.code = dnodeCreateVnode(pCreate);
// }
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
static
void
dnodeProcessDropVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
...
...
@@ -301,7 +300,6 @@ static void dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
}
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
static
void
dnodeProcessAlterVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
...
...
@@ -321,7 +319,6 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
}
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
static
void
dnodeProcessAlterStreamMsg
(
SRpcMsg
*
pMsg
)
{
...
...
src/inc/mnode.h
浏览文件 @
66e5cd34
...
...
@@ -39,12 +39,7 @@ extern "C" {
#include "ttimer.h"
#include "tutil.h"
// internal globals
extern
char
version
[];
extern
void
*
tsMgmtTmr
;
extern
char
tsMgmtDirectory
[];
typedef
struct
{
typedef
struct
{
uint32_t
privateIp
;
int32_t
sid
;
uint32_t
moduleStatus
;
...
...
@@ -87,11 +82,6 @@ typedef struct {
int32_t
vnode
;
}
SVnodeGid
;
typedef
struct
{
int32_t
sid
;
int32_t
vgId
;
// vnode group ID
}
STableGid
;
typedef
struct
{
char
tableId
[
TSDB_TABLE_ID_LEN
+
1
];
int8_t
type
;
...
...
@@ -248,16 +238,32 @@ typedef struct {
int16_t
offset
[
TSDB_MAX_COLUMNS
];
int16_t
bytes
[
TSDB_MAX_COLUMNS
];
void
*
signature
;
uint16_t
payloadLen
;
/* length of payload*/
char
payload
[];
/* payload for wildcard match in show tables */
uint16_t
payloadLen
;
char
payload
[];
}
SShowObj
;
//mgmtSystem
typedef
struct
{
uint8_t
msgType
;
int8_t
expected
;
int8_t
received
;
int8_t
successed
;
int32_t
contLen
;
int32_t
code
;
void
*
ahandle
;
void
*
thandle
;
void
*
pCont
;
SDbObj
*
pDb
;
SUserObj
*
pUser
;
}
SQueuedMsg
;
int32_t
mgmtInitSystem
();
int32_t
mgmtStartSystem
();
void
mgmtCleanUpSystem
();
void
mgmtStopSystem
();
extern
char
version
[];
extern
void
*
tsMgmtTmr
;
extern
char
tsMgmtDirectory
[];
#ifdef __cplusplus
}
...
...
src/inc/taosmsg.h
浏览文件 @
66e5cd34
...
...
@@ -238,6 +238,7 @@ typedef struct SSchema {
}
SSchema
;
typedef
struct
{
int32_t
vgId
;
int32_t
vnode
;
//the index of vnode
uint32_t
ip
;
}
SVnodeDesc
;
...
...
src/mnode/inc/mgmtProfile.h
浏览文件 @
66e5cd34
...
...
@@ -28,22 +28,6 @@ bool mgmtCheckQhandle(uint64_t qhandle);
void
mgmtSaveQhandle
(
void
*
qhandle
);
void
mgmtFreeQhandle
(
void
*
qhandle
);
enum
{
TSDB_PROCESS_CREATE_VGROUP
,
TSDB_PROCESS_CREATE_VGROUP_GET_META
,
TSDB_PROCESS_CREATE_TABLE
,
TSDB_PROCESS_CREATE_TABLE_GET_META
,
};
typedef
struct
{
void
*
thandle
;
// come from uplayer
void
*
ahandle
;
// object to process
void
*
cont
;
// additional information of object to process
int32_t
type
;
// the type of sync process
int32_t
received
;
// num of received, such as numOfVnodes
int32_t
contLen
;
// the length of additional information
}
SProcessInfo
;
#ifdef __cplusplus
}
#endif
...
...
src/mnode/inc/mgmtShell.h
浏览文件 @
66e5cd34
...
...
@@ -23,13 +23,16 @@ extern "C" {
int32_t
mgmtInitShell
();
void
mgmtCleanUpShell
();
void
mgmtAddShellMsgHandle
(
uint8_t
msgType
,
void
(
*
fp
)(
S
RpcMsg
*
rpc
Msg
));
void
mgmtAddShellMsgHandle
(
uint8_t
msgType
,
void
(
*
fp
)(
S
QueuedMsg
*
queued
Msg
));
typedef
int32_t
(
*
SShowMetaFp
)(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
typedef
int32_t
(
*
SShowRetrieveFp
)(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
void
mgmtAddShellShowMetaHandle
(
uint8_t
showType
,
SShowMetaFp
fp
);
void
mgmtAddShellShowRetrieveHandle
(
uint8_t
showType
,
SShowRetrieveFp
fp
);
void
mgmtAddToShellQueue
(
SQueuedMsg
*
queuedMsg
);
void
mgmtSendSimpleResp
(
void
*
thandle
,
int32_t
code
);
#ifdef __cplusplus
}
#endif
...
...
src/mnode/inc/mgmtTable.h
浏览文件 @
66e5cd34
...
...
@@ -33,7 +33,6 @@ STableInfo* mgmtGetTableByPos(uint32_t dnodeIp, int32_t vnode, int32_t sid);
int32_t
mgmtGetTableMeta
(
SDbObj
*
pDb
,
STableInfo
*
pTable
,
STableMeta
*
pMeta
,
bool
usePublicIp
);
int32_t
mgmtRetrieveMetricMeta
(
void
*
pConn
,
char
**
pStart
,
SSuperTableMetaMsg
*
pInfo
);
int32_t
mgmtCreateTable
(
SCMCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
);
int32_t
mgmtDropTable
(
SDbObj
*
pDb
,
char
*
tableId
,
int32_t
ignore
);
int32_t
mgmtAlterTable
(
SDbObj
*
pDb
,
SCMAlterTableMsg
*
pAlter
);
...
...
@@ -44,10 +43,6 @@ void mgmtSetTableDirty(STableInfo *pTable, bool isDirty);
SMDDropTableMsg
*
mgmtBuildRemoveTableMsg
(
STableInfo
*
pTable
);
SMDDropSTableMsg
*
mgmtBuildRemoveSuperTableMsg
(
STableInfo
*
pTable
);
void
mgmtProcessGetTableMeta
(
STableInfo
*
pTable
,
void
*
thandle
);
void
mgmtProcessCreateTable
(
SVgObj
*
pVgroup
,
SCMCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
);
void
mgmtProcessCreateVgroup
(
SCMCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
);
#ifdef __cplusplus
}
#endif
...
...
src/mnode/inc/mgmtVgroup.h
浏览文件 @
66e5cd34
...
...
@@ -29,7 +29,7 @@ void mgmtCleanUpVgroups();
SVgObj
*
mgmtGetVgroup
(
int32_t
vgId
);
SVgObj
*
mgmtGetVgroupByVnode
(
uint32_t
dnode
,
int32_t
vnode
);
SVgObj
*
mgmtCreateVgroup
(
SDbObj
*
pDb
);
void
mgmtCreateVgroup
(
SQueuedMsg
*
pMsg
);
int32_t
mgmtDropVgroup
(
SDbObj
*
pDb
,
SVgObj
*
pVgroup
);
void
mgmtUpdateVgroup
(
SVgObj
*
pVgroup
);
...
...
src/mnode/src/mgmtBalance.c
浏览文件 @
66e5cd34
...
...
@@ -56,10 +56,10 @@ int32_t mgmtAllocVnodes(SVgObj *pVgroup) {
}
if
(
selectedVnode
==
-
1
)
{
mError
(
"
vgroup:%d alloc vnode failed, free vnodes:%d"
,
pVgroup
->
vgId
,
pDnode
->
numOfFreeVnodes
);
mError
(
"
alloc vnode failed, free vnodes:%d"
,
pDnode
->
numOfFreeVnodes
);
return
-
1
;
}
else
{
mTrace
(
"
vgroup:%d allocate vnode:%d, last allocated vnode:%d"
,
pVgroup
->
vgId
,
selectedVnode
,
lastAllocVode
);
mTrace
(
"
allocate vnode:%d, last allocated vnode:%d"
,
selectedVnode
,
lastAllocVode
);
pVgroup
->
vnodeGid
[
0
].
vnode
=
selectedVnode
;
pDnode
->
lastAllocVnode
=
selectedVnode
+
1
;
if
(
pDnode
->
lastAllocVnode
>=
pDnode
->
numOfVnodes
)
pDnode
->
lastAllocVnode
=
0
;
...
...
src/mnode/src/mgmtDClient.c
浏览文件 @
66e5cd34
...
...
@@ -84,39 +84,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
rpcFreeCont
(
rpcMsg
->
pCont
);
}
//static void mgmtProcessCreateTableRsp(SRpcMsg *rpcMsg) {
// mTrace("create table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
// if (rpcMsg->handle == NULL) return;
//
// SProcessInfo *info = rpcMsg->handle;
// assert(info->type == TSDB_PROCESS_CREATE_TABLE || info->type == TSDB_PROCESS_CREATE_TABLE_GET_META);
//
// STableInfo *pTable = info->ahandle;
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
// mError("table:%s, failed to create in dnode, code:%d, set it dirty", pTable->tableId, rpcMsg->code);
// mgmtSetTableDirty(pTable, true);
// } else {
// mTrace("table:%s, created in dnode", pTable->tableId);
// mgmtSetTableDirty(pTable, false);
// }
//
// if (rpcMsg->code != TSDB_CODE_SUCCESS) {
// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = rpcMsg->code, .msgType = 0};
// rpcSendResponse(&rpcMsg);
// } else {
// if (info->type == TSDB_PROCESS_CREATE_TABLE_GET_META) {
// mTrace("table:%s, start to process get meta", pTable->tableId);
// mgmtProcessGetTableMeta(pTable, rpcMsg->handle);
// } else {
// SRpcMsg rpcRsp = {.handle = info->thandle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
// rpcSendResponse(&rpcMsg);
// }
// }
//
// free(info);
//}
//
//static void mgmtProcessDropTableRsp(SRpcMsg *rpcMsg) {
// mTrace("drop table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
...
...
@@ -125,27 +92,6 @@ static void mgmtProcessRspFromDnode(SRpcMsg *rpcMsg) {
// mTrace("alter table rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
//}
//
//static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("create vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
// if (rpcMsg->handle == NULL) return;
//
// SProcessInfo *info = rpcMsg->handle;
// assert(info->type == TSDB_PROCESS_CREATE_VGROUP || info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META);
//
// info->received++;
// SVgObj *pVgroup = info->ahandle;
//
// bool isGetMeta = false;
// if (info->type == TSDB_PROCESS_CREATE_VGROUP_GET_META) {
// isGetMeta = true;
// }
//
// mTrace("vgroup:%d, received:%d numOfVnodes:%d", pVgroup->vgId, info->received, pVgroup->numOfVnodes);
// if (info->received == pVgroup->numOfVnodes) {
// mgmtProcessCreateTable(pVgroup, info->cont, info->contLen, info->thandle, isGetMeta);
// free(info);
// }
//}
//
//static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
// mTrace("drop vnode rsp received, handle:%p code:%d", rpcMsg->handle, rpcMsg->code);
...
...
src/mnode/src/mgmtDServer.c
浏览文件 @
66e5cd34
...
...
@@ -210,22 +210,6 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// }
//}
//
//void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
// mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
// for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
// SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].ip);
// mgmtSendCreateVnodeMsg(pVgroup, pVgroup->vnodeGid[i].vnode, &ipSet, ahandle);
// }
//}
//
//void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, vnode, ahandle);
// SMDCreateVnodeMsg *pVpeer = mgmtBuildCreateVnodeMsg(pVgroup, vnode);
// if (pVpeer != NULL) {
// mgmtSendMsgToDnode(ipSet, TSDB_MSG_TYPE_MD_CREATE_VNODE, pVpeer, sizeof(SMDCreateVnodeMsg), ahandle);
// }
//}
//
//void mgmtProcessMsgFromDnode(char msgType, void *pCont, int32_t contLen, void *pConn, int32_t code) {
// if (msgType < 0 || msgType >= TSDB_MSG_TYPE_MAX) {
// mError("invalid msg type:%d", msgType);
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
66e5cd34
...
...
@@ -41,9 +41,9 @@ static int32_t mgmtDropDb(SDbObj *pDb);
static
int32_t
mgmtGetDbMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveDbs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCreateDbMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessAlterDbMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessDropDbMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessCreateDbMsg
(
S
QueuedMsg
*
p
Msg
);
static
void
mgmtProcessAlterDbMsg
(
S
QueuedMsg
*
p
Msg
);
static
void
mgmtProcessDropDbMsg
(
S
QueuedMsg
*
p
Msg
);
static
void
*
(
*
mgmtDbActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtDbActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
...
...
@@ -383,6 +383,7 @@ static void mgmtDropDbFromSdb(SDbObj *pDb) {
}
static
int32_t
mgmtDropDb
(
SDbObj
*
pDb
)
{
if
(
pDb
->
dropStatus
==
TSDB_DB_STATUS_DROPPING
)
{
bool
finished
=
mgmtCheckDropDbFinished
(
pDb
);
if
(
!
finished
)
{
...
...
@@ -405,6 +406,7 @@ static int32_t mgmtDropDb(SDbObj *pDb) {
}
}
UNUSED_FUNC
static
int32_t
mgmtDropDbByName
(
SAcctObj
*
pAcct
,
char
*
name
,
short
ignoreNotExists
)
{
SDbObj
*
pDb
=
(
SDbObj
*
)
sdbGetRow
(
tsDbSdb
,
name
);
if
(
pDb
==
NULL
)
{
...
...
@@ -904,19 +906,10 @@ void mgmtRemoveTableFromDb(SDbObj *pDb) {
atomic_add_fetch_32
(
&
pDb
->
numOfTables
,
-
1
);
}
static
void
mgmtProcessCreateDbMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SCMCreateDbMsg
*
pCreate
=
(
SCMCreateDbMsg
*
)
rpcMsg
->
pCont
;
static
void
mgmtProcessCreateDbMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
return
;
SCMCreateDbMsg
*
pCreate
=
pMsg
->
pCont
;
pCreate
->
maxSessions
=
htonl
(
pCreate
->
maxSessions
);
pCreate
->
cacheBlockSize
=
htonl
(
pCreate
->
cacheBlockSize
);
pCreate
->
daysPerFile
=
htonl
(
pCreate
->
daysPerFile
);
...
...
@@ -928,69 +921,58 @@ static void mgmtProcessCreateDbMsg(SRpcMsg *rpcMsg) {
pCreate
->
rowsInFileBlock
=
htonl
(
pCreate
->
rowsInFileBlock
);
// pCreate->cacheNumOfBlocks = htonl(pCreate->cacheNumOfBlocks);
int32_t
code
;
if
(
mgmtCheckExpired
())
{
rpcRsp
.
code
=
TSDB_CODE_GRANT_EXPIRED
;
}
else
if
(
!
pUser
->
writeAuth
)
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_GRANT_EXPIRED
;
}
else
if
(
!
p
Msg
->
p
User
->
writeAuth
)
{
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
rpcRsp
.
code
=
mgmtCreateDb
(
pUser
->
pAcct
,
pCreate
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"DB:%s is created by %s"
,
pCreate
->
db
,
pUser
->
user
);
code
=
mgmtCreateDb
(
pMsg
->
pUser
->
pAcct
,
pCreate
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"DB:%s is created by %s"
,
pCreate
->
db
,
p
Msg
->
p
User
->
user
);
}
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
static
void
mgmtProcessAlterDbMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
static
void
mgmtProcessAlterDbMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
return
;
SCMAlterDbMsg
*
pAlter
=
(
SCMAlterDbMsg
*
)
rpc
Msg
->
pCont
;
SCMAlterDbMsg
*
pAlter
=
p
Msg
->
pCont
;
pAlter
->
daysPerFile
=
htonl
(
pAlter
->
daysPerFile
);
pAlter
->
daysToKeep
=
htonl
(
pAlter
->
daysToKeep
);
pAlter
->
maxSessions
=
htonl
(
pAlter
->
maxSessions
)
+
1
;
if
(
!
pUser
->
writeAuth
)
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
int32_t
code
;
if
(
!
pMsg
->
pUser
->
writeAuth
)
{
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
rpcRsp
.
code
=
mgmtAlterDb
(
pUser
->
pAcct
,
pAlter
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"DB:%s is altered by %s"
,
pAlter
->
db
,
pUser
->
user
);
code
=
mgmtAlterDb
(
pMsg
->
pUser
->
pAcct
,
pAlter
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"DB:%s is altered by %s"
,
pAlter
->
db
,
p
Msg
->
p
User
->
user
);
}
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
static
void
mgmtProcessDropDbMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
return
;
static
void
mgmtProcessDropDbMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
if
(
pUser
->
superAuth
)
{
SCMDropDbMsg
*
pDrop
=
rpcMsg
->
pCont
;
rpcRsp
.
code
=
mgmtDropDbByName
(
pUser
->
pAcct
,
pDrop
->
db
,
pDrop
->
ignoreNotExists
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"DB:%s is dropped by %s"
,
pDrop
->
db
,
pUser
->
user
);
}
int32_t
code
;
if
(
pMsg
->
pUser
->
superAuth
)
{
code
=
TSDB_CODE_OPS_NOT_SUPPORT
;
//SCMDropDbMsg *pDrop = rpcMsg->pCont;
//rpcRsp.code = mgmtDropDbByName(pUser->pAcct, pDrop->db, pDrop->ignoreNotExists);
//if (rpcRsp.code == TSDB_CODE_SUCCESS) {
// mLPrint("DB:%s is dropped by %s", pDrop->db, pUser->user);
//}
}
else
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_NO_RIGHTS
;
}
rpcSendResponse
(
&
rpcRsp
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
}
src/mnode/src/mgmtDnode.c
浏览文件 @
66e5cd34
...
...
@@ -43,7 +43,7 @@ static int32_t mgmtGetConfigMeta(STableMeta *pMeta, SShowObj *pShow, void *pConn
static
int32_t
mgmtRetrieveConfigs
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
int32_t
mgmtGetVnodeMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCfgDnodeMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessCfgDnodeMsg
(
S
QueuedMsg
*
p
Msg
);
void
mgmtSetDnodeMaxVnodes
(
SDnodeObj
*
pDnode
)
{
int32_t
maxVnodes
=
pDnode
->
numOfCores
*
tsNumOfVnodesPerCore
;
...
...
@@ -93,7 +93,7 @@ void mgmtSetDnodeVgid(SVnodeGid vnodeGid[], int32_t numOfVnodes, int32_t vgId) {
memset
(
pVload
,
0
,
sizeof
(
SVnodeLoad
));
pVload
->
vnode
=
vnodeGid
[
i
].
vnode
;
pVload
->
vgId
=
vgId
;
mTrace
(
"dnode:%s, vnode:%d add to vgroup:%d"
,
taosIpStr
(
vnodeGid
[
i
].
i
p
),
vnodeGid
[
i
].
vnode
,
pVload
->
vgId
);
mTrace
(
"dnode:%s, vnode:%d add to vgroup:%d"
,
taosIpStr
(
pDnode
->
privateI
p
),
vnodeGid
[
i
].
vnode
,
pVload
->
vgId
);
mgmtCalcNumOfFreeVnodes
(
pDnode
);
}
else
{
mError
(
"dnode:%s, not in dnode DB!!!"
,
taosIpStr
(
vnodeGid
[
i
].
ip
));
...
...
@@ -527,21 +527,14 @@ bool mgmtCheckDnodeInOfflineState(SDnodeObj *pDnode) {
return
pDnode
->
status
==
TSDB_DN_STATUS_OFFLINE
;
}
void
mgmtProcessCfgDnodeMsg
(
S
RpcMsg
*
rpc
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
void
mgmtProcessCfgDnodeMsg
(
S
QueuedMsg
*
p
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
t
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
pMsg
->
t
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SCMCfgDnodeMsg
*
pCmCfgDnode
=
(
SCMCfgDnodeMsg
*
)
rpcMsg
->
pCont
;
SCMCfgDnodeMsg
*
pCmCfgDnode
=
pMsg
->
pCont
;
uint32_t
dnodeIp
=
inet_addr
(
pCmCfgDnode
->
ip
);
if
(
strcmp
(
pUser
->
pAcct
->
user
,
"root"
)
!=
0
)
{
if
(
strcmp
(
p
Msg
->
p
User
->
pAcct
->
user
,
"root"
)
!=
0
)
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
dnodeIp
);
...
...
@@ -560,7 +553,7 @@ void mgmtProcessCfgDnodeMsg(SRpcMsg *rpcMsg) {
}
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
mTrace
(
"dnode:%s is configured by %s"
,
pCmCfgDnode
->
ip
,
pUser
->
user
);
mTrace
(
"dnode:%s is configured by %s"
,
pCmCfgDnode
->
ip
,
p
Msg
->
p
User
->
user
);
}
rpcSendResponse
(
&
rpcRsp
);
...
...
src/mnode/src/mgmtProfile.c
浏览文件 @
66e5cd34
...
...
@@ -558,9 +558,11 @@ bool mgmtCheckQhandle(uint64_t qhandle) {
}
void
mgmtSaveQhandle
(
void
*
qhandle
)
{
mTrace
(
"qhandle:%p is allocated"
,
qhandle
);
}
void
mgmtFreeQhandle
(
void
*
qhandle
)
{
mTrace
(
"qhandle:%p is freed"
,
qhandle
);
}
int
mgmtGetConns
(
SShowObj
*
pShow
,
void
*
pConn
)
{
...
...
@@ -673,72 +675,72 @@ int32_t mgmtRetrieveConns(SShowObj *pShow, char *data, int32_t rows, void *pConn
return
numOfRows
;
}
void
mgmtProcessKillQueryMsg
(
S
RpcMsg
*
rpc
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
void
mgmtProcessKillQueryMsg
(
S
QueuedMsg
*
p
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
t
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
pMsg
->
t
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
t
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SCMKillQueryMsg
*
pKill
=
(
SCMKillQueryMsg
*
)
rpc
Msg
->
pCont
;
SCMKillQueryMsg
*
pKill
=
p
Msg
->
pCont
;
int32_t
code
;
if
(
!
pUser
->
writeAuth
)
{
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
code
=
mgmtKillQuery
(
pKill
->
queryId
,
rpcMsg
->
handle
);
code
=
mgmtKillQuery
(
pKill
->
queryId
,
pMsg
->
t
handle
);
}
rpcRsp
.
code
=
code
;
rpcSendResponse
(
&
rpcRsp
);
}
void
mgmtProcessKillStreamMsg
(
S
RpcMsg
*
rpc
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
void
mgmtProcessKillStreamMsg
(
S
QueuedMsg
*
p
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
t
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
pMsg
->
t
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
t
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SCMKillStreamMsg
*
pKill
=
(
SCMKillStreamMsg
*
)
rpc
Msg
->
pCont
;
SCMKillStreamMsg
*
pKill
=
p
Msg
->
pCont
;
int32_t
code
;
if
(
!
pUser
->
writeAuth
)
{
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
code
=
mgmtKillStream
(
pKill
->
queryId
,
rpcMsg
->
handle
);
code
=
mgmtKillStream
(
pKill
->
queryId
,
pMsg
->
t
handle
);
}
rpcRsp
.
code
=
code
;
rpcSendResponse
(
&
rpcRsp
);
}
void
mgmtProcessKillConnectionMsg
(
S
RpcMsg
*
rpc
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
void
mgmtProcessKillConnectionMsg
(
S
QueuedMsg
*
p
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
t
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
pMsg
->
t
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
t
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SCMKillConnMsg
*
pKill
=
(
SCMKillConnMsg
*
)
rpc
Msg
->
pCont
;
SCMKillConnMsg
*
pKill
=
p
Msg
->
pCont
;
int32_t
code
;
if
(
!
pUser
->
writeAuth
)
{
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
code
=
mgmtKillConnection
(
pKill
->
queryId
,
rpcMsg
->
handle
);
code
=
mgmtKillConnection
(
pKill
->
queryId
,
pMsg
->
t
handle
);
}
rpcRsp
.
code
=
code
;
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
66e5cd34
...
...
@@ -41,24 +41,27 @@
typedef
int32_t
(
*
SShowMetaFp
)(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
typedef
int32_t
(
*
SShowRetrieveFp
)(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
pMsg
);
static
void
mgmtProcessShowMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessRetrieveMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessUnSupportMsg
(
SRpcMsg
*
rpcMsg
);
static
int
mgmtShellRetriveAuth
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
bool
mgmtCheckMsgReadOnly
(
int8_t
type
,
void
*
pCont
);
static
void
mgmtProcessHeartBeatMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessConnectMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
pMsg
);
static
void
mgmtProcessUnSupportMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessMsgWhileNotReady
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessShowMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessRetrieveMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessHeartBeatMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
mgmtProcessConnectMsg
(
SQueuedMsg
*
queuedMsg
);
static
void
*
tsMgmtShellRpc
=
NULL
;
static
void
*
tsMgmtTranQhandle
=
NULL
;
static
void
(
*
tsMgmtProcessShellMsgFp
[
TSDB_MSG_TYPE_MAX
])(
S
Rpc
Msg
*
)
=
{
0
};
static
void
(
*
tsMgmtProcessShellMsgFp
[
TSDB_MSG_TYPE_MAX
])(
S
Queued
Msg
*
)
=
{
0
};
static
SShowMetaFp
tsMgmtShowMetaFp
[
TSDB_MGMT_TABLE_MAX
]
=
{
0
};
static
SShowRetrieveFp
tsMgmtShowRetrieveFp
[
TSDB_MGMT_TABLE_MAX
]
=
{
0
};
int32_t
mgmtInitShell
()
{
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_SHOW
,
mgmtProcessShowMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_RETRIEVE
,
mgmtProcessRetrieveMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mgmtProcessConnectMsg
);
tsMgmtTranQhandle
=
taosInitScheduler
(
tsMaxDnodes
+
tsMaxShellConns
,
1
,
"mnodeT"
);
...
...
@@ -84,9 +87,6 @@ int32_t mgmtInitShell() {
return
-
1
;
}
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_CONNECT
,
mgmtProcessConnectMsg
);
mPrint
(
"server connection to shell is opened"
);
return
0
;
}
...
...
@@ -104,7 +104,7 @@ void mgmtCleanUpShell() {
}
}
void
mgmtAddShellMsgHandle
(
uint8_t
showType
,
void
(
*
fp
)(
S
RpcMsg
*
rpc
Msg
))
{
void
mgmtAddShellMsgHandle
(
uint8_t
showType
,
void
(
*
fp
)(
S
QueuedMsg
*
queued
Msg
))
{
tsMgmtProcessShellMsgFp
[
showType
]
=
fp
;
}
...
...
@@ -117,107 +117,118 @@ void mgmtAddShellShowRetrieveHandle(uint8_t msgType, SShowRetrieveFp fp) {
}
void
mgmtProcessTranRequest
(
SSchedMsg
*
sched
)
{
S
RpcMsg
*
rpc
Msg
=
sched
->
msg
;
(
*
tsMgmtProcessShellMsgFp
[
rpcMsg
->
msgType
])(
rpc
Msg
);
rpcFreeCont
(
rpc
Msg
->
pCont
);
free
(
rpc
Msg
);
S
QueuedMsg
*
queued
Msg
=
sched
->
msg
;
(
*
tsMgmtProcessShellMsgFp
[
queuedMsg
->
msgType
])(
queued
Msg
);
rpcFreeCont
(
queued
Msg
->
pCont
);
free
(
queued
Msg
);
}
void
mgmtAddToTranRequest
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
*
queuedRpcMsg
=
malloc
(
sizeof
(
SRpcMsg
));
memcpy
(
queuedRpcMsg
,
rpcMsg
,
sizeof
(
SRpcMsg
));
void
mgmtAddToShellQueue
(
SQueuedMsg
*
queuedMsg
)
{
SSchedMsg
schedMsg
;
schedMsg
.
msg
=
queued
Rpc
Msg
;
schedMsg
.
msg
=
queuedMsg
;
schedMsg
.
fp
=
mgmtProcessTranRequest
;
taosScheduleTask
(
tsMgmtTranQhandle
,
&
schedMsg
);
}
static
void
mgmtProcessMsgFromShell
(
SRpcMsg
*
rpcMsg
)
{
if
(
sdbGetRunStatus
()
!=
SDB_STATUS_SERVING
)
{
mTrace
(
"shell msg is ignored since SDB is not ready"
);
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
TSDB_CODE_NOT_READY
,
.
msgType
=
0
};
rpcSendResponse
(
&
rpcRsp
);
mgmtProcessMsgWhileNotReady
(
rpcMsg
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
mTrace
(
"%s is received"
,
taosMsg
[
rpcMsg
->
msgType
]);
if
(
tsMgmtProcessShellMsgFp
[
rpcMsg
->
msgType
])
{
if
(
mgmtCheckMsgReadOnly
(
rpcMsg
->
msgType
,
rpcMsg
->
pCont
))
{
(
*
tsMgmtProcessShellMsgFp
[
rpcMsg
->
msgType
])(
rpcMsg
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
else
{
mgmtAddToTranRequest
(
rpcMsg
);
}
}
else
{
mError
(
"%s is not processed"
,
taosMsg
[
rpcMsg
->
msgType
]);
if
(
tsMgmtProcessShellMsgFp
[
rpcMsg
->
msgType
]
==
NULL
)
{
mgmtProcessUnSupportMsg
(
rpcMsg
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
}
static
void
mgmtProcessShowMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
mError
(
"thandle:%p, failed to retrieve user info"
,
rpcMsg
->
handle
);
mgmtSendSimpleResp
(
rpcMsg
->
handle
,
TSDB_CODE_INVALID_USER
);
rpcFreeCont
(
rpcMsg
->
pCont
);
return
;
}
if
(
mgmtCheckMsgReadOnly
(
rpcMsg
->
msgType
,
rpcMsg
->
pCont
))
{
SQueuedMsg
queuedMsg
=
{
0
};
queuedMsg
.
thandle
=
rpcMsg
->
handle
;
queuedMsg
.
msgType
=
rpcMsg
->
msgType
;
queuedMsg
.
contLen
=
rpcMsg
->
contLen
;
queuedMsg
.
pCont
=
rpcMsg
->
pCont
;
queuedMsg
.
pUser
=
pUser
;
(
*
tsMgmtProcessShellMsgFp
[
rpcMsg
->
msgType
])(
&
queuedMsg
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
else
{
SQueuedMsg
*
queuedMsg
=
calloc
(
1
,
sizeof
(
SQueuedMsg
));
queuedMsg
->
thandle
=
rpcMsg
->
handle
;
queuedMsg
->
msgType
=
rpcMsg
->
msgType
;
queuedMsg
->
contLen
=
rpcMsg
->
contLen
;
queuedMsg
->
pCont
=
rpcMsg
->
pCont
;
queuedMsg
->
pUser
=
pUser
;
mgmtAddToShellQueue
(
queuedMsg
);
}
}
SCMShowMsg
*
pShowMsg
=
rpcMsg
->
pCont
;
static
void
mgmtProcessShowMsg
(
SQueuedMsg
*
pMsg
)
{
SCMShowMsg
*
pShowMsg
=
pMsg
->
pCont
;
if
(
pShowMsg
->
type
==
TSDB_MGMT_TABLE_DNODE
||
TSDB_MGMT_TABLE_GRANTS
||
TSDB_MGMT_TABLE_SCORES
)
{
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
)
)
{
return
;
}
}
int32_t
size
=
sizeof
(
SCMShowRsp
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
+
TSDB_EXTRA_PAYLOAD_SIZE
;
if
(
pShowMsg
->
type
>=
TSDB_MGMT_TABLE_MAX
)
{
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_MSG_TYPE
);
return
;
}
if
(
!
tsMgmtShowMetaFp
[
pShowMsg
->
type
])
{
mError
(
"show type:%d %s is not support"
,
pShowMsg
->
type
,
taosMsg
[
pShowMsg
->
type
]);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_OPS_NOT_SUPPORT
);
return
;
}
int32_t
size
=
sizeof
(
SCMShowRsp
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
+
TSDB_EXTRA_PAYLOAD_SIZE
;
SCMShowRsp
*
pShowRsp
=
rpcMallocCont
(
size
);
if
(
pShowRsp
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
int32_t
code
;
if
(
pShowMsg
->
type
>=
TSDB_MGMT_TABLE_MAX
)
{
code
=
TSDB_CODE_INVALID_MSG_TYPE
;
SShowObj
*
pShow
=
(
SShowObj
*
)
calloc
(
1
,
sizeof
(
SShowObj
)
+
htons
(
pShowMsg
->
payloadLen
));
pShow
->
signature
=
pShow
;
pShow
->
type
=
pShowMsg
->
type
;
pShow
->
payloadLen
=
htons
(
pShowMsg
->
payloadLen
);
strcpy
(
pShow
->
db
,
pShowMsg
->
db
);
memcpy
(
pShow
->
payload
,
pShowMsg
->
payload
,
pShow
->
payloadLen
);
mgmtSaveQhandle
(
pShow
);
pShowRsp
->
qhandle
=
htobe64
((
uint64_t
)
pShow
);
int32_t
code
=
(
*
tsMgmtShowMetaFp
[
pShowMsg
->
type
])(
&
pShowRsp
->
tableMeta
,
pShow
,
pMsg
->
thandle
);
if
(
code
==
0
)
{
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
thandle
,
.
pCont
=
pShowRsp
,
.
contLen
=
sizeof
(
SCMShowRsp
)
+
sizeof
(
SSchema
)
*
pShow
->
numOfColumns
,
.
code
=
code
,
.
msgType
=
0
};
rpcSendResponse
(
&
rpcRsp
);
}
else
{
SShowObj
*
pShow
=
(
SShowObj
*
)
calloc
(
1
,
sizeof
(
SShowObj
)
+
htons
(
pShowMsg
->
payloadLen
));
pShow
->
signature
=
pShow
;
pShow
->
type
=
pShowMsg
->
type
;
strcpy
(
pShow
->
db
,
pShowMsg
->
db
);
mTrace
(
"pShow:%p is allocated"
,
pShow
);
// set the table name query condition
pShow
->
payloadLen
=
htons
(
pShowMsg
->
payloadLen
);
memcpy
(
pShow
->
payload
,
pShowMsg
->
payload
,
pShow
->
payloadLen
);
mgmtSaveQhandle
(
pShow
);
pShowRsp
->
qhandle
=
htobe64
((
uint64_t
)
pShow
);
if
(
tsMgmtShowMetaFp
[
pShowMsg
->
type
])
{
code
=
(
*
tsMgmtShowMetaFp
[
pShowMsg
->
type
])(
&
pShowRsp
->
tableMeta
,
pShow
,
rpcMsg
->
handle
);
if
(
code
==
0
)
{
size
=
sizeof
(
SCMShowRsp
)
+
sizeof
(
SSchema
)
*
pShow
->
numOfColumns
;
}
else
{
mError
(
"pShow:%p, type:%d %s, failed to get Meta, code:%d"
,
pShow
,
pShowMsg
->
type
,
taosMsg
[(
uint8_t
)
pShowMsg
->
type
],
code
);
free
(
pShow
);
}
}
else
{
code
=
TSDB_CODE_OPS_NOT_SUPPORT
;
}
mError
(
"pShow:%p, type:%d %s, failed to get Meta, code:%d"
,
pShow
,
pShowMsg
->
type
,
taosMsg
[
pShowMsg
->
type
],
code
);
mgmtFreeQhandle
(
pShow
);
rpcFreeCont
(
pShowRsp
);
}
rpcRsp
.
code
=
code
;
rpcRsp
.
pCont
=
pShowRsp
;
rpcRsp
.
contLen
=
size
;
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmtProcessRetrieveMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
static
void
mgmtProcessRetrieveMsg
(
SQueuedMsg
*
pMsg
)
{
int32_t
rowsToRead
=
0
;
int32_t
size
=
0
;
int32_t
rowsRead
=
0
;
SRetrieveTableMsg
*
pRetrieve
=
(
SRetrieveTableMsg
*
)
rpc
Msg
->
pCont
;
SRetrieveTableMsg
*
pRetrieve
=
p
Msg
->
pCont
;
pRetrieve
->
qhandle
=
htobe64
(
pRetrieve
->
qhandle
);
/*
...
...
@@ -226,16 +237,14 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
*/
if
(
!
mgmtCheckQhandle
(
pRetrieve
->
qhandle
))
{
mError
(
"retrieve:%p, qhandle:%p is invalid"
,
pRetrieve
,
pRetrieve
->
qhandle
);
rpcRsp
.
code
=
TSDB_CODE_INVALID_QHANDLE
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_QHANDLE
);
return
;
}
SShowObj
*
pShow
=
(
SShowObj
*
)
pRetrieve
->
qhandle
;
if
(
pShow
->
signature
!=
(
void
*
)
pShow
)
{
mError
(
"pShow:%p, signature:%p, query memory is corrupted"
,
pShow
,
pShow
->
signature
);
rpcRsp
.
code
=
TSDB_CODE_MEMORY_CORRUPTED
;
rpcSendResponse
(
&
rpcRsp
);
if
(
!
mgmtCheckQhandle
(
pShow
))
{
mError
(
"pShow:%p, query memory is corrupted"
,
pShow
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_MEMORY_CORRUPTED
);
return
;
}
else
{
if
((
pRetrieve
->
free
&
TSDB_QUERY_TYPE_FREE_RESOURCE
)
!=
TSDB_QUERY_TYPE_FREE_RESOURCE
)
{
...
...
@@ -258,10 +267,9 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
// if free flag is set, client wants to clean the resources
if
((
pRetrieve
->
free
&
TSDB_QUERY_TYPE_FREE_RESOURCE
)
!=
TSDB_QUERY_TYPE_FREE_RESOURCE
)
rowsRead
=
(
*
tsMgmtShowRetrieveFp
[
pShow
->
type
])(
pShow
,
pRsp
->
data
,
rowsToRead
,
rpcMsg
->
handle
);
rowsRead
=
(
*
tsMgmtShowRetrieveFp
[
pShow
->
type
])(
pShow
,
pRsp
->
data
,
rowsToRead
,
pMsg
->
t
handle
);
if
(
rowsRead
<
0
)
{
rowsRead
=
0
;
// TSDB_CODE_ACTION_IN_PROGRESS;
if
(
rowsRead
<
0
)
{
// TSDB_CODE_ACTION_IN_PROGRESS;
rpcFreeCont
(
pRsp
);
return
;
}
...
...
@@ -269,8 +277,13 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
pRsp
->
numOfRows
=
htonl
(
rowsRead
);
pRsp
->
precision
=
htonl
(
TSDB_TIME_PRECISION_MILLI
);
// millisecond time precision
rpcRsp
.
pCont
=
pRsp
;
rpcRsp
.
contLen
=
size
;
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
thandle
,
.
pCont
=
pRsp
,
.
contLen
=
size
,
.
code
=
0
,
.
msgType
=
0
};
rpcSendResponse
(
&
rpcRsp
);
if
(
rowsToRead
==
0
)
{
...
...
@@ -278,21 +291,19 @@ static void mgmtProcessRetrieveMsg(SRpcMsg *rpcMsg) {
}
}
static
void
mgmtProcessHeartBeatMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
static
void
mgmtProcessHeartBeatMsg
(
SQueuedMsg
*
pMsg
)
{
//SCMHeartBeatMsg *pHBMsg = (SCMHeartBeatMsg *) rpcMsg->pCont;
//mgmtSaveQueryStreamList(pHBMsg);
SCMHeartBeatRsp
*
pHBRsp
=
(
SCMHeartBeatRsp
*
)
rpcMallocCont
(
sizeof
(
SCMHeartBeatRsp
));
if
(
pHBRsp
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
SRpcConnInfo
connInfo
;
if
(
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
)
!=
0
)
{
mError
(
"conn:%p is already released while process heart beat msg"
,
rpcMsg
->
handle
);
if
(
rpcGetConnInfo
(
pMsg
->
t
handle
,
&
connInfo
)
!=
0
)
{
mError
(
"conn:%p is already released while process heart beat msg"
,
pMsg
->
t
handle
);
return
;
}
...
...
@@ -320,8 +331,13 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
pHBRsp
->
streamId
=
0
;
pHBRsp
->
killConnection
=
0
;
rpcRsp
.
pCont
=
pHBRsp
;
rpcRsp
.
contLen
=
sizeof
(
SCMHeartBeatRsp
);
SRpcMsg
rpcRsp
=
{
.
handle
=
pMsg
->
thandle
,
.
pCont
=
pHBRsp
,
.
contLen
=
sizeof
(
SCMHeartBeatRsp
),
.
code
=
0
,
.
msgType
=
0
};
rpcSendResponse
(
&
rpcRsp
);
}
...
...
@@ -340,13 +356,13 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
}
}
static
void
mgmtProcessConnectMsg
(
S
RpcMsg
*
rpc
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMConnectMsg
*
pConnectMsg
=
(
SCMConnectMsg
*
)
rpc
Msg
->
pCont
;
static
void
mgmtProcessConnectMsg
(
S
QueuedMsg
*
p
Msg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
pMsg
->
t
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMConnectMsg
*
pConnectMsg
=
p
Msg
->
pCont
;
SRpcConnInfo
connInfo
;
if
(
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
)
!=
0
)
{
mError
(
"
conn:%p is already released while process connect msg"
,
rpcMsg
->
handle
);
if
(
rpcGetConnInfo
(
pMsg
->
t
handle
,
&
connInfo
)
!=
0
)
{
mError
(
"
thandle:%p is already released while process connect msg"
,
pMsg
->
t
handle
);
return
;
}
...
...
@@ -450,6 +466,7 @@ static bool mgmtCheckMsgReadOnly(int8_t type, void *pCont) {
}
static
void
mgmtProcessUnSupportMsg
(
SRpcMsg
*
rpcMsg
)
{
mError
(
"%s is not processed"
,
taosMsg
[
rpcMsg
->
msgType
]);
SRpcMsg
rpcRsp
=
{
.
msgType
=
0
,
.
pCont
=
0
,
...
...
@@ -459,3 +476,26 @@ static void mgmtProcessUnSupportMsg(SRpcMsg *rpcMsg) {
};
rpcSendResponse
(
&
rpcRsp
);
}
static
void
mgmtProcessMsgWhileNotReady
(
SRpcMsg
*
rpcMsg
)
{
mTrace
(
"%s is ignored since SDB is not ready"
,
taosMsg
[
rpcMsg
->
msgType
]);
SRpcMsg
rpcRsp
=
{
.
msgType
=
0
,
.
pCont
=
0
,
.
contLen
=
0
,
.
code
=
TSDB_CODE_NOT_READY
,
.
handle
=
rpcMsg
->
handle
};
rpcSendResponse
(
&
rpcRsp
);
}
void
mgmtSendSimpleResp
(
void
*
thandle
,
int32_t
code
)
{
SRpcMsg
rpcRsp
=
{
.
msgType
=
0
,
.
pCont
=
0
,
.
contLen
=
0
,
.
code
=
code
,
.
handle
=
thandle
};
rpcSendResponse
(
&
rpcRsp
);
}
src/mnode/src/mgmtTable.c
浏览文件 @
66e5cd34
...
...
@@ -44,14 +44,16 @@
extern
void
*
tsNormalTableSdb
;
extern
void
*
tsChildTableSdb
;
static
void
mgmtProcessCreateTableMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessDropTableMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessAlterTableMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessTableMetaMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessMultiTableMetaMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessSuperTableMetaMsg
(
SRpcMsg
*
rpcMsg
);
static
void
mgmtProcessCreateTableMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessDropTableMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessAlterTableMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessTableMetaMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessMultiTableMetaMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessSuperTableMetaMsg
(
SQueuedMsg
*
queueMsg
);
static
void
mgmtProcessCreateTableRsp
(
SRpcMsg
*
rpcMsg
);
static
int32_t
mgmtGetShowTableMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveShowTables
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessGetTableMeta
(
STableInfo
*
pTable
,
void
*
thandle
);
int32_t
mgmtInitTables
()
{
int32_t
code
=
mgmtInitSuperTables
();
...
...
@@ -79,6 +81,7 @@ int32_t mgmtInitTables() {
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CM_STABLE_META
,
mgmtProcessSuperTableMetaMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_TABLE
,
mgmtGetShowTableMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_TABLE
,
mgmtRetrieveShowTables
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP
,
mgmtProcessCreateTableRsp
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -131,170 +134,56 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
return
TSDB_CODE_SUCCESS
;
}
static
void
mgmtCreateTable
(
SVgObj
*
pVgroup
,
SQueuedMsg
*
pMsg
)
{
SCMCreateTableMsg
*
pCreate
=
pMsg
->
pCont
;
pCreate
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
pCreate
->
numOfTags
=
htons
(
pCreate
->
numOfTags
);
pCreate
->
sqlLen
=
htons
(
pCreate
->
sqlLen
);
void
mgmtProcessCreateVgroup
(
SCMCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SDbObj
*
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pDb
==
NULL
)
{
mError
(
"table:%s, failed to create vgroup, db not found"
,
pCreate
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_INVALID_DB
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SVgObj
*
pVgroup
=
mgmtCreateVgroup
(
pDb
);
if
(
pVgroup
==
NULL
)
{
mError
(
"table:%s, failed to alloc vnode to vgroup"
,
pCreate
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_NO_ENOUGH_DNODES
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
void
*
cont
=
rpcMallocCont
(
contLen
);
if
(
cont
==
NULL
)
{
mError
(
"table:%s, failed to create table, can not alloc memory"
,
pCreate
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
memcpy
(
cont
,
pCreate
,
contLen
);
SProcessInfo
*
info
=
calloc
(
1
,
sizeof
(
SProcessInfo
));
info
->
type
=
TSDB_PROCESS_CREATE_VGROUP
;
info
->
thandle
=
thandle
;
info
->
ahandle
=
pVgroup
;
info
->
cont
=
cont
;
info
->
contLen
=
contLen
;
if
(
isGetMeta
)
{
info
->
type
=
TSDB_PROCESS_CREATE_VGROUP_GET_META
;
SSchema
*
pSchema
=
(
SSchema
*
)
pCreate
->
schema
;
for
(
int32_t
i
=
0
;
i
<
pCreate
->
numOfColumns
+
pCreate
->
numOfTags
;
++
i
)
{
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
pSchema
->
colId
=
i
;
pSchema
++
;
}
mgmtSendCreateVgroupMsg
(
pVgroup
,
info
);
}
//void mgmtSendCreateTableMsg(SMDCreateTableMsg *pCreate, SRpcIpSet *ipSet, void *ahandle) {
// mTrace("table:%s, send create table msg, ahandle:%p", pCreate->tableId, ahandle);
// SRpcMsg rpcMsg = {
// .handle = ahandle,
// .pCont = pCreate,
// .contLen = htonl(pCreate->contLen),
// .code = 0,
// .msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
// };
// rpcSendRequest(tsMgmtDClientRpc, ipSet, &rpcMsg);
//}
//
void
mgmtProcessCreateTable
(
SVgObj
*
pVgroup
,
SCMCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
)
{
assert
(
pVgroup
!=
NULL
);
SRpcMsg
rpcRsp
=
{.
handle
=
thandle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
<
0
)
{
mTrace
(
"t
able:%s, no enough sid in vgroup:%d, start to create a new vgroup"
,
pCreate
->
tableId
,
pVgroup
->
vgId
);
mgmt
ProcessCreateVgroup
(
pCreate
,
contLen
,
thandle
,
isGetMeta
);
mTrace
(
"t
handle:%p, no enough sid in vgroup:%d, start to create a new one"
,
pMsg
->
thandle
,
pVgroup
->
vgId
);
mgmt
CreateVgroup
(
pMsg
);
return
;
}
int32_t
code
;
STableInfo
*
pTable
;
SMDCreateTableMsg
*
pDCreate
=
NULL
;
SMDCreateTableMsg
*
p
M
DCreate
=
NULL
;
if
(
pCreate
->
numOfColumns
==
0
)
{
mTrace
(
"t
able:%s, start to create child table, vgroup:%d sid:%d"
,
pCreate
->
tableId
,
pVgroup
->
vgId
,
sid
);
rpcRsp
.
code
=
mgmtCreateChildTable
(
pCreate
,
contLen
,
pVgroup
,
sid
,
&
p
DCreate
,
&
pTable
);
mTrace
(
"t
handle:%p, create ctable:%s, vgroup:%d sid:%d ahandle:%p"
,
pMsg
->
thandle
,
pCreate
->
tableId
,
pVgroup
->
vgId
,
sid
,
pMsg
);
code
=
mgmtCreateChildTable
(
pCreate
,
pMsg
->
contLen
,
pVgroup
,
sid
,
&
pM
DCreate
,
&
pTable
);
}
else
{
mTrace
(
"t
able:%s, start to create normal table, vgroup:%d sid:%d"
,
pCreate
->
tableId
,
pVgroup
->
vgId
,
sid
);
rpcRsp
.
code
=
mgmtCreateNormalTable
(
pCreate
,
contLen
,
pVgroup
,
sid
,
&
p
DCreate
,
&
pTable
);
mTrace
(
"t
handle:%p, create ntable:%s, vgroup:%d sid:%d ahandle:%p"
,
pMsg
->
thandle
,
pCreate
->
tableId
,
pVgroup
->
vgId
,
sid
,
pMsg
);
code
=
mgmtCreateNormalTable
(
pCreate
,
pMsg
->
contLen
,
pVgroup
,
sid
,
&
pM
DCreate
,
&
pTable
);
}
if
(
rpcRsp
.
code
!=
TSDB_CODE_SUCCESS
)
{
mTrace
(
"t
able:%s, failed to create table in vgroup:%d sid:%d "
,
pCreate
->
tableId
,
pVgroup
->
vgId
,
si
d
);
rpcSendResponse
(
&
rpcRsp
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mTrace
(
"t
handle:%p, failed to create table:%s in vgroup:%d"
,
pMsg
->
thandle
,
pCreate
->
tableId
,
pVgroup
->
vgI
d
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
assert
(
pDCreate
!=
NULL
);
assert
(
pTable
!=
NULL
);
SProcessInfo
*
info
=
calloc
(
1
,
sizeof
(
SProcessInfo
));
info
->
type
=
TSDB_PROCESS_CREATE_TABLE
;
info
->
thandle
=
thandle
;
info
->
ahandle
=
pTable
;
SRpcIpSet
ipSet
=
mgmtGetIpSetFromVgroup
(
pVgroup
);
if
(
isGetMeta
)
{
info
->
type
=
TSDB_PROCESS_CREATE_TABLE_GET_META
;
}
SRpcMsg
rpcMsg
=
{
.
handle
=
info
,
.
pCont
=
pCreate
,
.
contLen
=
htonl
(
p
DCreate
->
contLen
),
.
code
=
0
,
.
msgType
=
TSDB_MSG_TYPE_MD_CREATE_TABLE
.
handle
=
pMsg
,
.
pCont
=
pCreate
,
.
contLen
=
htonl
(
pM
DCreate
->
contLen
),
.
code
=
0
,
.
msgType
=
TSDB_MSG_TYPE_MD_CREATE_TABLE
};
mgmtSendMsgToDnode
(
&
ipSet
,
&
rpcMsg
);
}
int32_t
mgmtCreateTable
(
SCMCreateTableMsg
*
pCreate
,
int32_t
contLen
,
void
*
thandle
,
bool
isGetMeta
)
{
SDbObj
*
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pDb
==
NULL
)
{
mError
(
"table:%s, failed to create table, db not selected"
,
pCreate
->
tableId
);
return
TSDB_CODE_DB_NOT_SELECTED
;
}
STableInfo
*
pTable
=
mgmtGetTable
(
pCreate
->
tableId
);
if
(
pTable
!=
NULL
)
{
if
(
pCreate
->
igExists
)
{
mTrace
(
"table:%s, table is already exist, think it success"
,
pCreate
->
tableId
);
return
TSDB_CODE_SUCCESS
;
}
else
{
mError
(
"table:%s, failed to create table, table already exist"
,
pCreate
->
tableId
);
return
TSDB_CODE_TABLE_ALREADY_EXIST
;
}
}
SAcctObj
*
pAcct
=
mgmtGetAcct
(
pDb
->
cfg
.
acct
);
assert
(
pAcct
!=
NULL
);
int32_t
code
=
mgmtCheckTableLimit
(
pAcct
,
pCreate
->
numOfColumns
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"table:%s, failed to create table, table num exceed the limit"
,
pCreate
->
tableId
);
return
code
;
}
if
(
mgmtCheckExpired
())
{
mError
(
"table:%s, failed to create table, grant expired"
,
pCreate
->
tableId
);
return
TSDB_CODE_GRANT_EXPIRED
;
}
if
(
pCreate
->
numOfTags
!=
0
)
{
mTrace
(
"table:%s, start to create super table, tags:%d columns:%d"
,
pCreate
->
tableId
,
pCreate
->
numOfTags
,
pCreate
->
numOfColumns
);
return
mgmtCreateSuperTable
(
pDb
,
pCreate
);
}
code
=
mgmtCheckTimeSeries
(
pCreate
->
numOfColumns
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"table:%s, failed to create table, timeseries exceed the limit"
,
pCreate
->
tableId
);
return
TSDB_CODE_SUCCESS
;
}
SVgObj
*
pVgroup
=
mgmtGetAvailableVgroup
(
pDb
);
if
(
pVgroup
==
NULL
)
{
mTrace
(
"table:%s, no avaliable vgroup, start to create a new one"
,
pCreate
->
tableId
);
mgmtProcessCreateVgroup
(
pCreate
,
contLen
,
thandle
,
isGetMeta
);
}
else
{
mTrace
(
"table:%s, try to create table in vgroup:%d"
,
pCreate
->
tableId
,
pVgroup
->
vgId
);
mgmtProcessCreateTable
(
pVgroup
,
pCreate
,
contLen
,
thandle
,
isGetMeta
);
}
return
TSDB_CODE_ACTION_IN_PROGRESS
;
}
int32_t
mgmtDropTable
(
SDbObj
*
pDb
,
char
*
tableId
,
int32_t
ignore
)
{
STableInfo
*
pTable
=
mgmtGetTable
(
tableId
);
if
(
pTable
==
NULL
)
{
...
...
@@ -547,114 +436,145 @@ SMDDropTableMsg *mgmtBuildRemoveTableMsg(STableInfo *pTable) {
}
void
mgmtSetTableDirty
(
STableInfo
*
pTable
,
bool
isDirty
)
{
// TODO: if dirty, delete from sdb
pTable
->
dirty
=
isDirty
;
}
void
mgmtProcessCreateTableMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
void
mgmtProcessCreateTableMsg
(
SQueuedMsg
*
pMsg
)
{
SCMCreateTableMsg
*
pCreate
=
pMsg
->
pCont
;
mTrace
(
"thandle:%p, start to create table:%s"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
SCMCreateTableMsg
*
pCreate
=
(
SCMCreateTableMsg
*
)
rpcMsg
->
pCont
;
pCreate
->
numOfColumns
=
htons
(
pCreate
->
numOfColumns
);
pCreate
->
numOfTags
=
htons
(
pCreate
->
numOfTags
)
;
pCreate
->
sqlLen
=
htons
(
pCreate
->
sqlLen
);
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
{
mError
(
"thandle:%p, failed to create table:%s, need redirect"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
return
;
}
SSchema
*
pSchema
=
(
SSchema
*
)
pCreate
->
schema
;
for
(
int32_t
i
=
0
;
i
<
pCreate
->
numOfColumns
+
pCreate
->
numOfTags
;
++
i
)
{
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
pSchema
->
colId
=
i
;
pSchema
++
;
if
(
mgmtCheckExpired
())
{
mError
(
"thandle:%p, failed to create table:%s, grant expired"
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_GRANT_EXPIRED
);
return
;
}
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"table:%s, failed to create table, need redirect message"
,
pCreate
->
tableId
);
if
(
!
pMsg
->
pUser
->
writeAuth
)
{
mError
(
"thandle:%p, failed to create table:%s, no rights"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
return
;
}
S
UserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
)
;
i
f
(
pUser
==
NULL
)
{
mError
(
"table:%s, failed to create table, invalid user"
,
pCreate
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
S
AcctObj
*
pAcct
=
pMsg
->
pUser
->
pAcct
;
i
nt32_t
code
=
mgmtCheckTableLimit
(
pAcct
,
htons
(
pCreate
->
numOfColumns
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"thandle:%p, failed to create table:%s, exceed the limit"
,
pMsg
->
thandle
,
pCreate
->
tableId
)
;
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
return
;
}
if
(
!
pUser
->
writeAuth
)
{
mError
(
"table:%s, failed to create table, no rights"
,
pCreate
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
rpcSendResponse
(
&
rpcRsp
);
SDbObj
*
pDb
=
mgmtGetDb
(
pCreate
->
db
);
if
(
pDb
==
NULL
)
{
mError
(
"thandle:%p, failed to create table:%s, db not selected"
,
pMsg
->
thandle
,
pCreate
->
tableId
)
;
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
return
;
}
int32_t
code
=
mgmtCreateTable
(
pCreate
,
rpcMsg
->
contLen
,
rpcMsg
->
handle
,
false
);
if
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
rpcRsp
.
code
=
code
;
rpcSendResponse
(
&
rpcRsp
);
STableInfo
*
pTable
=
mgmtGetTable
(
pCreate
->
tableId
);
if
(
pTable
!=
NULL
)
{
if
(
pCreate
->
igExists
)
{
mTrace
(
"thandle:%p, table:%s is already exist"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SUCCESS
);
return
;
}
else
{
mError
(
"thandle:%p, failed to create table:%s, table already exist"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_TABLE_ALREADY_EXIST
);
return
;
}
}
if
(
pCreate
->
numOfTags
!=
0
)
{
mTrace
(
"thandle:%p, start to create super table:%s, tags:%d columns:%d"
,
pMsg
->
thandle
,
pCreate
->
tableId
,
pCreate
->
numOfTags
,
pCreate
->
numOfColumns
);
code
=
mgmtCreateSuperTable
(
pDb
,
pCreate
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
code
=
mgmtCheckTimeSeries
(
pCreate
->
numOfColumns
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"thandle:%p, failed to create table:%s, timeseries exceed the limit"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
SQueuedMsg
*
newMsg
=
malloc
(
sizeof
(
SQueuedMsg
));
memcpy
(
newMsg
,
pMsg
,
sizeof
(
SQueuedMsg
));
pMsg
->
pCont
=
NULL
;
SVgObj
*
pVgroup
=
mgmtGetAvailableVgroup
(
pDb
);
if
(
pVgroup
==
NULL
)
{
mTrace
(
"thandle:%p, table:%s start to create a new vgroup"
,
pMsg
->
thandle
,
pCreate
->
tableId
);
mgmtCreateVgroup
(
pMsg
);
}
else
{
mTrace
(
"thandle:%p, create table:%s in vgroup:%d"
,
pMsg
->
thandle
,
pCreate
->
tableId
,
pVgroup
->
vgId
);
mgmtCreateTable
(
pVgroup
,
pMsg
);
}
}
void
mgmtProcessDropTableMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMDropTableMsg
*
pDrop
=
(
SCMDropTableMsg
*
)
rpcMsg
->
pCont
;
void
mgmtProcessDropTableMsg
(
SQueuedMsg
*
pMsg
)
{
SCMDropTableMsg
*
pDrop
=
pMsg
->
pCont
;
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
)
!=
TSDB_CODE_SUCCESS
)
{
mError
(
"t
able:%s, failed to drop table, need redirect message"
,
pDrop
->
tableId
);
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
)
)
{
mError
(
"t
handle:%p, failed to drop table:%s, need redirect message"
,
pMsg
->
thandle
,
pDrop
->
tableId
);
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
t
handle
);
if
(
pUser
==
NULL
)
{
mError
(
"table:%s, failed to drop table, invalid user"
,
pDrop
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_USER
);
return
;
}
if
(
!
pUser
->
writeAuth
)
{
mError
(
"table:%s, failed to drop table, no rights"
,
pDrop
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
return
;
}
SDbObj
*
pDb
=
mgmtGetDbByTableId
(
pDrop
->
tableId
);
if
(
pDb
==
NULL
)
{
mError
(
"table:%s, failed to drop table, db not selected"
,
pDrop
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_DB_NOT_SELECTED
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_DB_NOT_SELECTED
);
return
;
}
int32_t
code
=
mgmtDropTable
(
pDb
,
pDrop
->
tableId
,
pDrop
->
igNotExists
);
if
(
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
rpcRsp
.
code
=
code
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
}
void
mgmtProcessAlterTableMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
)
!=
TSDB_CODE_SUCCESS
)
{
void
mgmtProcessAlterTableMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
{
return
;
}
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
t
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_USER
);
return
;
}
SCMAlterTableMsg
*
pAlter
=
(
SCMAlterTableMsg
*
)
rpc
Msg
->
pCont
;
SCMAlterTableMsg
*
pAlter
=
p
Msg
->
pCont
;
int32_t
code
;
if
(
!
pUser
->
writeAuth
)
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_NO_RIGHTS
;
}
else
{
pAlter
->
type
=
htons
(
pAlter
->
type
);
pAlter
->
numOfCols
=
htons
(
pAlter
->
numOfCols
);
if
(
pAlter
->
numOfCols
>
2
)
{
mError
(
"table:%s error numOfCols:%d in alter table"
,
pAlter
->
tableId
,
pAlter
->
numOfCols
);
rpcRsp
.
code
=
TSDB_CODE_APP_ERROR
;
code
=
TSDB_CODE_APP_ERROR
;
}
else
{
SDbObj
*
pDb
=
mgmtGetDb
(
pAlter
->
db
);
if
(
pDb
)
{
...
...
@@ -662,17 +582,17 @@ void mgmtProcessAlterTableMsg(SRpcMsg *rpcMsg) {
pAlter
->
schema
[
i
].
bytes
=
htons
(
pAlter
->
schema
[
i
].
bytes
);
}
rpcRsp
.
code
=
mgmtAlterTable
(
pDb
,
pAlter
);
if
(
rpcRsp
.
code
==
0
)
{
code
=
mgmtAlterTable
(
pDb
,
pAlter
);
if
(
code
==
0
)
{
mLPrint
(
"table:%s is altered by %s"
,
pAlter
->
tableId
,
pUser
->
user
);
}
}
else
{
rpcRsp
.
code
=
TSDB_CODE_DB_NOT_SELECTED
;
code
=
TSDB_CODE_DB_NOT_SELECTED
;
}
}
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
void
mgmtProcessGetTableMeta
(
STableInfo
*
pTable
,
void
*
thandle
)
{
...
...
@@ -707,21 +627,14 @@ void mgmtProcessGetTableMeta(STableInfo *pTable, void *thandle) {
rpcSendResponse
(
&
rpcRsp
);
}
void
mgmtProcessTableMetaMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
;
rpcRsp
.
handle
=
rpcMsg
->
handle
;
rpcRsp
.
pCont
=
NULL
;
rpcRsp
.
contLen
=
0
;
SCMTableInfoMsg
*
pInfo
=
rpcMsg
->
pCont
;
void
mgmtProcessTableMetaMsg
(
SQueuedMsg
*
pMsg
)
{
SCMTableInfoMsg
*
pInfo
=
pMsg
->
pCont
;
pInfo
->
createFlag
=
htons
(
pInfo
->
createFlag
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
pMsg
->
t
handle
);
if
(
pUser
==
NULL
)
{
mError
(
"table:%s, failed to get table meta, invalid user"
,
pInfo
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_USER
);
return
;
}
...
...
@@ -729,12 +642,11 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
if
(
pTable
==
NULL
)
{
if
(
pInfo
->
createFlag
!=
1
)
{
mError
(
"table:%s, failed to get table meta, table not exist"
,
pInfo
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_INVALID_TABLE
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_TABLE
);
return
;
}
else
{
// on demand create table from super table if table does not exists
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
)
!=
TSDB_CODE_SUCCESS
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
)
)
{
mError
(
"table:%s, failed to create table while get meta info, need redirect message"
,
pInfo
->
tableId
);
return
;
}
...
...
@@ -743,8 +655,7 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
SCMCreateTableMsg
*
pCreateMsg
=
rpcMallocCont
(
contLen
);
if
(
pCreateMsg
==
NULL
)
{
mError
(
"table:%s, failed to create table while get meta info, no enough memory"
,
pInfo
->
tableId
);
rpcRsp
.
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
...
...
@@ -752,41 +663,34 @@ void mgmtProcessTableMetaMsg(SRpcMsg *rpcMsg) {
strcpy
(
pCreateMsg
->
tableId
,
pInfo
->
tableId
);
mError
(
"table:%s, start to create table while get meta info"
,
pInfo
->
tableId
);
mgmtCreateTable
(
pCreateMsg
,
contLen
,
rpcMsg
->
handle
,
true
);
// mgmtCreateTable(pCreateMsg, contLen, pMsg->t
handle, true);
}
}
else
{
mgmtProcessGetTableMeta
(
pTable
,
rpcMsg
->
handle
);
mgmtProcessGetTableMeta
(
pTable
,
pMsg
->
t
handle
);
}
}
void
mgmtProcessMultiTableMetaMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
;
rpcRsp
.
handle
=
rpcMsg
->
handle
;
rpcRsp
.
pCont
=
NULL
;
rpcRsp
.
contLen
=
0
;
void
mgmtProcessMultiTableMetaMsg
(
SQueuedMsg
*
pMsg
)
{
SRpcConnInfo
connInfo
;
if
(
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
)
!=
0
)
{
mError
(
"conn:%p is already released while get mulit table meta"
,
rpcMsg
->
handle
);
if
(
rpcGetConnInfo
(
pMsg
->
t
handle
,
&
connInfo
)
!=
0
)
{
mError
(
"conn:%p is already released while get mulit table meta"
,
pMsg
->
t
handle
);
return
;
}
bool
usePublicIp
=
(
connInfo
.
serverIp
==
tsPublicIpInt
);
SUserObj
*
pUser
=
mgmtGetUser
(
connInfo
.
user
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_USER
);
return
;
}
SCMMultiTableInfoMsg
*
pInfo
=
rpc
Msg
->
pCont
;
SCMMultiTableInfoMsg
*
pInfo
=
p
Msg
->
pCont
;
pInfo
->
numOfTables
=
htonl
(
pInfo
->
numOfTables
);
int32_t
totalMallocLen
=
4
*
1024
*
1024
;
// first malloc 4 MB, subsequent reallocation as twice
SMultiTableMeta
*
pMultiMeta
=
rpcMallocCont
(
totalMallocLen
);
if
(
pMultiMeta
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SERV_OUT_OF_MEMORY
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_SERV_OUT_OF_MEMORY
);
return
;
}
...
...
@@ -823,29 +727,69 @@ void mgmtProcessMultiTableMetaMsg(SRpcMsg *rpcMsg) {
}
}
SRpcMsg
rpcRsp
=
{
0
};
rpcRsp
.
handle
=
pMsg
->
thandle
;
rpcRsp
.
pCont
=
pMultiMeta
;
rpcRsp
.
contLen
=
pMultiMeta
->
contLen
;
rpcSendResponse
(
&
rpcRsp
);
}
void
mgmtProcessSuperTableMetaMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SCMSuperTableInfoMsg
*
pInfo
=
rpcMsg
->
pCont
;
void
mgmtProcessSuperTableMetaMsg
(
SQueuedMsg
*
pMsg
)
{
SCMSuperTableInfoMsg
*
pInfo
=
pMsg
->
pCont
;
STableInfo
*
pTable
=
mgmtGetSuperTable
(
pInfo
->
tableId
);
if
(
pTable
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_TABLE
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_TABLE
);
return
;
}
SCMSuperTableInfoRsp
*
pRsp
=
mgmtGetSuperTableVgroup
((
SSuperTableObj
*
)
pTable
);
if
(
pRsp
!=
NULL
)
{
int32_t
msgLen
=
sizeof
(
SSuperTableObj
)
+
htonl
(
pRsp
->
numOfDnodes
)
*
sizeof
(
int32_t
);
SRpcMsg
rpcRsp
=
{
0
};
rpcRsp
.
handle
=
pMsg
->
thandle
;
rpcRsp
.
pCont
=
pRsp
;
rpcRsp
.
contLen
=
msgLen
;
rpcSendResponse
(
&
rpcRsp
);
}
else
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_TABLE
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_TABLE
);
}
}
\ No newline at end of file
}
static
void
mgmtProcessCreateTableRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
SQueuedMsg
*
queueMsg
=
rpcMsg
->
handle
;
queueMsg
->
received
++
;
STableInfo
*
pTable
=
queueMsg
->
ahandle
;
mTrace
(
"thandle:%p, create table:%s rsp received, ahandle:%p code:%d received:%d"
,
queueMsg
->
thandle
,
pTable
->
tableId
,
rpcMsg
->
handle
,
rpcMsg
->
code
,
queueMsg
->
received
);
if
(
rpcMsg
->
code
!=
TSDB_CODE_SUCCESS
)
{
mgmtSetTableDirty
(
pTable
,
true
);
//sdbDeleteRow(tsVgroupSdb, pVgroup);
mgmtSendSimpleResp
(
queueMsg
->
thandle
,
rpcMsg
->
code
);
mError
(
"table:%s, failed to create in dnode, code:%d, set it dirty"
,
pTable
->
tableId
,
rpcMsg
->
code
);
mgmtSetTableDirty
(
pTable
,
true
);
}
else
{
mTrace
(
"table:%s, created in dnode"
,
pTable
->
tableId
);
mgmtSetTableDirty
(
pTable
,
false
);
if
(
queueMsg
->
msgType
!=
TSDB_MSG_TYPE_CM_CREATE_TABLE
)
{
SQueuedMsg
*
newMsg
=
calloc
(
1
,
sizeof
(
SQueuedMsg
));
newMsg
->
msgType
=
queueMsg
->
msgType
;
newMsg
->
thandle
=
queueMsg
->
thandle
;
newMsg
->
pDb
=
queueMsg
->
pDb
;
newMsg
->
pUser
=
queueMsg
->
pUser
;
newMsg
->
contLen
=
queueMsg
->
contLen
;
newMsg
->
pCont
=
rpcMallocCont
(
newMsg
->
contLen
);
memcpy
(
newMsg
->
pCont
,
queueMsg
->
pCont
,
newMsg
->
contLen
);
mTrace
(
"table:%s, start to process get meta"
,
pTable
->
tableId
);
mgmtAddToShellQueue
(
newMsg
);
}
else
{
mgmtSendSimpleResp
(
queueMsg
->
thandle
,
rpcMsg
->
code
);
}
}
free
(
queueMsg
);
}
src/mnode/src/mgmtUser.c
浏览文件 @
66e5cd34
...
...
@@ -33,9 +33,9 @@ static int32_t mgmtUpdateUser(SUserObj *pUser);
static
int32_t
mgmtGetUserMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveUsers
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCreateUserMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessAlterUserMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessDropUserMsg
(
S
RpcMsg
*
rpc
Msg
);
static
void
mgmtProcessCreateUserMsg
(
S
QueuedMsg
*
p
Msg
);
static
void
mgmtProcessAlterUserMsg
(
S
QueuedMsg
*
p
Msg
);
static
void
mgmtProcessDropUserMsg
(
S
QueuedMsg
*
p
Msg
);
static
void
*
(
*
mgmtUserActionFp
[
SDB_MAX_ACTION_TYPES
])(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
static
void
*
mgmtUserActionInsert
(
void
*
row
,
char
*
str
,
int32_t
size
,
int32_t
*
ssize
);
...
...
@@ -337,52 +337,40 @@ SUserObj *mgmtGetUserFromConn(void *pConn) {
return
NULL
;
}
static
void
mgmtProcessCreateUserMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
SUserObj
*
pUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
static
void
mgmtProcessCreateUserMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
return
;
int32_t
code
;
SUserObj
*
pUser
=
pMsg
->
pUser
;
if
(
pUser
->
superAuth
)
{
SCMCreateUserMsg
*
pCreate
=
rpc
Msg
->
pCont
;
rpcRsp
.
code
=
mgmtCreateUser
(
pUser
->
pAcct
,
pCreate
->
user
,
pCreate
->
pass
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
SCMCreateUserMsg
*
pCreate
=
p
Msg
->
pCont
;
code
=
mgmtCreateUser
(
pUser
->
pAcct
,
pCreate
->
user
,
pCreate
->
pass
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"user:%s is created by %s"
,
pCreate
->
user
,
pUser
->
user
);
}
}
else
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_NO_RIGHTS
;
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
static
void
mgmtProcessAlterUserMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
static
void
mgmtProcessAlterUserMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
return
;
SUserObj
*
pOperUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pOperUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
SCMAlterUserMsg
*
pAlter
=
rpcMsg
->
pCont
;
int32_t
code
;
SUserObj
*
pOperUser
=
pMsg
->
pUser
;
SCMAlterUserMsg
*
pAlter
=
pMsg
->
pCont
;
SUserObj
*
pUser
=
mgmtGetUser
(
pAlter
->
user
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_USER
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"monitor"
)
==
0
||
(
strcmp
(
pUser
->
user
+
1
,
pUser
->
acct
)
==
0
&&
pUser
->
user
[
0
]
==
'_'
))
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
return
;
}
...
...
@@ -405,13 +393,13 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
if
(
hasRight
)
{
memset
(
pUser
->
pass
,
0
,
sizeof
(
pUser
->
pass
));
taosEncryptPass
((
uint8_t
*
)
pAlter
->
pass
,
strlen
(
pAlter
->
pass
),
pUser
->
pass
);
rpcRsp
.
code
=
mgmtUpdateUser
(
pUser
);
mLPrint
(
"user:%s password is altered by %s, code:%d"
,
pAlter
->
user
,
pUser
->
user
,
rpcRsp
.
code
);
code
=
mgmtUpdateUser
(
pUser
);
mLPrint
(
"user:%s password is altered by %s, code:%d"
,
pAlter
->
user
,
pUser
->
user
,
code
);
}
else
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_NO_RIGHTS
;
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
...
...
@@ -454,42 +442,34 @@ static void mgmtProcessAlterUserMsg(SRpcMsg *rpcMsg) {
pUser
->
writeAuth
=
1
;
}
rpcRsp
.
code
=
mgmtUpdateUser
(
pUser
);
mLPrint
(
"user:%s privilege is altered by %s, code:%d"
,
pAlter
->
user
,
pUser
->
user
,
rpcRsp
.
code
);
code
=
mgmtUpdateUser
(
pUser
);
mLPrint
(
"user:%s privilege is altered by %s, code:%d"
,
pAlter
->
user
,
pUser
->
user
,
code
);
}
else
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_NO_RIGHTS
;
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
return
;
}
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
}
static
void
mgmtProcessDropUserMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
if
(
mgmtCheckRedirect
(
rpcMsg
->
handle
))
return
;
static
void
mgmtProcessDropUserMsg
(
SQueuedMsg
*
pMsg
)
{
if
(
mgmtCheckRedirect
(
pMsg
->
thandle
))
return
;
SUserObj
*
pOperUser
=
mgmtGetUserFromConn
(
rpcMsg
->
handle
);
if
(
pOperUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
return
;
}
int32_t
code
;
SUserObj
*
pOperUser
=
pMsg
->
pUser
;
SCMDropUserMsg
*
pDrop
=
rpc
Msg
->
pCont
;
SCMDropUserMsg
*
pDrop
=
p
Msg
->
pCont
;
SUserObj
*
pUser
=
mgmtGetUser
(
pDrop
->
user
);
if
(
pUser
==
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_INVALID_USER
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_USER
);
return
;
}
if
(
strcmp
(
pUser
->
user
,
"monitor"
)
==
0
||
(
strcmp
(
pUser
->
user
+
1
,
pUser
->
acct
)
==
0
&&
pUser
->
user
[
0
]
==
'_'
))
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_RIGHTS
);
return
;
}
...
...
@@ -511,13 +491,13 @@ static void mgmtProcessDropUserMsg(SRpcMsg *rpcMsg) {
}
if
(
hasRight
)
{
rpcRsp
.
code
=
mgmtDropUser
(
pUser
->
pAcct
,
pDrop
->
user
);
if
(
rpcRsp
.
code
==
TSDB_CODE_SUCCESS
)
{
code
=
mgmtDropUser
(
pUser
->
pAcct
,
pDrop
->
user
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
mLPrint
(
"user:%s is dropped by %s"
,
pDrop
->
user
,
pUser
->
user
);
}
}
else
{
rpcRsp
.
code
=
TSDB_CODE_NO_RIGHTS
;
code
=
TSDB_CODE_NO_RIGHTS
;
}
rpcSendResponse
(
&
rpcRsp
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
code
);
}
src/mnode/src/mgmtVgroup.c
浏览文件 @
66e5cd34
...
...
@@ -24,6 +24,7 @@
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDnode.h"
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtTable.h"
#include "mgmtVgroup.h"
...
...
@@ -42,6 +43,9 @@ static void *mgmtVgroupActionDestroy(void *row, char *str, int32_t size, int32_t
static
int32_t
mgmtGetVgroupMeta
(
STableMeta
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mgmtRetrieveVgroups
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
static
void
mgmtProcessCreateVnodeRsp
(
SRpcMsg
*
rpcMsg
);
void
mgmtSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
);
static
void
mgmtVgroupActionInit
()
{
SVgObj
tObj
;
...
...
@@ -114,6 +118,7 @@ int32_t mgmtInitVgroups() {
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_VGROUP
,
mgmtGetVgroupMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_VGROUP
,
mgmtRetrieveVgroups
);
mgmtAddDClientRspHandle
(
TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP
,
mgmtProcessCreateVnodeRsp
);
mTrace
(
"vgroup is initialized"
);
return
0
;
...
...
@@ -123,19 +128,6 @@ SVgObj *mgmtGetVgroup(int32_t vgId) {
return
(
SVgObj
*
)
sdbGetRow
(
tsVgroupSdb
,
&
vgId
);
}
int32_t
mgmtAllocateSid
(
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
int32_t
sid
=
taosAllocateId
(
pVgroup
->
idPool
);
if
(
sid
<
0
)
{
mWarn
(
"table:%s, vgroup:%d run out of ID, num:%d"
,
pDb
->
name
,
pVgroup
->
vgId
,
taosIdPoolNumOfUsed
(
pVgroup
->
idPool
));
pDb
->
vgStatus
=
TSDB_VG_STATUS_IN_PROGRESS
;
mgmtCreateVgroup
(
pDb
);
terrno
=
TSDB_CODE_ACTION_IN_PROGRESS
;
}
terrno
=
0
;
return
sid
;
}
/*
* TODO: check if there is enough sids
*/
...
...
@@ -155,21 +147,25 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) {
pDb
->
vgTimer
=
NULL
;
}
SVgObj
*
mgmtCreateVgroup
(
SDbObj
*
pDb
)
{
void
mgmtCreateVgroup
(
SQueuedMsg
*
pMsg
)
{
SDbObj
*
pDb
=
pMsg
->
pDb
;
if
(
pDb
==
NULL
)
{
mError
(
"thandle:%p, failed to create vgroup, db not found"
,
pMsg
->
thandle
);
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_INVALID_DB
);
return
;
}
SVgObj
*
pVgroup
=
(
SVgObj
*
)
calloc
(
sizeof
(
SVgObj
),
1
);
strcpy
(
pVgroup
->
dbName
,
pDb
->
name
);
pVgroup
->
numOfVnodes
=
pDb
->
cfg
.
replications
;
pVgroup
->
createdTime
=
taosGetTimestampMs
();
// based on load balance, create a new one
if
(
mgmtAllocVnodes
(
pVgroup
)
!=
0
)
{
mError
(
"
db:%s, no enough free dnode to alloc %d vnodes"
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
mError
(
"
thandle:%p, db:%s no enough dnode to alloc %d vnodes"
,
pMsg
->
thandle
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
free
(
pVgroup
);
pDb
->
vgStatus
=
TSDB_VG_STATUS_FULL
;
taosTmrReset
(
mgmtProcessVgTimer
,
5000
,
pDb
,
tsMgmtTmr
,
&
pDb
->
vgTimer
);
return
NULL
;
mgmtSendSimpleResp
(
pMsg
->
thandle
,
TSDB_CODE_NO_ENOUGH_DNODES
);
return
;
}
pVgroup
->
createdTime
=
taosGetTimestampMs
();
pVgroup
->
tableList
=
(
STableInfo
**
)
calloc
(
sizeof
(
STableInfo
*
),
pDb
->
cfg
.
maxSessions
);
pVgroup
->
numOfTables
=
0
;
pVgroup
->
idPool
=
taosInitIdPool
(
pDb
->
cfg
.
maxSessions
);
...
...
@@ -179,11 +175,14 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
sdbInsertRow
(
tsVgroupSdb
,
pVgroup
,
0
);
mTrace
(
"vgroup:%d, vgroup is created, db:%s replica:%d"
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
mTrace
(
"vgroup:%d, dnode:%s vnode:%d is created"
,
pVgroup
->
vgId
,
taosIpStr
(
pVgroup
->
vnodeGid
[
i
].
ip
),
pVgroup
->
vnodeGid
[
i
].
vnode
);
mPrint
(
"thandle:%p, vgroup:%d is created in mnode, db:%s replica:%d"
,
pMsg
->
thandle
,
pVgroup
->
vgId
,
pDb
->
name
,
pVgroup
->
numOfVnodes
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
mPrint
(
"thandle:%p, vgroup:%d, dnode:%s vnode:%d"
,
pMsg
->
thandle
,
pVgroup
->
vgId
,
taosIpStr
(
pVgroup
->
vnodeGid
[
i
].
ip
),
pVgroup
->
vnodeGid
[
i
].
vnode
);
}
return
pVgroup
;
mgmtSendCreateVgroupMsg
(
pVgroup
,
pMsg
)
;
}
int32_t
mgmtDropVgroup
(
SDbObj
*
pDb
,
SVgObj
*
pVgroup
)
{
...
...
@@ -514,13 +513,13 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
SDbObj
*
pDb
=
mgmtGetDb
(
pVgroup
->
dbName
);
if
(
pDb
==
NULL
)
return
NULL
;
SMDCreateVnodeMsg
*
pV
Peers
=
rpcMallocCont
(
sizeof
(
SMDCreateVnodeMsg
));
if
(
pV
Peers
==
NULL
)
return
NULL
;
SMDCreateVnodeMsg
*
pV
node
=
rpcMallocCont
(
sizeof
(
SMDCreateVnodeMsg
));
if
(
pV
node
==
NULL
)
return
NULL
;
pV
Peers
->
vnode
=
htonl
(
vnode
);
pV
Peers
->
cfg
=
pDb
->
cfg
;
pV
node
->
vnode
=
htonl
(
vnode
);
pV
node
->
cfg
=
pDb
->
cfg
;
SVnodeCfg
*
pCfg
=
&
pV
Peers
->
cfg
;
SVnodeCfg
*
pCfg
=
&
pV
node
->
cfg
;
pCfg
->
vgId
=
htonl
(
pVgroup
->
vgId
);
pCfg
->
maxSessions
=
htonl
(
pCfg
->
maxSessions
);
pCfg
->
cacheBlockSize
=
htonl
(
pCfg
->
cacheBlockSize
);
...
...
@@ -534,13 +533,14 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode) {
pCfg
->
replications
=
(
char
)
pVgroup
->
numOfVnodes
;
pCfg
->
rowsInFileBlock
=
htonl
(
pCfg
->
rowsInFileBlock
);
SVnodeDesc
*
vpeerDesc
=
pV
Peers
->
vpeerDesc
;
SVnodeDesc
*
vpeerDesc
=
pV
node
->
vpeerDesc
;
for
(
int32_t
j
=
0
;
j
<
pVgroup
->
numOfVnodes
;
++
j
)
{
vpeerDesc
[
j
].
ip
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
ip
);
vpeerDesc
[
j
].
vgId
=
htonl
(
pVgroup
->
vgId
);
vpeerDesc
[
j
].
ip
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
ip
);
vpeerDesc
[
j
].
vnode
=
htonl
(
pVgroup
->
vnodeGid
[
j
].
vnode
);
}
return
pV
Peers
;
return
pV
node
;
}
SVgObj
*
mgmtGetVgroupByVnode
(
uint32_t
dnode
,
int32_t
vnode
)
{
...
...
@@ -558,7 +558,11 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
}
SRpcIpSet
mgmtGetIpSetFromVgroup
(
SVgObj
*
pVgroup
)
{
SRpcIpSet
ipSet
=
{.
numOfIps
=
pVgroup
->
numOfVnodes
,
.
inUse
=
0
,
.
port
=
tsMnodeDnodePort
+
1
};
SRpcIpSet
ipSet
=
{
.
numOfIps
=
pVgroup
->
numOfVnodes
,
.
inUse
=
0
,
.
port
=
tsMnodeDnodePort
};
for
(
int
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
ipSet
.
ip
[
i
]
=
pVgroup
->
vnodeGid
[
i
].
ip
;
}
...
...
@@ -566,7 +570,12 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
}
SRpcIpSet
mgmtGetIpSetFromIp
(
uint32_t
ip
)
{
SRpcIpSet
ipSet
=
{.
ip
[
0
]
=
ip
,
.
numOfIps
=
1
,
.
inUse
=
0
,
.
port
=
tsMnodeDnodePort
+
1
};
SRpcIpSet
ipSet
=
{
.
ip
[
0
]
=
ip
,
.
numOfIps
=
1
,
.
inUse
=
0
,
.
port
=
tsMnodeDnodePort
};
return
ipSet
;
}
...
...
@@ -574,19 +583,54 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, int32_t vnode, SRpcIpSet *ipSet, vo
mTrace
(
"vgroup:%d, send create vnode:%d msg, ahandle:%p"
,
pVgroup
->
vgId
,
vnode
,
ahandle
);
SMDCreateVnodeMsg
*
pCreate
=
mgmtBuildCreateVnodeMsg
(
pVgroup
,
vnode
);
SRpcMsg
rpcMsg
=
{
.
handle
=
ahandle
,
.
pCont
=
pCreate
,
.
contLen
=
pCreate
?
sizeof
(
SMDCreateVnodeMsg
)
:
0
,
.
code
=
0
,
.
msgType
=
TSDB_MSG_TYPE_MD_CREATE_VNODE
.
handle
=
ahandle
,
.
pCont
=
pCreate
,
.
contLen
=
pCreate
?
sizeof
(
SMDCreateVnodeMsg
)
:
0
,
.
code
=
0
,
.
msgType
=
TSDB_MSG_TYPE_MD_CREATE_VNODE
};
mgmtSendMsgToDnode
(
ipSet
,
&
rpcMsg
);
}
void
mgmtSendCreateVgroupMsg
(
SVgObj
*
pVgroup
,
void
*
ahandle
)
{
mTrace
(
"
vgroup:%d, send create all vnodes msg,
handle:%p"
,
pVgroup
->
vgId
,
ahandle
);
mTrace
(
"
send create vgroup:%d msg, a
handle:%p"
,
pVgroup
->
vgId
,
ahandle
);
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SRpcIpSet
ipSet
=
mgmtGetIpSetFromIp
(
pVgroup
->
vnodeGid
[
i
].
ip
);
mgmtSendCreateVnodeMsg
(
pVgroup
,
pVgroup
->
vnodeGid
[
i
].
vnode
,
&
ipSet
,
ahandle
);
}
}
static
void
mgmtProcessCreateVnodeRsp
(
SRpcMsg
*
rpcMsg
)
{
if
(
rpcMsg
->
handle
==
NULL
)
return
;
SQueuedMsg
*
queueMsg
=
rpcMsg
->
handle
;
queueMsg
->
received
++
;
if
(
rpcMsg
->
code
==
TSDB_CODE_SUCCESS
)
{
queueMsg
->
code
=
rpcMsg
->
code
;
queueMsg
->
successed
++
;
}
SVgObj
*
pVgroup
=
queueMsg
->
ahandle
;
mTrace
(
"thandle:%p, vgroup:%d create vnode rsp received, ahandle:%p code:%d received:%d successed:%d expected:%d"
,
queueMsg
->
thandle
,
pVgroup
->
vgId
,
rpcMsg
->
handle
,
rpcMsg
->
code
,
queueMsg
->
received
,
queueMsg
->
successed
,
queueMsg
->
expected
);
if
(
queueMsg
->
received
!=
queueMsg
->
expected
)
return
;
if
(
queueMsg
->
received
==
queueMsg
->
successed
)
{
SQueuedMsg
*
newMsg
=
calloc
(
1
,
sizeof
(
SQueuedMsg
));
newMsg
->
msgType
=
queueMsg
->
msgType
;
newMsg
->
thandle
=
queueMsg
->
thandle
;
newMsg
->
pDb
=
queueMsg
->
pDb
;
newMsg
->
pUser
=
queueMsg
->
pUser
;
newMsg
->
contLen
=
queueMsg
->
contLen
;
newMsg
->
pCont
=
rpcMallocCont
(
newMsg
->
contLen
);
memcpy
(
newMsg
->
pCont
,
queueMsg
->
pCont
,
newMsg
->
contLen
);
mgmtAddToShellQueue
(
newMsg
);
}
else
{
sdbDeleteRow
(
tsVgroupSdb
,
pVgroup
);
mgmtSendSimpleResp
(
queueMsg
->
thandle
,
rpcMsg
->
code
);
}
free
(
queueMsg
);
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录