Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
311b40b6
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
311b40b6
编写于
3月 11, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-10] change some message types
上级
7642da92
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
343 addition
and
315 deletion
+343
-315
src/client/src/tscServer.c
src/client/src/tscServer.c
+26
-26
src/client/src/tscSql.c
src/client/src/tscSql.c
+3
-3
src/client/src/tscSystem.c
src/client/src/tscSystem.c
+1
-1
src/dnode/inc/dnodeMgmt.h
src/dnode/inc/dnodeMgmt.h
+1
-1
src/dnode/src/dnodeMClient.c
src/dnode/src/dnodeMClient.c
+2
-2
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+119
-108
src/dnode/src/dnodeMnode.c
src/dnode/src/dnodeMnode.c
+10
-6
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+4
-4
src/dnode/src/dnodeWrite.c
src/dnode/src/dnodeWrite.c
+14
-2
src/inc/taosmsg.h
src/inc/taosmsg.h
+69
-77
src/kit/shell/src/shellDarwin.c
src/kit/shell/src/shellDarwin.c
+1
-1
src/kit/shell/src/shellEngine.c
src/kit/shell/src/shellEngine.c
+1
-1
src/kit/shell/src/shellImport.c
src/kit/shell/src/shellImport.c
+1
-1
src/kit/shell/src/shellLinux.c
src/kit/shell/src/shellLinux.c
+1
-1
src/kit/shell/src/shellWindows.c
src/kit/shell/src/shellWindows.c
+1
-1
src/mnode/src/mgmtDServer.c
src/mnode/src/mgmtDServer.c
+4
-4
src/mnode/src/mgmtDb.c
src/mnode/src/mgmtDb.c
+3
-3
src/mnode/src/mgmtProfile.c
src/mnode/src/mgmtProfile.c
+3
-3
src/mnode/src/mgmtShell.c
src/mnode/src/mgmtShell.c
+10
-10
src/mnode/src/mgmtTable.c
src/mnode/src/mgmtTable.c
+6
-6
src/mnode/src/mgmtUser.c
src/mnode/src/mgmtUser.c
+3
-3
src/mnode/src/mgmtVgroup.c
src/mnode/src/mgmtVgroup.c
+2
-2
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+5
-5
src/util/inc/tglobalcfg.h
src/util/inc/tglobalcfg.h
+4
-3
src/util/src/tglobalcfg.c
src/util/src/tglobalcfg.c
+7
-6
src/util/src/tstring.c
src/util/src/tstring.c
+42
-35
未找到文件。
src/client/src/tscServer.c
浏览文件 @
311b40b6
...
...
@@ -72,7 +72,7 @@ void tscSetMgmtIpListFromEdge() {
if
(
tscMgmtIpList
.
numOfIps
!=
1
)
{
tscMgmtIpList
.
numOfIps
=
1
;
tscMgmtIpList
.
inUse
=
0
;
tscMgmtIpList
.
port
=
tsM
gmt
ShellPort
;
tscMgmtIpList
.
port
=
tsM
node
ShellPort
;
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
tsMasterIp
);
tscTrace
(
"edge mgmt IP list:"
);
tscPrintMgmtIp
();
...
...
@@ -185,7 +185,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
pSql
->
cmd
.
command
<
TSDB_SQL_MGMT
)
{
pSql
->
ipList
->
port
=
ts
V
nodeShellPort
;
pSql
->
ipList
->
port
=
ts
D
nodeShellPort
;
tscPrint
(
"%p msg:%s is sent to server %d"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
],
pSql
->
ipList
->
port
);
memcpy
(
pMsg
,
pSql
->
cmd
.
payload
+
tsRpcHeadSize
,
pSql
->
cmd
.
payloadLen
);
...
...
@@ -198,7 +198,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
};
rpcSendRequest
(
pVnodeConn
,
pSql
->
ipList
,
&
rpcMsg
);
}
else
{
pSql
->
ipList
->
port
=
tsM
gmt
ShellPort
;
pSql
->
ipList
->
port
=
tsM
node
ShellPort
;
tscPrint
(
"%p msg:%s is sent to server %d"
,
pSql
,
taosMsg
[
pSql
->
cmd
.
msgType
],
pSql
->
ipList
->
port
);
memcpy
(
pMsg
,
pSql
->
cmd
.
payload
,
pSql
->
cmd
.
payloadLen
);
SRpcMsg
rpcMsg
=
{
...
...
@@ -306,7 +306,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg) {
}
// ignore the error information returned from mnode when set ignore flag in sql
if
(
pRes
->
code
==
TSDB_CODE_DB_ALREADY_EXIST
&&
pCmd
->
existsCheck
&&
pRes
->
rspType
==
TSDB_MSG_TYPE_CREATE_DB_RSP
)
{
if
(
pRes
->
code
==
TSDB_CODE_DB_ALREADY_EXIST
&&
pCmd
->
existsCheck
&&
pRes
->
rspType
==
TSDB_MSG_TYPE_C
M_C
REATE_DB_RSP
)
{
pRes
->
code
=
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1685,7 +1685,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
tscBuildCreateDbMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SCreateDbMsg
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_CREATE_DB
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_C
M_C
REATE_DB
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"%p failed to malloc for query msg"
,
pSql
);
...
...
@@ -1711,7 +1711,7 @@ int32_t tscBuildCreateDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SCreateDnodeMsg
*
pCreate
=
(
SCreateDnodeMsg
*
)
pCmd
->
payload
;
strncpy
(
pCreate
->
ip
,
pInfo
->
pDCLInfo
->
a
[
0
].
z
,
pInfo
->
pDCLInfo
->
a
[
0
].
n
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_CREATE_DNODE
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_C
M_C
REATE_DNODE
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1757,7 +1757,7 @@ int32_t tscBuildAcctMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
}
pCmd
->
msgType
=
TSDB_MSG_TYPE_CREATE_ACCT
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_C
M_C
REATE_ACCT
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1785,9 +1785,9 @@ int32_t tscBuildUserMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
if
(
pUser
->
type
==
TSDB_ALTER_USER_PASSWD
||
pUser
->
type
==
TSDB_ALTER_USER_PRIVILEGES
)
{
pCmd
->
msgType
=
TSDB_MSG_TYPE_ALTER_USER
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
ALTER_USER
;
}
else
{
pCmd
->
msgType
=
TSDB_MSG_TYPE_CREATE_USER
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_C
M_C
REATE_USER
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -1821,7 +1821,7 @@ int32_t tscBuildDropDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy
(
pDropDbMsg
->
db
,
pMeterMetaInfo
->
name
,
tListLen
(
pDropDbMsg
->
db
));
pDropDbMsg
->
ignoreNotExists
=
pInfo
->
pDCLInfo
->
existsCheck
?
1
:
0
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_DROP_DB
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
DROP_DB
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1839,7 +1839,7 @@ int32_t tscBuildDropTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strcpy
(
pDropTableMsg
->
tableId
,
pMeterMetaInfo
->
name
);
pDropTableMsg
->
igNotExists
=
pInfo
->
pDCLInfo
->
existsCheck
?
1
:
0
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_DROP_TABLE
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
DROP_TABLE
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1854,7 +1854,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SDropDnodeMsg
*
pDrop
=
(
SDropDnodeMsg
*
)
pCmd
->
payload
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
strcpy
(
pDrop
->
ip
,
pMeterMetaInfo
->
name
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_DROP_DNODE
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
DROP_DNODE
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1862,7 +1862,7 @@ int32_t tscBuildDropDnodeMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
tscBuildDropAcctMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SDropUserMsg
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_DROP_USER
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
DROP_USER
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"%p failed to malloc for query msg"
,
pSql
);
...
...
@@ -1888,7 +1888,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SUseDbMsg
*
pUseDbMsg
=
(
SUseDbMsg
*
)
pCmd
->
payload
;
SMeterMetaInfo
*
pMeterMetaInfo
=
tscGetMeterMetaInfo
(
pCmd
,
pCmd
->
clauseIndex
,
0
);
strcpy
(
pUseDbMsg
->
db
,
pMeterMetaInfo
->
name
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_USE_DB
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
USE_DB
;
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -1896,7 +1896,7 @@ int32_t tscBuildUseDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int32_t
tscBuildShowMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_SHOW
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
SHOW
;
pCmd
->
payloadLen
=
sizeof
(
SShowMsg
)
+
100
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
...
...
@@ -1948,13 +1948,13 @@ int32_t tscBuildKillMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
strncpy
(
pKill
->
queryId
,
pInfo
->
pDCLInfo
->
ip
.
z
,
pInfo
->
pDCLInfo
->
ip
.
n
);
switch
(
pCmd
->
command
)
{
case
TSDB_SQL_KILL_QUERY
:
pCmd
->
msgType
=
TSDB_MSG_TYPE_KILL_QUERY
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
KILL_QUERY
;
break
;
case
TSDB_SQL_KILL_CONNECTION
:
pCmd
->
msgType
=
TSDB_MSG_TYPE_
KILL_CONNECTIO
N
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_KILL_CON
N
;
break
;
case
TSDB_SQL_KILL_STREAM
:
pCmd
->
msgType
=
TSDB_MSG_TYPE_KILL_STREAM
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
KILL_STREAM
;
break
;
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2043,7 +2043,7 @@ int tscBuildCreateTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen
=
pMsg
-
(
char
*
)
pCreateTableMsg
;
pCmd
->
payloadLen
=
msgLen
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_CREATE_TABLE
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_C
M_C
REATE_TABLE
;
assert
(
msgLen
+
minMsgSize
()
<=
size
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2098,7 +2098,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen
=
pMsg
-
(
char
*
)
pAlterTableMsg
;
pCmd
->
payloadLen
=
msgLen
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_ALTER_TABLE
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
ALTER_TABLE
;
assert
(
msgLen
+
minMsgSize
()
<=
size
);
...
...
@@ -2108,7 +2108,7 @@ int tscBuildAlterTableMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int
tscAlterDbMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
payloadLen
=
sizeof
(
SAlterDbMsg
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_ALTER_DB
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
ALTER_DB
;
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
tscError
(
"%p failed to malloc for query msg"
,
pSql
);
...
...
@@ -2243,7 +2243,7 @@ int tscProcessEmptyResultRsp(SSqlObj *pSql) { return tscLocalResultCommonBuilder
int
tscBuildConnectMsg
(
SSqlObj
*
pSql
,
SSqlInfo
*
pInfo
)
{
STscObj
*
pObj
=
pSql
->
pTscObj
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_CONNECT
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_C
M_C
ONNECT
;
pCmd
->
payloadLen
=
sizeof
(
SConnectMsg
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
pCmd
->
payloadLen
))
{
...
...
@@ -2297,7 +2297,7 @@ int tscBuildMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen
=
pMsg
-
(
char
*
)
pInfoMsg
;
pCmd
->
payloadLen
=
msgLen
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_TABLE_META
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
TABLE_META
;
tfree
(
tmpData
);
...
...
@@ -2335,7 +2335,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
tfree
(
tmpData
);
pCmd
->
payloadLen
+=
sizeof
(
SMgmtHead
)
+
sizeof
(
SMultiTableInfoMsg
);
pCmd
->
msgType
=
TSDB_MSG_TYPE_
MULTI_TABLE
_META
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_TABLES
_META
;
assert
(
pCmd
->
payloadLen
+
minMsgSize
()
<=
pCmd
->
allocSize
);
...
...
@@ -2509,7 +2509,7 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen
=
pMsg
-
pStart
;
pCmd
->
payloadLen
=
msgLen
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_STABLE_META
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
STABLE_META
;
assert
(
msgLen
+
minMsgSize
()
<=
size
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2566,7 +2566,7 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
msgLen
=
pMsg
-
pStart
;
pCmd
->
payloadLen
=
msgLen
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_HEARTBEAT
;
pCmd
->
msgType
=
TSDB_MSG_TYPE_
CM_
HEARTBEAT
;
assert
(
msgLen
+
minMsgSize
()
<=
size
);
return
msgLen
;
...
...
src/client/src/tscSql.c
浏览文件 @
311b40b6
...
...
@@ -67,7 +67,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
if
(
ip
&&
ip
[
0
])
{
tscMgmtIpList
.
inUse
=
0
;
tscMgmtIpList
.
port
=
tsM
gmt
ShellPort
;
tscMgmtIpList
.
port
=
tsM
node
ShellPort
;
tscMgmtIpList
.
numOfIps
=
1
;
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
ip
);
...
...
@@ -82,7 +82,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
}
}
tscMgmtIpList
.
port
=
port
?
port
:
tsM
gmt
ShellPort
;
tscMgmtIpList
.
port
=
port
?
port
:
tsM
node
ShellPort
;
pObj
=
(
STscObj
*
)
malloc
(
sizeof
(
STscObj
));
if
(
NULL
==
pObj
)
{
...
...
@@ -95,7 +95,7 @@ TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const
strncpy
(
pObj
->
user
,
user
,
TSDB_USER_LEN
);
taosEncryptPass
((
uint8_t
*
)
pass
,
strlen
(
pass
),
pObj
->
pass
);
pObj
->
mgmtPort
=
port
?
port
:
tsM
gmt
ShellPort
;
pObj
->
mgmtPort
=
port
?
port
:
tsM
node
ShellPort
;
if
(
db
)
{
int32_t
len
=
strlen
(
db
);
...
...
src/client/src/tscSystem.c
浏览文件 @
311b40b6
...
...
@@ -151,7 +151,7 @@ void taos_init_imp() {
}
tscMgmtIpList
.
inUse
=
0
;
tscMgmtIpList
.
port
=
tsM
gmt
ShellPort
;
tscMgmtIpList
.
port
=
tsM
node
ShellPort
;
tscMgmtIpList
.
numOfIps
=
1
;
tscMgmtIpList
.
ip
[
0
]
=
inet_addr
(
tsMasterIp
);
...
...
src/dnode/inc/dnodeMgmt.h
浏览文件 @
311b40b6
...
...
@@ -22,7 +22,7 @@ extern "C" {
int32_t
dnodeInitMgmt
();
void
dnodeCleanupMgmt
();
void
dnodeMgmt
(
void
*
rpcMsg
);
void
dnodeMgmt
(
SRpcMsg
*
rpcMsg
);
void
*
dnodeGetVnode
(
int32_t
vgId
);
int32_t
dnodeGetVnodeStatus
(
void
*
pVnode
);
...
...
src/dnode/src/dnodeMClient.c
浏览文件 @
311b40b6
...
...
@@ -26,7 +26,7 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg);
static
void
*
tsDnodeMClientRpc
;
int32_t
dnodeInitMClient
()
{
dnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_STATUS_RSP
]
=
dnodeProcessStatusRsp
;
dnodeProcessMgmtRspFp
[
TSDB_MSG_TYPE_
DM_
STATUS_RSP
]
=
dnodeProcessStatusRsp
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
...
...
@@ -34,7 +34,7 @@ int32_t dnodeInitMClient() {
rpcInit
.
localPort
=
0
;
rpcInit
.
label
=
"DND-MC"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
dnodeProcessRspFromMnode
;
rpcInit
.
cfp
=
dnodeProcessRspFromMnode
;
rpcInit
.
sessions
=
TSDB_SESSIONS_PER_DNODE
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
311b40b6
...
...
@@ -21,7 +21,7 @@
#include "tlog.h"
#include "trpc.h"
#include "tstatus.h"
//
#include "tsdb.h"
#include "tsdb.h"
#include "dnodeMgmt.h"
#include "dnodeRead.h"
#include "dnodeWrite.h"
...
...
@@ -46,17 +46,21 @@ static int32_t dnodeOpenVnode(int32_t vgId);
static
void
dnodeCleanupVnode
(
SVnodeObj
*
pVnode
);
static
int32_t
dnodeCreateVnode
(
SMDCreateVnodeMsg
*
cfg
);
static
void
dnodeDropVnode
(
SVnodeObj
*
pVnode
);
static
void
dnodeProces
SMD
CreateVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProces
SMD
DropVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProces
s
CreateVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProces
s
DropVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessAlterVnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessAlterStreamMsg
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessConfigDnodeMsg
(
SRpcMsg
*
pMsg
);
static
void
(
*
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
pMsg
);
static
void
*
tsDnodeVnodesHash
=
NULL
;
int32_t
dnodeInitMgmt
()
{
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
dnodeProces
SMD
CreateVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
dnodeProces
SMD
DropVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
dnodeProces
s
CreateVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
dnodeProces
s
DropVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
dnodeProcessAlterVnodeMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeProcessAlterStreamMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeProcessConfigDnodeMsg
;
tsDnodeVnodesHash
=
taosInitIntHash
(
TSDB_MAX_VNODES
,
sizeof
(
SVnodeObj
),
taosHashInt
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
...
...
@@ -72,8 +76,7 @@ void dnodeCleanupMgmt() {
taosCleanUpIntHash
(
tsDnodeVnodesHash
);
}
void
dnodeMgmt
(
void
*
rpcMsg
)
{
SRpcMsg
*
pMsg
=
rpcMsg
;
void
dnodeMgmt
(
SRpcMsg
*
pMsg
)
{
terrno
=
0
;
if
(
dnodeProcessMgmtMsgFp
[
pMsg
->
msgType
])
{
...
...
@@ -91,7 +94,7 @@ void dnodeMgmt(void *rpcMsg) {
}
void
*
dnodeGetVnode
(
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
taosGetIntHashData
(
tsDnodeVnodesHash
,
vgId
);
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
vgId
);
if
(
pVnode
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_VGROUP_ID
;
return
NULL
;
...
...
@@ -140,123 +143,123 @@ static void dnodeCleanupVnodes() {
}
static
int32_t
dnodeOpenVnode
(
int32_t
vgId
)
{
//
char rootDir[TSDB_FILENAME_LEN] = {0};
//
sprintf(rootDir, "%s/vnode%d", tsDirectory, vgId);
//
//
void *pTsdb = tsdbOpenRepo(rootDir);
//
if (pTsdb != NULL) {
//
return terrno;
//
}
//
//
SVnodeObj vnodeObj;
//
vnodeObj.vgId = vgId;
//
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
//
vnodeObj.refCount = 1;
//
vnodeObj.version = 0;
//
vnodeObj.wworker = dnodeAllocateWriteWorker();
//
vnodeObj.rworker = dnodeAllocateReadWorker();
//
vnodeObj.wal = NULL;
//
vnodeObj.tsdb = pTsdb;
//
vnodeObj.replica = NULL;
//
vnodeObj.events = NULL;
//
vnodeObj.cq = NULL;
//
// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj
);
char
rootDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
rootDir
,
"%s/vnode%d"
,
tsDirectory
,
vgId
);
void
*
pTsdb
=
tsdbOpenRepo
(
rootDir
);
if
(
pTsdb
!=
NULL
)
{
return
terrno
;
}
SVnodeObj
vnodeObj
;
vnodeObj
.
vgId
=
vgId
;
vnodeObj
.
status
=
TSDB_VN_STATUS_NOT_READY
;
vnodeObj
.
refCount
=
1
;
vnodeObj
.
version
=
0
;
vnodeObj
.
wworker
=
dnodeAllocateWriteWorker
();
vnodeObj
.
rworker
=
dnodeAllocateReadWorker
();
vnodeObj
.
wal
=
NULL
;
vnodeObj
.
tsdb
=
pTsdb
;
vnodeObj
.
replica
=
NULL
;
vnodeObj
.
events
=
NULL
;
vnodeObj
.
cq
=
NULL
;
taosAddIntHash
(
tsDnodeVnodesHash
,
vnodeObj
.
vgId
,
(
char
*
)
(
&
vnodeObj
)
);
return
TSDB_CODE_SUCCESS
;
}
static
void
dnodeCleanupVnode
(
SVnodeObj
*
pVnode
)
{
//
pVnode->status = TSDB_VN_STATUS_NOT_READY;
//
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
//
if (count > 0) {
//
// wait refcount
//
}
//
//
// remove replica
//
//
// remove read queue
//
dnodeFreeReadWorker(pVnode->rworker);
//
pVnode->rworker = NULL;
//
//
// remove write queue
//
dnodeFreeWriteWorker(pVnode->wworker);
//
pVnode->wworker = NULL;
//
//
// remove wal
//
//
// remove tsdb
//
if (pVnode->tsdb) {
//
tsdbCloseRepo(pVnode->tsdb);
//
pVnode->tsdb = NULL;
//
}
//
//
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
pVnode
->
status
=
TSDB_VN_STATUS_NOT_READY
;
int32_t
count
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
count
>
0
)
{
// wait refcount
}
// remove replica
// remove read queue
dnodeFreeReadWorker
(
pVnode
->
rworker
);
pVnode
->
rworker
=
NULL
;
// remove write queue
dnodeFreeWriteWorker
(
pVnode
->
wworker
);
pVnode
->
wworker
=
NULL
;
// remove wal
// remove tsdb
if
(
pVnode
->
tsdb
)
{
tsdbCloseRepo
(
pVnode
->
tsdb
);
pVnode
->
tsdb
=
NULL
;
}
taosDeleteIntHash
(
tsDnodeVnodesHash
,
pVnode
->
vgId
);
}
static
int32_t
dnodeCreateVnode
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
//
STsdbCfg tsdbCfg;
//
tsdbCfg.precision = pVnodeCfg->cfg.precision;
//
tsdbCfg.tsdbId = pVnodeCfg->vnode;
//
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
//
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
//
tsdbCfg.minRowsPerFileBlock = -1;
//
tsdbCfg.maxRowsPerFileBlock = -1;
//
tsdbCfg.keep = -1;
//
tsdbCfg.maxCacheSize = -1;
//
char rootDir[TSDB_FILENAME_LEN] = {0};
//
sprintf(rootDir, "%s/vnode%d", tsDirectory, pVnodeCfg->cfg.vgId);
//
//
void *pTsdb = tsdbCreateRepo(rootDir, &tsdbCfg, NULL);
//
if (pTsdb != NULL) {
//
return terrno;
//
}
//
//
SVnodeObj vnodeObj;
//
vnodeObj.vgId = pVnodeCfg->cfg.vgId;
//
vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
//
vnodeObj.refCount = 1;
//
vnodeObj.version = 0;
//
vnodeObj.wworker = dnodeAllocateWriteWorker();
//
vnodeObj.rworker = dnodeAllocateReadWorker();
//
vnodeObj.wal = NULL;
//
vnodeObj.tsdb = pTsdb;
//
vnodeObj.replica = NULL;
//
vnodeObj.events = NULL;
//
vnodeObj.cq = NULL;
//
// taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, &vnodeObj
);
STsdbCfg
tsdbCfg
;
tsdbCfg
.
precision
=
pVnodeCfg
->
cfg
.
precision
;
tsdbCfg
.
tsdbId
=
pVnodeCfg
->
vnode
;
tsdbCfg
.
maxTables
=
pVnodeCfg
->
cfg
.
maxSessions
;
tsdbCfg
.
daysPerFile
=
pVnodeCfg
->
cfg
.
daysPerFile
;
tsdbCfg
.
minRowsPerFileBlock
=
-
1
;
tsdbCfg
.
maxRowsPerFileBlock
=
-
1
;
tsdbCfg
.
keep
=
-
1
;
tsdbCfg
.
maxCacheSize
=
-
1
;
char
rootDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
rootDir
,
"%s/vnode%d"
,
tsDirectory
,
pVnodeCfg
->
cfg
.
vgId
);
void
*
pTsdb
=
tsdbCreateRepo
(
rootDir
,
&
tsdbCfg
,
NULL
);
if
(
pTsdb
!=
NULL
)
{
return
terrno
;
}
SVnodeObj
vnodeObj
;
vnodeObj
.
vgId
=
pVnodeCfg
->
cfg
.
vgId
;
vnodeObj
.
status
=
TSDB_VN_STATUS_NOT_READY
;
vnodeObj
.
refCount
=
1
;
vnodeObj
.
version
=
0
;
vnodeObj
.
wworker
=
dnodeAllocateWriteWorker
();
vnodeObj
.
rworker
=
dnodeAllocateReadWorker
();
vnodeObj
.
wal
=
NULL
;
vnodeObj
.
tsdb
=
pTsdb
;
vnodeObj
.
replica
=
NULL
;
vnodeObj
.
events
=
NULL
;
vnodeObj
.
cq
=
NULL
;
taosAddIntHash
(
tsDnodeVnodesHash
,
vnodeObj
.
vgId
,
(
char
*
)
(
&
vnodeObj
)
);
return
TSDB_CODE_SUCCESS
;
}
static
void
dnodeDropVnode
(
SVnodeObj
*
pVnode
)
{
//
pVnode->status = TSDB_VN_STATUS_NOT_READY;
//
//
int32_t count = atomic_sub_fetch_32(&pVnode->refCount, 1);
//
if (count > 0) {
//
// wait refcount
//
}
//
//
if (pVnode->tsdb) {
//
tsdbDropRepo(pVnode->tsdb);
//
pVnode->tsdb = NULL;
//
}
//
//
dnodeCleanupVnode(pVnode);
pVnode
->
status
=
TSDB_VN_STATUS_NOT_READY
;
int32_t
count
=
atomic_sub_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
count
>
0
)
{
// wait refcount
}
if
(
pVnode
->
tsdb
)
{
tsdbDropRepo
(
pVnode
->
tsdb
);
pVnode
->
tsdb
=
NULL
;
}
dnodeCleanupVnode
(
pVnode
);
}
static
void
dnodeProces
SMD
CreateVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
static
void
dnodeProces
s
CreateVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SMDCreateVnodeMsg
*
pCreate
=
(
SMDCreateVnodeMsg
*
)
rpcMsg
->
pCont
;
SMDCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
pCreate
->
vnode
=
htonl
(
pCreate
->
vnode
);
pCreate
->
cfg
.
vgId
=
htonl
(
pCreate
->
cfg
.
vgId
);
pCreate
->
cfg
.
maxSessions
=
htonl
(
pCreate
->
cfg
.
maxSessions
);
pCreate
->
cfg
.
daysPerFile
=
htonl
(
pCreate
->
cfg
.
daysPerFile
);
SVnodeObj
*
pVnodeObj
=
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
}
else
{
...
...
@@ -267,13 +270,13 @@ static void dnodeProcesSMDCreateVnodeMsg(SRpcMsg *rpcMsg) {
rpcFreeCont
(
rpcMsg
->
pCont
);
}
static
void
dnodeProces
SMD
DropVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
static
void
dnodeProces
s
DropVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SMDDropVnodeMsg
*
pDrop
=
(
SMDCreateVnodeMsg
*
)
rpcMsg
->
pCont
;
SMDDropVnodeMsg
*
pDrop
=
rpcMsg
->
pCont
;
pDrop
->
vgId
=
htonl
(
pDrop
->
vgId
);
SVnodeObj
*
pVnodeObj
=
taosGetIntHashData
(
tsDnodeVnodesHash
,
pDrop
->
vgId
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pDrop
->
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
dnodeDropVnode
(
pVnodeObj
);
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -288,13 +291,13 @@ static void dnodeProcesSMDDropVnodeMsg(SRpcMsg *rpcMsg) {
static
void
dnodeProcessAlterVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SRpcMsg
rpcRsp
=
{.
handle
=
rpcMsg
->
handle
,
.
pCont
=
NULL
,
.
contLen
=
0
,
.
code
=
0
,
.
msgType
=
0
};
SMDCreateVnodeMsg
*
pCreate
=
(
SMDCreateVnodeMsg
*
)
rpcMsg
->
pCont
;
SMDCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
pCreate
->
vnode
=
htonl
(
pCreate
->
vnode
);
pCreate
->
cfg
.
vgId
=
htonl
(
pCreate
->
cfg
.
vgId
);
pCreate
->
cfg
.
maxSessions
=
htonl
(
pCreate
->
cfg
.
maxSessions
);
pCreate
->
cfg
.
daysPerFile
=
htonl
(
pCreate
->
cfg
.
daysPerFile
);
SVnodeObj
*
pVnodeObj
=
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
SVnodeObj
*
pVnodeObj
=
(
SVnodeObj
*
)
taosGetIntHashData
(
tsDnodeVnodesHash
,
pCreate
->
cfg
.
vgId
);
if
(
pVnodeObj
!=
NULL
)
{
rpcRsp
.
code
=
TSDB_CODE_SUCCESS
;
}
else
{
...
...
@@ -304,3 +307,11 @@ static void dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
rpcSendResponse
(
&
rpcRsp
);
rpcFreeCont
(
rpcMsg
->
pCont
);
}
static
void
dnodeProcessAlterStreamMsg
(
SRpcMsg
*
pMsg
)
{
}
static
void
dnodeProcessConfigDnodeMsg
(
SRpcMsg
*
pMsg
)
{
}
\ No newline at end of file
src/dnode/src/dnodeMnode.c
浏览文件 @
311b40b6
...
...
@@ -27,18 +27,22 @@ static void *tsDnodeMnodeRpc = NULL;
int32_t
dnodeInitMnode
()
{
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
dnodeWrite
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
dnodeWrite
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
dnodeMgmt
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
dnodeWrite
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_TABLE
]
=
dnodeWrite
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
dnodeWrite
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_VNODE
]
=
dnodeMgmt
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_DROP_VNODE
]
=
dnodeMgmt
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_VNODE
]
=
dnodeMgmt
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeMgmt
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeMgmt
;
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;
// note: a new port shall be assigned
// rpcInit.localPort = tsDnodeMnodePort;
rpcInit
.
localPort
=
tsDnodeMnodePort
;
rpcInit
.
label
=
"DND-mgmt"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
dnodeProcessMsgFromMnode
;
rpcInit
.
cfp
=
dnodeProcessMsgFromMnode
;
rpcInit
.
sessions
=
TSDB_SESSIONS_PER_DNODE
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1500
;
...
...
src/dnode/src/dnodeShell.c
浏览文件 @
311b40b6
...
...
@@ -30,8 +30,8 @@ static void dnodeProcessMsgFromShell(SRpcMsg *pMsg);
static
void
*
tsDnodeShellRpc
=
NULL
;
int32_t
dnodeInitShell
()
{
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeWrite
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_QUERY
]
=
dnodeRead
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeWrite
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_QUERY
]
=
dnodeRead
;
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_RETRIEVE
]
=
dnodeRead
;
int
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
;
...
...
@@ -43,10 +43,10 @@ int32_t dnodeInitShell() {
SRpcInit
rpcInit
;
memset
(
&
rpcInit
,
0
,
sizeof
(
rpcInit
));
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;
rpcInit
.
localPort
=
ts
V
nodeShellPort
;
rpcInit
.
localPort
=
ts
D
nodeShellPort
;
rpcInit
.
label
=
"DND-shell"
;
rpcInit
.
numOfThreads
=
numOfThreads
;
rpcInit
.
cfp
=
dnodeProcessMsgFromShell
;
rpcInit
.
cfp
=
dnodeProcessMsgFromShell
;
rpcInit
.
sessions
=
TSDB_SESSIONS_PER_DNODE
;
rpcInit
.
connType
=
TAOS_CONN_SERVER
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1500
;
...
...
src/dnode/src/dnodeWrite.c
浏览文件 @
311b40b6
...
...
@@ -56,13 +56,17 @@ static void dnodeProcessWriteResult(SWriteMsg *pWrite);
static
void
dnodeProcessSubmitMsg
(
SWriteMsg
*
pMsg
);
static
void
dnodeProcessCreateTableMsg
(
SWriteMsg
*
pMsg
);
static
void
dnodeProcessDropTableMsg
(
SWriteMsg
*
pMsg
);
static
void
dnodeProcessAlterTableMsg
(
SWriteMsg
*
pMsg
);
static
void
dnodeProcessDropStableMsg
(
SWriteMsg
*
pMsg
);
SWriteWorkerPool
wWorkerPool
;
int32_t
dnodeInitWrite
()
{
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeProcessSubmitMsg
;
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_SUBMIT
]
=
dnodeProcessSubmitMsg
;
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MD_CREATE_TABLE
]
=
dnodeProcessCreateTableMsg
;
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
dnodeProcessDropTableMsg
;
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MD_DROP_TABLE
]
=
dnodeProcessDropTableMsg
;
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_TABLE
]
=
dnodeProcessAlterTableMsg
;
dnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MD_DROP_STABLE
]
=
dnodeProcessDropStableMsg
;
wWorkerPool
.
max
=
tsNumOfCores
;
wWorkerPool
.
writeWorker
=
(
SWriteWorker
*
)
calloc
(
sizeof
(
SWriteWorker
),
wWorkerPool
.
max
);
...
...
@@ -253,3 +257,11 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
static
void
dnodeProcessDropTableMsg
(
SWriteMsg
*
pMsg
)
{
}
static
void
dnodeProcessAlterTableMsg
(
SWriteMsg
*
pMsg
)
{
}
static
void
dnodeProcessDropStableMsg
(
SWriteMsg
*
pMsg
)
{
}
\ No newline at end of file
src/inc/taosmsg.h
浏览文件 @
311b40b6
...
...
@@ -38,7 +38,7 @@ extern "C" {
#define TSDB_MSG_TYPE_RETRIEVE 7
#define TSDB_MSG_TYPE_RETRIEVE_RSP 8
// message from m
gmt
to dnode
// message from m
node
to dnode
#define TSDB_MSG_TYPE_MD_CREATE_TABLE 9
#define TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP 10
#define TSDB_MSG_TYPE_MD_DROP_TABLE 11
...
...
@@ -58,84 +58,76 @@ extern "C" {
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE 25
#define TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP 26
// message from client to mnode
#define TSDB_MSG_TYPE_CM_CONNECT 31
#define TSDB_MSG_TYPE_CM_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CM_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CM_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_CM_ALTER_ACCT 35
#define TSDB_MSG_TYPE_CM_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_CM_DROP_ACCT 37
#define TSDB_MSG_TYPE_CM_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CM_CREATE_USER 39
#define TSDB_MSG_TYPE_CM_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_CM_ALTER_USER 41
#define TSDB_MSG_TYPE_CM_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_CM_DROP_USER 43
#define TSDB_MSG_TYPE_CM_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CM_CREATE_DNODE 45
#define TSDB_MSG_TYPE_CM_CREATE_DNODE_RSP 46
#define TSDB_MSG_TYPE_CM_DROP_DNODE 47
#define TSDB_MSG_TYPE_CM_DROP_DNODE_RSP 48
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE TSDB_MSG_TYPE_MD_CONFIG_DNODE
#define TSDB_MSG_TYPE_CM_CONFIG_DNODE_RSP TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 19
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 20
#define TSDB_MSG_TYPE_SDB_SYNC 21
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 22
#define TSDB_MSG_TYPE_SDB_FORWARD 23
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 24
#define TSDB_MSG_TYPE_CONNECT 31
#define TSDB_MSG_TYPE_CONNECT_RSP 32
#define TSDB_MSG_TYPE_CREATE_ACCT 33
#define TSDB_MSG_TYPE_CREATE_ACCT_RSP 34
#define TSDB_MSG_TYPE_ALTER_ACCT 35
#define TSDB_MSG_TYPE_ALTER_ACCT_RSP 36
#define TSDB_MSG_TYPE_DROP_ACCT 37
#define TSDB_MSG_TYPE_DROP_ACCT_RSP 38
#define TSDB_MSG_TYPE_CREATE_USER 39
#define TSDB_MSG_TYPE_CREATE_USER_RSP 40
#define TSDB_MSG_TYPE_ALTER_USER 41
#define TSDB_MSG_TYPE_ALTER_USER_RSP 42
#define TSDB_MSG_TYPE_DROP_USER 43
#define TSDB_MSG_TYPE_DROP_USER_RSP 44
#define TSDB_MSG_TYPE_CREATE_MNODE 45
#define TSDB_MSG_TYPE_CREATE_MNODE_RSP 46
#define TSDB_MSG_TYPE_DROP_MNODE 47
#define TSDB_MSG_TYPE_DROP_MNODE_RSP 48
#define TSDB_MSG_TYPE_CREATE_DNODE 49
#define TSDB_MSG_TYPE_CREATE_DNODE_RSP 50
#define TSDB_MSG_TYPE_DROP_DNODE 51
#define TSDB_MSG_TYPE_DROP_DNODE_RSP 52
#define TSDB_MSG_TYPE_ALTER_DNODE 53
#define TSDB_MSG_TYPE_ALTER_DNODE_RSP 54
#define TSDB_MSG_TYPE_CREATE_DB 55
#define TSDB_MSG_TYPE_CREATE_DB_RSP 56
#define TSDB_MSG_TYPE_DROP_DB 57
#define TSDB_MSG_TYPE_DROP_DB_RSP 58
#define TSDB_MSG_TYPE_USE_DB 59
#define TSDB_MSG_TYPE_USE_DB_RSP 60
#define TSDB_MSG_TYPE_ALTER_DB 61
#define TSDB_MSG_TYPE_ALTER_DB_RSP 62
#define TSDB_MSG_TYPE_CREATE_TABLE 63
#define TSDB_MSG_TYPE_CREATE_TABLE_RSP 64
#define TSDB_MSG_TYPE_DROP_TABLE 65
#define TSDB_MSG_TYPE_DROP_TABLE_RSP 66
#define TSDB_MSG_TYPE_ALTER_TABLE 67
#define TSDB_MSG_TYPE_ALTER_TABLE_RSP 68
#define TSDB_MSG_TYPE_TABLE_CFG 71
#define TSDB_MSG_TYPE_TABLE_CFG_RSP 72
#define TSDB_MSG_TYPE_TABLE_META 73
#define TSDB_MSG_TYPE_TABLE_META_RSP 74
#define TSDB_MSG_TYPE_STABLE_META 75
#define TSDB_MSG_TYPE_STABLE_META_RSP 76
#define TSDB_MSG_TYPE_MULTI_TABLE_META 77
#define TSDB_MSG_TYPE_MULTI_TABLE_META_RSP 78
#define TSDB_MSG_TYPE_ALTER_STREAM 79
#define TSDB_MSG_TYPE_ALTER_STREAM_RSP 80
#define TSDB_MSG_TYPE_SHOW 81
#define TSDB_MSG_TYPE_SHOW_RSP 82
#define TSDB_MSG_TYPE_CFG_MNODE 83
#define TSDB_MSG_TYPE_CFG_MNODE_RSP 84
#define TSDB_MSG_TYPE_KILL_QUERY 85
#define TSDB_MSG_TYPE_KILL_QUERY_RSP 86
#define TSDB_MSG_TYPE_KILL_STREAM 87
#define TSDB_MSG_TYPE_KILL_STREAM_RSP 88
#define TSDB_MSG_TYPE_KILL_CONNECTION 89
#define TSDB_MSG_TYPE_KILL_CONNECTION_RSP 90
#define TSDB_MSG_TYPE_HEARTBEAT 91
#define TSDB_MSG_TYPE_HEARTBEAT_RSP 92
#define TSDB_MSG_TYPE_STATUS 93
#define TSDB_MSG_TYPE_STATUS_RSP 94
#define TSDB_MSG_TYPE_GRANT 95
#define TSDB_MSG_TYPE_GRANT_RSP 96
#define TSDB_MSG_TYPE_MAX 97
#define TSDB_MSG_TYPE_CM_CREATE_DB 49
#define TSDB_MSG_TYPE_CM_CREATE_DB_RSP 50
#define TSDB_MSG_TYPE_CM_DROP_DB 51
#define TSDB_MSG_TYPE_CM_DROP_DB_RSP 52
#define TSDB_MSG_TYPE_CM_USE_DB 53
#define TSDB_MSG_TYPE_CM_USE_DB_RSP 54
#define TSDB_MSG_TYPE_CM_ALTER_DB 55
#define TSDB_MSG_TYPE_CM_ALTER_DB_RSP 56
#define TSDB_MSG_TYPE_CM_CREATE_TABLE 57
#define TSDB_MSG_TYPE_CM_CREATE_TABLE_RSP 58
#define TSDB_MSG_TYPE_CM_DROP_TABLE 59
#define TSDB_MSG_TYPE_CM_DROP_TABLE_RSP 60
#define TSDB_MSG_TYPE_CM_ALTER_TABLE 61
#define TSDB_MSG_TYPE_CM_ALTER_TABLE_RSP 62
#define TSDB_MSG_TYPE_CM_TABLE_META 63
#define TSDB_MSG_TYPE_CM_TABLE_META_RSP 64
#define TSDB_MSG_TYPE_CM_STABLE_META 65
#define TSDB_MSG_TYPE_CM_STABLE_META_RSP 66
#define TSDB_MSG_TYPE_CM_TABLES_META 67
#define TSDB_MSG_TYPE_CM_TABLES_META_RSP 68
#define TSDB_MSG_TYPE_CM_ALTER_STREAM 69
#define TSDB_MSG_TYPE_CM_ALTER_STREAM_RSP 70
#define TSDB_MSG_TYPE_CM_SHOW 71
#define TSDB_MSG_TYPE_CM_SHOW_RSP 72
#define TSDB_MSG_TYPE_CM_KILL_QUERY 73
#define TSDB_MSG_TYPE_CM_KILL_QUERY_RSP 74
#define TSDB_MSG_TYPE_CM_KILL_STREAM 75
#define TSDB_MSG_TYPE_CM_KILL_STREAM_RSP 76
#define TSDB_MSG_TYPE_CM_KILL_CONN 77
#define TSDB_MSG_TYPE_CM_KILL_CONN_RSP 78
#define TSDB_MSG_TYPE_CM_HEARTBEAT 79
#define TSDB_MSG_TYPE_CM_HEARTBEAT_RSP 80
// message from dnode to mnode
#define TSDB_MSG_TYPE_DM_CONFIG_TABLE 91
#define TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP 92
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE 93
#define TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP 94
#define TSDB_MSG_TYPE_DM_STATUS 95
#define TSDB_MSG_TYPE_DM_STATUS_RSP 96
#define TSDB_MSG_TYPE_DM_GRANT 97
#define TSDB_MSG_TYPE_DM_GRANT_RSP 98
#define TSDB_MSG_TYPE_SDB_SYNC 101
#define TSDB_MSG_TYPE_SDB_SYNC_RSP 102
#define TSDB_MSG_TYPE_SDB_FORWARD 103
#define TSDB_MSG_TYPE_SDB_FORWARD_RSP 104
#define TSDB_MSG_TYPE_MAX 105
// IE type
#define TSDB_IE_TYPE_SEC 1
...
...
src/kit/shell/src/shellDarwin.c
浏览文件 @
311b40b6
...
...
@@ -81,7 +81,7 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) {
// for management port
else
if
(
strcmp
(
argv
[
i
],
"-P"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
tsM
gmt
ShellPort
=
atoi
(
argv
[
++
i
]);
tsM
node
ShellPort
=
atoi
(
argv
[
++
i
]);
}
else
{
fprintf
(
stderr
,
"option -P requires an argument
\n
"
);
exit
(
EXIT_FAILURE
);
...
...
src/kit/shell/src/shellEngine.c
浏览文件 @
311b40b6
...
...
@@ -68,7 +68,7 @@ TAOS *shellInit(struct arguments *args) {
tsMeterMetaKeepTimer
=
3000
;
// Connect to the database.
TAOS
*
con
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
args
->
database
,
tsM
gmt
ShellPort
);
TAOS
*
con
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
args
->
database
,
tsM
node
ShellPort
);
if
(
con
==
NULL
)
{
return
con
;
}
...
...
src/kit/shell/src/shellImport.c
浏览文件 @
311b40b6
...
...
@@ -227,7 +227,7 @@ static void shellRunImportThreads(struct arguments* args)
ShellThreadObj
*
pThread
=
threadObj
+
t
;
pThread
->
threadIndex
=
t
;
pThread
->
totalThreads
=
args
->
threadNum
;
pThread
->
taos
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
args
->
database
,
tsM
gmt
ShellPort
);
pThread
->
taos
=
taos_connect
(
args
->
host
,
args
->
user
,
args
->
password
,
args
->
database
,
tsM
node
ShellPort
);
if
(
pThread
->
taos
==
NULL
)
{
fprintf
(
stderr
,
"ERROR: thread:%d failed connect to TDengine, error:%s
\n
"
,
pThread
->
threadIndex
,
taos_errstr
(
pThread
->
taos
));
exit
(
0
);
...
...
src/kit/shell/src/shellLinux.c
浏览文件 @
311b40b6
...
...
@@ -62,7 +62,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
if
(
arg
)
arguments
->
password
=
arg
;
break
;
case
'P'
:
tsM
gmt
ShellPort
=
atoi
(
arg
);
tsM
node
ShellPort
=
atoi
(
arg
);
break
;
case
't'
:
arguments
->
timezone
=
arg
;
...
...
src/kit/shell/src/shellWindows.c
浏览文件 @
311b40b6
...
...
@@ -61,7 +61,7 @@ void shellParseArgument(int argc, char *argv[], struct arguments *arguments) {
// for management port
else
if
(
strcmp
(
argv
[
i
],
"-P"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
tsM
gmt
ShellPort
=
atoi
(
argv
[
++
i
]);
tsM
node
ShellPort
=
atoi
(
argv
[
++
i
]);
}
else
{
fprintf
(
stderr
,
"option -P requires an argument
\n
"
);
exit
(
EXIT_FAILURE
);
...
...
src/mnode/src/mgmtDServer.c
浏览文件 @
311b40b6
...
...
@@ -41,7 +41,7 @@ static void *tsMgmtDServerRpc;
int32_t
mgmtInitDServer
()
{
SRpcInit
rpcInit
=
{
0
};
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;;
rpcInit
.
localPort
=
tsM
gmt
DnodePort
;
rpcInit
.
localPort
=
tsM
node
DnodePort
;
rpcInit
.
label
=
"MND-DS"
;
rpcInit
.
numOfThreads
=
1
;
rpcInit
.
cfp
=
mgmtProcessMsgFromDnode
;
...
...
@@ -234,7 +234,7 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
//
// mTrace("msg:%d:%s is received from dnode, pConn:%p", msgType, taosMsg[(int8_t)msgType], pConn);
//
// if (msgType == TSDB_MSG_TYPE_
TABLE_CFG
) {
// if (msgType == TSDB_MSG_TYPE_
DM_CONFIG_TABLE
) {
// mgmtProcessTableCfgMsg(msgType, pCont, contLen, pConn);
// } else if (msgType == TSDB_MSG_TYPE_DM_CONFIG_VNODE) {
// mgmtProcessVnodeCfgMsg(msgType, pCont, contLen, pConn);
...
...
@@ -249,8 +249,8 @@ static int mgmtDServerRetrieveAuth(char *user, char *spi, char *encrypt, char *s
// } else if (msgType == TSDB_MSG_TYPE_MD_DROP_STABLE) {
// mgmtProcessDropStableRsp(msgType, pCont, contLen, pConn, code);
// } else if (msgType == TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP) {
// } else if (msgType == TSDB_MSG_TYPE_ALTER_STREAM_RSP) {
// } else if (msgType == TSDB_MSG_TYPE_STATUS) {
// } else if (msgType == TSDB_MSG_TYPE_
CM_
ALTER_STREAM_RSP) {
// } else if (msgType == TSDB_MSG_TYPE_
DM_
STATUS) {
// mgmtProcessDnodeStatus(msgType, pCont, contLen, pConn, code);
// } else {
// mError("%s from dnode is not processed", taosMsg[(int8_t)msgType]);
...
...
src/mnode/src/mgmtDb.c
浏览文件 @
311b40b6
...
...
@@ -108,9 +108,9 @@ int32_t mgmtInitDbs() {
}
}
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CREATE_DB
,
mgmtProcessCreateDbMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_ALTER_DB
,
mgmtProcessAlterDbMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_DROP_DB
,
mgmtProcessDropDbMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_C
M_C
REATE_DB
,
mgmtProcessCreateDbMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
ALTER_DB
,
mgmtProcessAlterDbMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
DROP_DB
,
mgmtProcessDropDbMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_DB
,
mgmtGetDbMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_DB
,
mgmtRetrieveDbs
);
...
...
src/mnode/src/mgmtProfile.c
浏览文件 @
311b40b6
...
...
@@ -752,9 +752,9 @@ int32_t mgmtInitProfile() {
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_CONNS
,
mgmtRetrieveConns
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_STREAMS
,
mgmtGetStreamMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_STREAMS
,
mgmtRetrieveStreams
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_KILL_QUERY
,
mgmtProcessKillQueryMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_KILL_STREAM
,
mgmtProcessKillStreamMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
KILL_CONNECTIO
N
,
mgmtProcessKillConnectionMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
KILL_QUERY
,
mgmtProcessKillQueryMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
KILL_STREAM
,
mgmtProcessKillStreamMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_KILL_CON
N
,
mgmtProcessKillConnectionMsg
);
return
0
;
}
...
...
src/mnode/src/mgmtShell.c
浏览文件 @
311b40b6
...
...
@@ -56,7 +56,7 @@ static SShowMetaFp tsMgmtShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0};
static
SShowRetrieveFp
tsMgmtShowRetrieveFp
[
TSDB_MGMT_TABLE_MAX
]
=
{
0
};
int32_t
mgmtInitShell
()
{
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_SHOW
,
mgmtProcessShowMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
SHOW
,
mgmtProcessShowMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_RETRIEVE
,
mgmtProcessRetrieveMsg
);
int32_t
numOfThreads
=
tsNumOfCores
*
tsNumOfThreadsPerCore
/
4
.
0
;
...
...
@@ -66,7 +66,7 @@ int32_t mgmtInitShell() {
SRpcInit
rpcInit
=
{
0
};
rpcInit
.
localIp
=
tsAnyIp
?
"0.0.0.0"
:
tsPrivateIp
;
rpcInit
.
localPort
=
tsM
gmt
ShellPort
;
rpcInit
.
localPort
=
tsM
node
ShellPort
;
rpcInit
.
label
=
"MND-shell"
;
rpcInit
.
numOfThreads
=
numOfThreads
;
rpcInit
.
cfp
=
mgmtProcessMsgFromShell
;
...
...
@@ -81,8 +81,8 @@ int32_t mgmtInitShell() {
return
-
1
;
}
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CONNECT
,
mgmtProcessConnectMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
HEARTBEAT
,
mgmtProcessHeartBeatMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_C
M_C
ONNECT
,
mgmtProcessConnectMsg
);
mPrint
(
"server connection to shell is opened"
);
return
0
;
...
...
@@ -281,7 +281,7 @@ static void mgmtProcessHeartBeatMsg(SRpcMsg *rpcMsg) {
rpcGetConnInfo
(
rpcMsg
->
handle
,
&
connInfo
);
pHBRsp
->
ipList
.
inUse
=
0
;
pHBRsp
->
ipList
.
port
=
htons
(
tsM
gmt
ShellPort
);
pHBRsp
->
ipList
.
port
=
htons
(
tsM
node
ShellPort
);
pHBRsp
->
ipList
.
numOfIps
=
0
;
if
(
pSdbPublicIpList
!=
NULL
&&
pSdbIpList
!=
NULL
)
{
pHBRsp
->
ipList
.
numOfIps
=
htons
(
pSdbPublicIpList
->
numOfIps
);
...
...
@@ -375,7 +375,7 @@ static void mgmtProcessConnectMsg(SRpcMsg *rpcMsg) {
pConnectRsp
->
writeAuth
=
pUser
->
writeAuth
;
pConnectRsp
->
superAuth
=
pUser
->
superAuth
;
pConnectRsp
->
ipList
.
inUse
=
0
;
pConnectRsp
->
ipList
.
port
=
htons
(
tsM
gmt
ShellPort
);
pConnectRsp
->
ipList
.
port
=
htons
(
tsM
node
ShellPort
);
pConnectRsp
->
ipList
.
numOfIps
=
0
;
if
(
pSdbPublicIpList
!=
NULL
&&
pSdbIpList
!=
NULL
)
{
pConnectRsp
->
ipList
.
numOfIps
=
htons
(
pSdbPublicIpList
->
numOfIps
);
...
...
@@ -420,10 +420,10 @@ static bool mgmtCheckMeterMetaMsgType(void *pMsg) {
}
static
bool
mgmtCheckMsgReadOnly
(
int8_t
type
,
void
*
pCont
)
{
if
((
type
==
TSDB_MSG_TYPE_TABLE_META
&&
(
!
mgmtCheckMeterMetaMsgType
(
pCont
)))
||
type
==
TSDB_MSG_TYPE_STABLE_META
||
type
==
TSDB_MSG_TYPE_RETRIEVE
||
type
==
TSDB_MSG_TYPE_
SHOW
||
type
==
TSDB_MSG_TYPE_MULTI_TABLE
_META
||
type
==
TSDB_MSG_TYPE_CONNECT
)
{
if
((
type
==
TSDB_MSG_TYPE_
CM_
TABLE_META
&&
(
!
mgmtCheckMeterMetaMsgType
(
pCont
)))
||
type
==
TSDB_MSG_TYPE_
CM_
STABLE_META
||
type
==
TSDB_MSG_TYPE_RETRIEVE
||
type
==
TSDB_MSG_TYPE_
CM_SHOW
||
type
==
TSDB_MSG_TYPE_CM_TABLES
_META
||
type
==
TSDB_MSG_TYPE_C
M_C
ONNECT
)
{
return
true
;
}
...
...
src/mnode/src/mgmtTable.c
浏览文件 @
311b40b6
...
...
@@ -71,12 +71,12 @@ int32_t mgmtInitTables() {
mgmtSetVgroupIdPool
();
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CREATE_TABLE
,
mgmtProcessCreateTableMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_DROP_TABLE
,
mgmtProcessDropTableMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_ALTER_TABLE
,
mgmtProcessAlterTableMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_TABLE_META
,
mgmtProcessTableMetaMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
MULTI_TABLE
_META
,
mgmtProcessMultiTableMetaMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_STABLE_META
,
mgmtProcessSuperTableMetaMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_C
M_C
REATE_TABLE
,
mgmtProcessCreateTableMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
DROP_TABLE
,
mgmtProcessDropTableMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
ALTER_TABLE
,
mgmtProcessAlterTableMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
TABLE_META
,
mgmtProcessTableMetaMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_TABLES
_META
,
mgmtProcessMultiTableMetaMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
STABLE_META
,
mgmtProcessSuperTableMetaMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_TABLE
,
mgmtGetShowTableMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_TABLE
,
mgmtRetrieveShowTables
);
...
...
src/mnode/src/mgmtUser.c
浏览文件 @
311b40b6
...
...
@@ -83,9 +83,9 @@ int32_t mgmtInitUsers() {
mgmtCreateUser
(
pAcct
,
"monitor"
,
tsInternalPass
);
mgmtCreateUser
(
pAcct
,
"_root"
,
tsInternalPass
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_CREATE_USER
,
mgmtProcessCreateUserMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_ALTER_USER
,
mgmtProcessAlterUserMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_DROP_USER
,
mgmtProcessDropUserMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_C
M_C
REATE_USER
,
mgmtProcessCreateUserMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
ALTER_USER
,
mgmtProcessAlterUserMsg
);
mgmtAddShellMsgHandle
(
TSDB_MSG_TYPE_
CM_
DROP_USER
,
mgmtProcessDropUserMsg
);
mgmtAddShellShowMetaHandle
(
TSDB_MGMT_TABLE_USER
,
mgmtGetUserMeta
);
mgmtAddShellShowRetrieveHandle
(
TSDB_MGMT_TABLE_USER
,
mgmtRetrieveUsers
);
...
...
src/mnode/src/mgmtVgroup.c
浏览文件 @
311b40b6
...
...
@@ -558,7 +558,7 @@ SVgObj *mgmtGetVgroupByVnode(uint32_t dnode, int32_t vnode) {
}
SRpcIpSet
mgmtGetIpSetFromVgroup
(
SVgObj
*
pVgroup
)
{
SRpcIpSet
ipSet
=
{.
numOfIps
=
pVgroup
->
numOfVnodes
,
.
inUse
=
0
,
.
port
=
tsM
gmt
DnodePort
+
1
};
SRpcIpSet
ipSet
=
{.
numOfIps
=
pVgroup
->
numOfVnodes
,
.
inUse
=
0
,
.
port
=
tsM
node
DnodePort
+
1
};
for
(
int
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
ipSet
.
ip
[
i
]
=
pVgroup
->
vnodeGid
[
i
].
ip
;
}
...
...
@@ -566,7 +566,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
}
SRpcIpSet
mgmtGetIpSetFromIp
(
uint32_t
ip
)
{
SRpcIpSet
ipSet
=
{.
ip
[
0
]
=
ip
,
.
numOfIps
=
1
,
.
inUse
=
0
,
.
port
=
tsM
gmt
DnodePort
+
1
};
SRpcIpSet
ipSet
=
{.
ip
[
0
]
=
ip
,
.
numOfIps
=
1
,
.
inUse
=
0
,
.
port
=
tsM
node
DnodePort
+
1
};
return
ipSet
;
}
...
...
src/rpc/src/rpcMain.c
浏览文件 @
311b40b6
...
...
@@ -360,8 +360,8 @@ void rpcSendRequest(void *shandle, SRpcIpSet *pIpSet, SRpcMsg *pMsg) {
// for TDengine, all the query, show commands shall have TCP connection
char
type
=
pMsg
->
msgType
;
if
(
type
==
TSDB_MSG_TYPE_QUERY
||
type
==
TSDB_MSG_TYPE_RETRIEVE
||
type
==
TSDB_MSG_TYPE_
STABLE_META
||
type
==
TSDB_MSG_TYPE_MULTI_TABLE
_META
||
type
==
TSDB_MSG_TYPE_
SHOW
)
type
==
TSDB_MSG_TYPE_
CM_STABLE_META
||
type
==
TSDB_MSG_TYPE_CM_TABLES
_META
||
type
==
TSDB_MSG_TYPE_
CM_SHOW
)
pContext
->
connType
=
RPC_CONN_TCPC
;
rpcSendReqToServer
(
pRpc
,
pContext
);
...
...
@@ -814,7 +814,7 @@ static void *rpcProcessMsgFromPeer(SRecvInfo *pRecv) {
terrno
=
0
;
pConn
=
rpcProcessMsgHead
(
pRpc
,
pRecv
);
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
{
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_
CM_
HEARTBEAT
||
(
rpcDebugFlag
&
16
))
{
tTrace
(
"%s %p, %s received from 0x%x:%hu, parse code:%x len:%d sig:0x%08x:0x%08x:%d"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pRecv
->
ip
,
pRecv
->
port
,
terrno
,
pRecv
->
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
,
pHead
->
port
);
...
...
@@ -983,12 +983,12 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) {
msgLen
=
rpcAddAuthPart
(
pConn
,
msg
,
msgLen
);
if
(
rpcIsReq
(
pHead
->
msgType
))
{
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_
CM_
HEARTBEAT
||
(
rpcDebugFlag
&
16
))
tTrace
(
"%s %p, %s is sent to %s:%hu, len:%d sig:0x%08x:0x%08x:%d"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerIpstr
,
pConn
->
peerPort
,
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
}
else
{
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_HEARTBEAT
||
(
rpcDebugFlag
&
16
))
if
(
pHead
->
msgType
<
TSDB_MSG_TYPE_
CM_
HEARTBEAT
||
(
rpcDebugFlag
&
16
))
tTrace
(
"%s %p, %s is sent to %s:%hu, code:%u len:%d sig:0x%08x:0x%08x:%d"
,
pRpc
->
label
,
pConn
,
taosMsg
[
pHead
->
msgType
],
pConn
->
peerIpstr
,
pConn
->
peerPort
,
(
uint8_t
)
pHead
->
content
[
0
],
msgLen
,
pHead
->
sourceId
,
pHead
->
destId
,
pHead
->
tranId
);
...
...
src/util/inc/tglobalcfg.h
浏览文件 @
311b40b6
...
...
@@ -58,9 +58,10 @@ extern char osName[];
extern
char
tsMasterIp
[];
extern
char
tsSecondIp
[];
extern
uint16_t
tsMgmtDnodePort
;
extern
uint16_t
tsMgmtShellPort
;
extern
uint16_t
tsVnodeShellPort
;
extern
uint16_t
tsMnodeDnodePort
;
extern
uint16_t
tsMnodeShellPort
;
extern
uint16_t
tsDnodeShellPort
;
extern
uint16_t
tsDnodeMnodePort
;
extern
uint16_t
tsVnodeVnodePort
;
extern
uint16_t
tsMgmtMgmtPort
;
extern
uint16_t
tsMgmtSyncPort
;
...
...
src/util/src/tglobalcfg.c
浏览文件 @
311b40b6
...
...
@@ -61,9 +61,10 @@ int64_t tsMsPerDay[] = {86400000L, 86400000000L};
char
tsMasterIp
[
TSDB_IPv4ADDR_LEN
]
=
{
0
};
char
tsSecondIp
[
TSDB_IPv4ADDR_LEN
]
=
{
0
};
uint16_t
tsMgmtShellPort
=
6030
;
// udp[6030-6034] tcp[6030]
uint16_t
tsVnodeShellPort
=
6035
;
// udp[6035-6039] tcp[6035]
uint16_t
tsMgmtDnodePort
=
6040
;
// udp[6040-6044] tcp[6040]
uint16_t
tsMnodeShellPort
=
6030
;
// udp[6030-6034] tcp[6030]
uint16_t
tsDnodeShellPort
=
6035
;
// udp[6035-6039] tcp[6035]
uint16_t
tsMnodeDnodePort
=
6040
;
// udp/tcp
uint16_t
tsDnodeMnodePort
=
6041
;
// udp/tcp
uint16_t
tsVnodeVnodePort
=
6045
;
// tcp[6045]
uint16_t
tsMgmtMgmtPort
=
6050
;
// udp, numOfVnodes fixed to 1, range udp[6050]
uint16_t
tsMgmtSyncPort
=
6050
;
// tcp, range tcp[6050]
...
...
@@ -492,13 +493,13 @@ static void doInitGlobalConfig() {
tsInitConfigOption
(
cfg
++
,
"httpPort"
,
&
tsHttpPort
,
TSDB_CFG_VTYPE_SHORT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
,
1
,
65535
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"mgmtShellPort"
,
&
tsM
gmt
ShellPort
,
TSDB_CFG_VTYPE_SHORT
,
tsInitConfigOption
(
cfg
++
,
"mgmtShellPort"
,
&
tsM
node
ShellPort
,
TSDB_CFG_VTYPE_SHORT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLIENT
,
1
,
65535
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"vnodeShellPort"
,
&
ts
V
nodeShellPort
,
TSDB_CFG_VTYPE_SHORT
,
tsInitConfigOption
(
cfg
++
,
"vnodeShellPort"
,
&
ts
D
nodeShellPort
,
TSDB_CFG_VTYPE_SHORT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLIENT
,
1
,
65535
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"mgmtVnodePort"
,
&
tsM
gmt
DnodePort
,
TSDB_CFG_VTYPE_SHORT
,
tsInitConfigOption
(
cfg
++
,
"mgmtVnodePort"
,
&
tsM
node
DnodePort
,
TSDB_CFG_VTYPE_SHORT
,
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
|
TSDB_CFG_CTYPE_B_CLUSTER
,
1
,
65535
,
0
,
TSDB_CFG_UTYPE_NONE
);
tsInitConfigOption
(
cfg
++
,
"vnodeVnodePort"
,
&
tsVnodeVnodePort
,
TSDB_CFG_VTYPE_SHORT
,
...
...
src/util/src/tstring.c
浏览文件 @
311b40b6
...
...
@@ -26,23 +26,23 @@ char *taosMsg[] = {
"create-table"
,
"create-table-rsp"
,
//10
"remove-table"
,
"remove-table-rsp"
,
"drop-table"
,
"drop-table-rsp"
,
"alter-table"
,
"alter-table-rsp"
,
"create-vnode"
,
"create-vnode-rsp"
,
"free-vnode"
,
"free-vnode-rsp"
,
"cfg-dnode"
,
"cfg-dnode-rsp"
,
"alter-stream"
,
"alter-stream-rsp"
,
//20
"drop-vnode"
,
"drop-vnode-rsp"
,
"alter-vnode"
,
"alter-vnode-rsp"
,
//20
"sync"
,
"sync-rsp"
,
"forward"
,
"forward-rsp"
,
"drop-stable"
,
"drop-stable-rsp"
,
"alter-stream"
,
"alter-stream-rsp"
,
"config-dnode"
,
"config-dnode-rsp"
,
""
,
""
,
""
,
...
...
@@ -63,37 +63,26 @@ char *taosMsg[] = {
"alter-user-rsp"
,
"drop-user"
,
"drop-user-rsp"
,
"create-mnode"
,
"create-mnode-rsp"
,
"drop-mnode"
,
"drop-mnode-rsp"
,
"create-dnode"
,
"create-dnode-rsp"
,
//50
"create-dnode-rsp"
,
"drop-dnode"
,
"drop-dnode-rsp"
,
"alter-dnode"
,
"alter-dnode-rsp"
,
"create-db"
,
"create-db-rsp"
,
"create-db-rsp"
,
//50
"drop-db"
,
"drop-db-rsp"
,
"use-db"
,
"use-db-rsp"
,
//60
"use-db-rsp"
,
"alter-db"
,
"alter-db-rsp"
,
"create-table"
,
"create-table-rsp"
,
"drop-table"
,
"drop-table-rsp"
,
"drop-table-rsp"
,
//60
"alter-table"
,
"alter-table-rsp"
,
"cfg-vnode"
,
"cfg-vnode-rsp"
,
//70
"cfg-table"
,
"cfg-table-rsp"
,
"table-meta"
,
"table-meta-rsp"
,
"super-table-meta"
,
...
...
@@ -101,24 +90,42 @@ char *taosMsg[] = {
"multi-table-meta"
,
"multi-table-meta-rsp"
,
"alter-stream"
,
"alter-stream-rsp"
,
//8
0
"alter-stream-rsp"
,
//7
0
"show"
,
"show-rsp"
,
"cfg-mnode"
,
"cfg-mnode-rsp"
,
"kill-query"
,
"kill-query-rsp"
,
"kill-stream"
,
"kill-stream-rsp"
,
"kill-connection"
,
"kill-connectoin-rsp"
,
//90
"kill-connectoin-rsp"
,
"heart-beat"
,
"heart-beat-rsp"
,
"heart-beat-rsp"
,
//80
""
,
""
,
""
,
""
,
""
,
""
,
""
,
""
,
//90
"config-table"
,
"config-table-rsp"
,
"config-vnode"
,
"config-vnode-rsp"
,
"status"
,
"status-rsp"
,
"grant"
,
"grant-rsp"
,
""
,
""
,
//100
"sdb-sync"
,
"sdb-sync-rsp"
,
"sdb-forward"
,
"sdb-forward-rsp"
,
"max"
};
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录