Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c0fc32f6
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c0fc32f6
编写于
12月 06, 2021
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-10431 profile test
上级
57a722a6
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
611 addition
and
170 deletion
+611
-170
include/common/taosmsg.h
include/common/taosmsg.h
+62
-58
source/dnode/mgmt/impl/test/profile/profile.cpp
source/dnode/mgmt/impl/test/profile/profile.cpp
+420
-8
source/dnode/mgmt/impl/test/sut/deploy.cpp
source/dnode/mgmt/impl/test/sut/deploy.cpp
+33
-11
source/dnode/mgmt/impl/test/sut/deploy.h
source/dnode/mgmt/impl/test/sut/deploy.h
+4
-2
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+0
-1
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+11
-6
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+81
-84
未找到文件。
include/common/taosmsg.h
浏览文件 @
c0fc32f6
...
...
@@ -549,56 +549,61 @@ typedef struct {
// todo: the show handle should be replaced with id
typedef
struct
{
SMsgHead
header
;
union
{
uint64_t
qhandle
;
uint64_t
qId
;};
// query handle
uint16_t
free
;
union
{
int32_t
showId
;
int64_t
qhandle
;
int64_t
qId
;
};
// query handle
int8_t
free
;
}
SRetrieveTableMsg
;
typedef
struct
SRetrieveTableRsp
{
int32_t
numOfRows
;
int8_t
completed
;
// all results are returned to client
int16_t
precision
;
int64_t
offset
;
// updated offset value for multi-vnode projection query
int64_t
offset
;
// updated offset value for multi-vnode projection query
int64_t
useconds
;
int8_t
completed
;
// all results are returned to client
int8_t
precision
;
int8_t
compressed
;
int8_t
reserved
;
int32_t
compLen
;
char
data
[];
}
SRetrieveTableRsp
;
typedef
struct
{
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
cacheBlockSize
;
//
MB
int32_t
totalBlocks
;
int32_t
maxTables
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
uint8_t
precision
;
// time resolution
int8_t
compression
;
int8_t
walLevel
;
int8_t
replications
;
int8_t
quorum
;
int8_t
ignoreExist
;
int8_t
update
;
int8_t
cacheLastRow
;
int8_t
dbType
;
int16_t
partitions
;
int8_t
reserve
[
5
];
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int32_t
cacheBlockSize
;
//
MB
int32_t
totalBlocks
;
int32_t
maxTables
;
int32_t
daysPerFile
;
int32_t
daysToKeep0
;
int32_t
daysToKeep1
;
int32_t
daysToKeep2
;
int32_t
minRowsPerFileBlock
;
int32_t
maxRowsPerFileBlock
;
int32_t
commitTime
;
int32_t
fsyncPeriod
;
int8_t
precision
;
// time resolution
int8_t
compression
;
int8_t
walLevel
;
int8_t
replications
;
int8_t
quorum
;
int8_t
ignoreExist
;
int8_t
update
;
int8_t
cacheLastRow
;
int8_t
dbType
;
int16_t
partitions
;
int8_t
reserve
[
5
];
}
SCreateDbMsg
,
SAlterDbMsg
;
typedef
struct
{
char
name
[
TSDB_FUNC_NAME_LEN
];
char
path
[
PATH_MAX
];
int32_t
funcType
;
u
int8_t
outputType
;
int16_t
outputLen
;
int32_t
bufSize
;
int32_t
codeLen
;
char
code
[];
char
name
[
TSDB_FUNC_NAME_LEN
];
char
path
[
PATH_MAX
];
int32_t
funcType
;
int8_t
outputType
;
int16_t
outputLen
;
int32_t
bufSize
;
int32_t
codeLen
;
char
code
[];
}
SCreateFuncMsg
;
typedef
struct
{
...
...
@@ -626,8 +631,8 @@ typedef struct {
}
SDropFuncMsg
;
typedef
struct
{
char
db
[
TSDB_TABLE_FNAME_LEN
];
u
int8_t
ignoreNotExists
;
char
db
[
TSDB_TABLE_FNAME_LEN
];
int8_t
ignoreNotExists
;
}
SDropDbMsg
,
SUseDbMsg
,
SSyncDbMsg
;
typedef
struct
{
...
...
@@ -762,21 +767,20 @@ typedef struct {
}
SVgroupsMsg
,
SVgroupsInfo
;
typedef
struct
STableMetaMsg
{
int32_t
contLen
;
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
// table id
uint8_t
numOfTags
;
uint8_t
precision
;
uint8_t
tableType
;
int16_t
numOfColumns
;
int16_t
sversion
;
int16_t
tversion
;
int32_t
tid
;
uint64_t
uid
;
SVgroupMsg
vgroup
;
char
sTableName
[
TSDB_TABLE_FNAME_LEN
];
uint64_t
suid
;
SSchema
schema
[];
int32_t
contLen
;
char
tableFname
[
TSDB_TABLE_FNAME_LEN
];
// table id
int8_t
numOfTags
;
int8_t
precision
;
int8_t
tableType
;
int16_t
numOfColumns
;
int16_t
sversion
;
int16_t
tversion
;
int32_t
tid
;
int64_t
uid
;
SVgroupMsg
vgroup
;
char
sTableName
[
TSDB_TABLE_FNAME_LEN
];
int64_t
suid
;
SSchema
schema
[];
}
STableMetaMsg
;
typedef
struct
SMultiTableMeta
{
...
...
@@ -802,10 +806,10 @@ typedef struct {
* payloadLen is the length of payload
*/
typedef
struct
{
int8_t
type
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
u
int16_t
payloadLen
;
char
payload
[];
int8_t
type
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int16_t
payloadLen
;
char
payload
[];
}
SShowMsg
;
typedef
struct
{
...
...
@@ -815,7 +819,7 @@ typedef struct {
}
SCompactMsg
;
typedef
struct
SShowRsp
{
uint64_t
qhandle
;
int32_t
showId
;
STableMetaMsg
tableMeta
;
}
SShowRsp
;
...
...
source/dnode/mgmt/impl/test/profile/profile.cpp
浏览文件 @
c0fc32f6
...
...
@@ -21,9 +21,15 @@ class DndTestProfile : public ::testing::Test {
void
TearDown
()
override
{}
static
void
SetUpTestSuite
()
{
pServer
=
createServer
(
"/tmp/dndTestProfile"
,
"localhost"
,
7100
);
const
char
*
user
=
"root"
;
const
char
*
pass
=
"taosdata"
;
const
char
*
path
=
"/tmp/dndTestProfile"
;
const
char
*
fqdn
=
"localhost"
;
uint16_t
port
=
9527
;
pServer
=
createServer
(
path
,
fqdn
,
port
);
ASSERT
(
pServer
);
pClient
=
createClient
(
"root"
,
"taosdata"
);
pClient
=
createClient
(
user
,
pass
,
fqdn
,
port
);
}
static
void
TearDownTestSuite
()
{
...
...
@@ -33,12 +39,14 @@ class DndTestProfile : public ::testing::Test {
static
SServer
*
pServer
;
static
SClient
*
pClient
;
static
int32_t
connId
;
};
SServer
*
DndTestProfile
::
pServer
;
SClient
*
DndTestProfile
::
pClient
;
int32_t
DndTestProfile
::
connId
;
TEST_F
(
DndTestProfile
,
c
onnectMsg_01
)
{
TEST_F
(
DndTestProfile
,
SC
onnectMsg_01
)
{
ASSERT_NE
(
pClient
,
nullptr
);
SConnectMsg
*
pReq
=
(
SConnectMsg
*
)
rpcMallocCont
(
sizeof
(
SConnectMsg
));
...
...
@@ -52,8 +60,10 @@ TEST_F(DndTestProfile, connectMsg_01) {
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_CONNECT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
SConnectRsp
*
pRsp
=
(
SConnectRsp
*
)
p
Client
->
pRsp
->
pCont
;
SConnectRsp
*
pRsp
=
(
SConnectRsp
*
)
p
Msg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
acctId
=
htonl
(
pRsp
->
acctId
);
pRsp
->
clusterId
=
htonl
(
pRsp
->
clusterId
);
...
...
@@ -71,13 +81,165 @@ TEST_F(DndTestProfile, connectMsg_01) {
EXPECT_EQ
(
pRsp
->
epSet
.
numOfEps
,
1
);
EXPECT_EQ
(
pRsp
->
epSet
.
port
[
0
],
9527
);
EXPECT_STREQ
(
pRsp
->
epSet
.
fqdn
[
0
],
"localhost"
);
connId
=
pRsp
->
connId
;
}
TEST_F
(
DndTestProfile
,
heartbeatMsg_01
)
{
TEST_F
(
DndTestProfile
,
SConnectMsg_02
)
{
ASSERT_NE
(
pClient
,
nullptr
);
SConnectMsg
*
pReq
=
(
SConnectMsg
*
)
rpcMallocCont
(
sizeof
(
SConnectMsg
));
pReq
->
pid
=
htonl
(
1234
);
strcpy
(
pReq
->
app
,
"test01"
);
strcpy
(
pReq
->
db
,
"invalid_db"
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SConnectMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_CONNECT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
TSDB_CODE_MND_INVALID_DB
);
ASSERT_EQ
(
pMsg
->
contLen
,
0
);
}
TEST_F
(
DndTestProfile
,
SConnectMsg_03
)
{
ASSERT_NE
(
pClient
,
nullptr
);
int32_t
showId
=
0
;
{
SShowMsg
*
pReq
=
(
SShowMsg
*
)
rpcMallocCont
(
sizeof
(
SShowMsg
));
pReq
->
type
=
TSDB_MGMT_TABLE_CONNS
;
strcpy
(
pReq
->
db
,
""
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SShowMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_SHOW
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
SShowRsp
*
pRsp
=
(
SShowRsp
*
)
pMsg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
showId
=
htonl
(
pRsp
->
showId
);
STableMetaMsg
*
pMeta
=
&
pRsp
->
tableMeta
;
pMeta
->
contLen
=
htonl
(
pMeta
->
contLen
);
pMeta
->
numOfColumns
=
htons
(
pMeta
->
numOfColumns
);
pMeta
->
sversion
=
htons
(
pMeta
->
sversion
);
pMeta
->
tversion
=
htons
(
pMeta
->
tversion
);
pMeta
->
tid
=
htonl
(
pMeta
->
tid
);
pMeta
->
uid
=
htobe64
(
pMeta
->
uid
);
pMeta
->
suid
=
htobe64
(
pMeta
->
suid
);
showId
=
pRsp
->
showId
;
EXPECT_NE
(
pRsp
->
showId
,
0
);
EXPECT_EQ
(
pMeta
->
contLen
,
0
);
EXPECT_STREQ
(
pMeta
->
tableFname
,
""
);
EXPECT_EQ
(
pMeta
->
numOfTags
,
0
);
EXPECT_EQ
(
pMeta
->
precision
,
0
);
EXPECT_EQ
(
pMeta
->
tableType
,
0
);
EXPECT_EQ
(
pMeta
->
numOfColumns
,
7
);
EXPECT_EQ
(
pMeta
->
sversion
,
0
);
EXPECT_EQ
(
pMeta
->
tversion
,
0
);
EXPECT_EQ
(
pMeta
->
tid
,
0
);
EXPECT_EQ
(
pMeta
->
uid
,
0
);
EXPECT_STREQ
(
pMeta
->
sTableName
,
""
);
EXPECT_EQ
(
pMeta
->
suid
,
0
);
SSchema
*
pSchema
=
NULL
;
pSchema
=
&
pMeta
->
schema
[
0
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_INT
);
EXPECT_EQ
(
pSchema
->
bytes
,
4
);
EXPECT_STREQ
(
pSchema
->
name
,
"connId"
);
pSchema
=
&
pMeta
->
schema
[
1
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_BINARY
);
EXPECT_EQ
(
pSchema
->
bytes
,
TSDB_USER_LEN
+
VARSTR_HEADER_SIZE
);
EXPECT_STREQ
(
pSchema
->
name
,
"user"
);
pSchema
=
&
pMeta
->
schema
[
2
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_BINARY
);
EXPECT_EQ
(
pSchema
->
bytes
,
TSDB_USER_LEN
+
VARSTR_HEADER_SIZE
);
EXPECT_STREQ
(
pSchema
->
name
,
"program"
);
pSchema
=
&
pMeta
->
schema
[
3
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_INT
);
EXPECT_EQ
(
pSchema
->
bytes
,
4
);
EXPECT_STREQ
(
pSchema
->
name
,
"pid"
);
pSchema
=
&
pMeta
->
schema
[
4
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_BINARY
);
EXPECT_EQ
(
pSchema
->
bytes
,
TSDB_IPv4ADDR_LEN
+
6
+
VARSTR_HEADER_SIZE
);
EXPECT_STREQ
(
pSchema
->
name
,
"ip:port"
);
pSchema
=
&
pMeta
->
schema
[
5
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_TIMESTAMP
);
EXPECT_EQ
(
pSchema
->
bytes
,
8
);
EXPECT_STREQ
(
pSchema
->
name
,
"login_time"
);
pSchema
=
&
pMeta
->
schema
[
6
];
pSchema
->
bytes
=
htons
(
pSchema
->
bytes
);
EXPECT_EQ
(
pSchema
->
colId
,
0
);
EXPECT_EQ
(
pSchema
->
type
,
TSDB_DATA_TYPE_TIMESTAMP
);
EXPECT_EQ
(
pSchema
->
bytes
,
8
);
EXPECT_STREQ
(
pSchema
->
name
,
"last_access"
);
}
{
SRetrieveTableMsg
*
pReq
=
(
SRetrieveTableMsg
*
)
rpcMallocCont
(
sizeof
(
SRetrieveTableMsg
));
pReq
->
showId
=
htonl
(
showId
);
pReq
->
free
=
0
;
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SRetrieveTableMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_SHOW_RETRIEVE
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_NE
(
pMsg
->
code
,
0
);
SRetrieveTableRsp
*
pRsp
=
(
SRetrieveTableRsp
*
)
pMsg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
numOfRows
=
htonl
(
pRsp
->
numOfRows
);
pRsp
->
offset
=
htobe64
(
pRsp
->
offset
);
pRsp
->
useconds
=
htobe64
(
pRsp
->
useconds
);
pRsp
->
compLen
=
htonl
(
pRsp
->
compLen
);
EXPECT_EQ
(
pRsp
->
numOfRows
,
1
);
EXPECT_EQ
(
pRsp
->
offset
,
0
);
EXPECT_EQ
(
pRsp
->
useconds
,
0
);
EXPECT_EQ
(
pRsp
->
completed
,
1
);
EXPECT_EQ
(
pRsp
->
precision
,
TSDB_TIME_PRECISION_MILLI
);
EXPECT_EQ
(
pRsp
->
compressed
,
0
);
EXPECT_EQ
(
pRsp
->
reserved
,
0
);
EXPECT_EQ
(
pRsp
->
compLen
,
0
);
}
}
TEST_F
(
DndTestProfile
,
SHeartBeatMsg_01
)
{
ASSERT_NE
(
pClient
,
nullptr
);
SHeartBeatMsg
*
pReq
=
(
SHeartBeatMsg
*
)
rpcMallocCont
(
sizeof
(
SHeartBeatMsg
));
pReq
->
connId
=
htonl
(
1
);
pReq
->
connId
=
htonl
(
connId
);
pReq
->
pid
=
htonl
(
1234
);
pReq
->
numOfQueries
=
htonl
(
0
);
pReq
->
numOfStreams
=
htonl
(
0
);
...
...
@@ -89,8 +251,10 @@ TEST_F(DndTestProfile, heartbeatMsg_01) {
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_HEARTBEAT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
SHeartBeatRsp
*
pRsp
=
(
SHeartBeatRsp
*
)
p
Client
->
pRsp
->
pCont
;
SHeartBeatRsp
*
pRsp
=
(
SHeartBeatRsp
*
)
p
Msg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
connId
=
htonl
(
pRsp
->
connId
);
pRsp
->
queryId
=
htonl
(
pRsp
->
queryId
);
...
...
@@ -99,7 +263,7 @@ TEST_F(DndTestProfile, heartbeatMsg_01) {
pRsp
->
onlineDnodes
=
htonl
(
pRsp
->
onlineDnodes
);
pRsp
->
epSet
.
port
[
0
]
=
htons
(
pRsp
->
epSet
.
port
[
0
]);
EXPECT_EQ
(
pRsp
->
connId
,
1
);
EXPECT_EQ
(
pRsp
->
connId
,
connId
);
EXPECT_EQ
(
pRsp
->
queryId
,
0
);
EXPECT_EQ
(
pRsp
->
streamId
,
0
);
EXPECT_EQ
(
pRsp
->
totalDnodes
,
1
);
...
...
@@ -111,3 +275,251 @@ TEST_F(DndTestProfile, heartbeatMsg_01) {
EXPECT_EQ
(
pRsp
->
epSet
.
port
[
0
],
9527
);
EXPECT_STREQ
(
pRsp
->
epSet
.
fqdn
[
0
],
"localhost"
);
}
TEST_F
(
DndTestProfile
,
SKillConnMsg_01
)
{
ASSERT_NE
(
pClient
,
nullptr
);
{
SKillConnMsg
*
pReq
=
(
SKillConnMsg
*
)
rpcMallocCont
(
sizeof
(
SKillConnMsg
));
pReq
->
connId
=
htonl
(
connId
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SKillConnMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_KILL_CONN
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
}
{
SHeartBeatMsg
*
pReq
=
(
SHeartBeatMsg
*
)
rpcMallocCont
(
sizeof
(
SHeartBeatMsg
));
pReq
->
connId
=
htonl
(
connId
);
pReq
->
pid
=
htonl
(
1234
);
pReq
->
numOfQueries
=
htonl
(
0
);
pReq
->
numOfStreams
=
htonl
(
0
);
strcpy
(
pReq
->
app
,
"test01"
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SHeartBeatMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_HEARTBEAT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
TSDB_CODE_TSC_INVALID_CONNECTION
);
ASSERT_EQ
(
pMsg
->
contLen
,
0
);
}
{
SConnectMsg
*
pReq
=
(
SConnectMsg
*
)
rpcMallocCont
(
sizeof
(
SConnectMsg
));
pReq
->
pid
=
htonl
(
1234
);
strcpy
(
pReq
->
app
,
"test01"
);
strcpy
(
pReq
->
db
,
""
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SConnectMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_CONNECT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
SConnectRsp
*
pRsp
=
(
SConnectRsp
*
)
pMsg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
acctId
=
htonl
(
pRsp
->
acctId
);
pRsp
->
clusterId
=
htonl
(
pRsp
->
clusterId
);
pRsp
->
connId
=
htonl
(
pRsp
->
connId
);
pRsp
->
epSet
.
port
[
0
]
=
htons
(
pRsp
->
epSet
.
port
[
0
]);
EXPECT_EQ
(
pRsp
->
acctId
,
1
);
EXPECT_GT
(
pRsp
->
clusterId
,
0
);
EXPECT_EQ
(
pRsp
->
superAuth
,
1
);
EXPECT_EQ
(
pRsp
->
readAuth
,
1
);
EXPECT_EQ
(
pRsp
->
writeAuth
,
1
);
EXPECT_EQ
(
pRsp
->
epSet
.
inUse
,
0
);
EXPECT_EQ
(
pRsp
->
epSet
.
numOfEps
,
1
);
EXPECT_EQ
(
pRsp
->
epSet
.
port
[
0
],
9527
);
EXPECT_STREQ
(
pRsp
->
epSet
.
fqdn
[
0
],
"localhost"
);
connId
=
pRsp
->
connId
;
}
}
TEST_F
(
DndTestProfile
,
SKillConnMsg_02
)
{
ASSERT_NE
(
pClient
,
nullptr
);
SKillConnMsg
*
pReq
=
(
SKillConnMsg
*
)
rpcMallocCont
(
sizeof
(
SKillConnMsg
));
pReq
->
connId
=
htonl
(
2345
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SKillConnMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_KILL_CONN
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
TSDB_CODE_MND_INVALID_CONN_ID
);
}
TEST_F
(
DndTestProfile
,
SKillQueryMsg_01
)
{
ASSERT_NE
(
pClient
,
nullptr
);
{
SKillQueryMsg
*
pReq
=
(
SKillQueryMsg
*
)
rpcMallocCont
(
sizeof
(
SKillQueryMsg
));
pReq
->
connId
=
htonl
(
connId
);
pReq
->
queryId
=
htonl
(
1234
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SKillQueryMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_KILL_QUERY
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
ASSERT_EQ
(
pMsg
->
contLen
,
0
);
}
{
SHeartBeatMsg
*
pReq
=
(
SHeartBeatMsg
*
)
rpcMallocCont
(
sizeof
(
SHeartBeatMsg
));
pReq
->
connId
=
htonl
(
connId
);
pReq
->
pid
=
htonl
(
1234
);
pReq
->
numOfQueries
=
htonl
(
0
);
pReq
->
numOfStreams
=
htonl
(
0
);
strcpy
(
pReq
->
app
,
"test01"
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SHeartBeatMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_HEARTBEAT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
SHeartBeatRsp
*
pRsp
=
(
SHeartBeatRsp
*
)
pMsg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
connId
=
htonl
(
pRsp
->
connId
);
pRsp
->
queryId
=
htonl
(
pRsp
->
queryId
);
pRsp
->
streamId
=
htonl
(
pRsp
->
streamId
);
pRsp
->
totalDnodes
=
htonl
(
pRsp
->
totalDnodes
);
pRsp
->
onlineDnodes
=
htonl
(
pRsp
->
onlineDnodes
);
pRsp
->
epSet
.
port
[
0
]
=
htons
(
pRsp
->
epSet
.
port
[
0
]);
EXPECT_EQ
(
pRsp
->
connId
,
connId
);
EXPECT_EQ
(
pRsp
->
queryId
,
1234
);
EXPECT_EQ
(
pRsp
->
streamId
,
0
);
EXPECT_EQ
(
pRsp
->
totalDnodes
,
1
);
EXPECT_EQ
(
pRsp
->
onlineDnodes
,
1
);
EXPECT_EQ
(
pRsp
->
killConnection
,
0
);
EXPECT_EQ
(
pRsp
->
epSet
.
inUse
,
0
);
EXPECT_EQ
(
pRsp
->
epSet
.
numOfEps
,
1
);
EXPECT_EQ
(
pRsp
->
epSet
.
port
[
0
],
9527
);
EXPECT_STREQ
(
pRsp
->
epSet
.
fqdn
[
0
],
"localhost"
);
}
}
TEST_F
(
DndTestProfile
,
SKillQueryMsg_02
)
{
ASSERT_NE
(
pClient
,
nullptr
);
SKillQueryMsg
*
pReq
=
(
SKillQueryMsg
*
)
rpcMallocCont
(
sizeof
(
SKillQueryMsg
));
pReq
->
connId
=
htonl
(
2345
);
pReq
->
queryId
=
htonl
(
1234
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SKillQueryMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_KILL_QUERY
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
TSDB_CODE_MND_INVALID_CONN_ID
);
}
TEST_F
(
DndTestProfile
,
SKillStreamMsg_01
)
{
ASSERT_NE
(
pClient
,
nullptr
);
{
SKillStreamMsg
*
pReq
=
(
SKillStreamMsg
*
)
rpcMallocCont
(
sizeof
(
SKillStreamMsg
));
pReq
->
connId
=
htonl
(
connId
);
pReq
->
streamId
=
htonl
(
3579
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SKillStreamMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_KILL_STREAM
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
0
);
ASSERT_EQ
(
pMsg
->
contLen
,
0
);
}
{
SHeartBeatMsg
*
pReq
=
(
SHeartBeatMsg
*
)
rpcMallocCont
(
sizeof
(
SHeartBeatMsg
));
pReq
->
connId
=
htonl
(
connId
);
pReq
->
pid
=
htonl
(
1234
);
pReq
->
numOfQueries
=
htonl
(
0
);
pReq
->
numOfStreams
=
htonl
(
0
);
strcpy
(
pReq
->
app
,
"test01"
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SHeartBeatMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_HEARTBEAT
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
SHeartBeatRsp
*
pRsp
=
(
SHeartBeatRsp
*
)
pMsg
->
pCont
;
ASSERT_NE
(
pRsp
,
nullptr
);
pRsp
->
connId
=
htonl
(
pRsp
->
connId
);
pRsp
->
queryId
=
htonl
(
pRsp
->
queryId
);
pRsp
->
streamId
=
htonl
(
pRsp
->
streamId
);
pRsp
->
totalDnodes
=
htonl
(
pRsp
->
totalDnodes
);
pRsp
->
onlineDnodes
=
htonl
(
pRsp
->
onlineDnodes
);
pRsp
->
epSet
.
port
[
0
]
=
htons
(
pRsp
->
epSet
.
port
[
0
]);
EXPECT_EQ
(
pRsp
->
connId
,
connId
);
EXPECT_EQ
(
pRsp
->
queryId
,
0
);
EXPECT_EQ
(
pRsp
->
streamId
,
3579
);
EXPECT_EQ
(
pRsp
->
totalDnodes
,
1
);
EXPECT_EQ
(
pRsp
->
onlineDnodes
,
1
);
EXPECT_EQ
(
pRsp
->
killConnection
,
0
);
EXPECT_EQ
(
pRsp
->
epSet
.
inUse
,
0
);
EXPECT_EQ
(
pRsp
->
epSet
.
numOfEps
,
1
);
EXPECT_EQ
(
pRsp
->
epSet
.
port
[
0
],
9527
);
EXPECT_STREQ
(
pRsp
->
epSet
.
fqdn
[
0
],
"localhost"
);
}
}
TEST_F
(
DndTestProfile
,
SKillStreamMsg_02
)
{
ASSERT_NE
(
pClient
,
nullptr
);
SKillStreamMsg
*
pReq
=
(
SKillStreamMsg
*
)
rpcMallocCont
(
sizeof
(
SKillStreamMsg
));
pReq
->
connId
=
htonl
(
2345
);
pReq
->
streamId
=
htonl
(
1234
);
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pReq
;
rpcMsg
.
contLen
=
sizeof
(
SKillStreamMsg
);
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_KILL_QUERY
;
sendMsg
(
pClient
,
&
rpcMsg
);
SRpcMsg
*
pMsg
=
pClient
->
pRsp
;
ASSERT_NE
(
pMsg
,
nullptr
);
ASSERT_EQ
(
pMsg
->
code
,
TSDB_CODE_MND_INVALID_CONN_ID
);
}
source/dnode/mgmt/impl/test/sut/deploy.cpp
浏览文件 @
c0fc32f6
...
...
@@ -15,8 +15,27 @@
#include "deploy.h"
void
initLog
(
char
*
path
)
{
void
initLog
(
const
char
*
path
)
{
dDebugFlag
=
0
;
vDebugFlag
=
0
;
mDebugFlag
=
207
;
cDebugFlag
=
0
;
jniDebugFlag
=
0
;
tmrDebugFlag
=
0
;
sdbDebugFlag
=
0
;
httpDebugFlag
=
0
;
mqttDebugFlag
=
0
;
monDebugFlag
=
0
;
uDebugFlag
=
0
;
rpcDebugFlag
=
0
;
odbcDebugFlag
=
0
;
qDebugFlag
=
0
;
wDebugFlag
=
0
;
sDebugFlag
=
0
;
tsdbDebugFlag
=
0
;
cqDebugFlag
=
0
;
debugFlag
=
0
;
char
temp
[
PATH_MAX
];
snprintf
(
temp
,
PATH_MAX
,
"%s/taosdlog"
,
path
);
if
(
taosInitLog
(
temp
,
tsNumOfLogLines
,
1
)
!=
0
)
{
...
...
@@ -32,7 +51,7 @@ void* runServer(void* param) {
}
}
void
initOption
(
SDnodeOpt
*
pOption
,
c
har
*
path
,
char
*
fqdn
,
uint16_t
port
)
{
void
initOption
(
SDnodeOpt
*
pOption
,
c
onst
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
)
{
pOption
->
sver
=
1
;
pOption
->
numOfCores
=
1
;
pOption
->
numOfSupportMnodes
=
1
;
...
...
@@ -46,15 +65,16 @@ void initOption(SDnodeOpt* pOption, char* path, char* fqdn, uint16_t port) {
pOption
->
shellActivityTimer
=
30
;
pOption
->
serverPort
=
port
;
strcpy
(
pOption
->
dataDir
,
path
);
snprintf
(
pOption
->
localEp
,
TSDB_EP_LEN
,
"%s:
&
u"
,
fqdn
,
port
);
snprintf
(
pOption
->
localEp
,
TSDB_EP_LEN
,
"%s:
%
u"
,
fqdn
,
port
);
snprintf
(
pOption
->
localFqdn
,
TSDB_FQDN_LEN
,
"%s"
,
fqdn
);
snprintf
(
pOption
->
firstEp
,
TSDB_EP_LEN
,
"%s:&u"
,
fqdn
,
port
);
snprintf
(
pOption
->
firstEp
,
TSDB_EP_LEN
,
"%s:%u"
,
fqdn
,
port
);
}
SServer
*
createServer
(
const
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
)
{
taosRemoveDir
(
path
);
taosMkDir
(
path
);
}
initLog
(
path
);
SServer
*
createServer
(
char
*
path
,
char
*
fqdn
,
uint16_t
port
)
{
SDnodeOpt
option
=
{
0
};
initOption
(
&
option
,
path
,
fqdn
,
port
);
...
...
@@ -80,11 +100,11 @@ void dropServer(SServer* pServer) {
void
processClientRsp
(
void
*
parent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
SClient
*
pClient
=
(
SClient
*
)
parent
;
pClient
->
pRsp
=
pMsg
;
//taosMsleep(1000000);
//
taosMsleep(1000000);
tsem_post
(
&
pClient
->
sem
);
}
SClient
*
createClient
(
c
har
*
user
,
char
*
pass
)
{
SClient
*
createClient
(
c
onst
char
*
user
,
const
char
*
pass
,
const
char
*
fqdn
,
uint16_t
port
)
{
SClient
*
pClient
=
(
SClient
*
)
calloc
(
1
,
sizeof
(
SClient
));
ASSERT
(
pClient
);
...
...
@@ -99,7 +119,7 @@ SClient* createClient(char *user, char *pass) {
rpcInit
.
sessions
=
1024
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
rpcInit
.
idleTime
=
30
*
1000
;
rpcInit
.
user
=
user
;
rpcInit
.
user
=
(
char
*
)
user
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
parent
=
pClient
;
rpcInit
.
secret
=
(
char
*
)
secretEncrypt
;
...
...
@@ -110,6 +130,8 @@ SClient* createClient(char *user, char *pass) {
ASSERT
(
pClient
->
clientRpc
);
tsem_init
(
&
pClient
->
sem
,
0
,
0
);
strcpy
(
pClient
->
fqdn
,
fqdn
);
pClient
->
port
=
port
;
return
pClient
;
}
...
...
@@ -123,8 +145,8 @@ void sendMsg(SClient* pClient, SRpcMsg* pMsg) {
SEpSet
epSet
=
{
0
};
epSet
.
inUse
=
0
;
epSet
.
numOfEps
=
1
;
epSet
.
port
[
0
]
=
9527
;
strcpy
(
epSet
.
fqdn
[
0
],
"localhost"
);
epSet
.
port
[
0
]
=
pClient
->
port
;
strcpy
(
epSet
.
fqdn
[
0
],
pClient
->
fqdn
);
rpcSendRequest
(
pClient
->
clientRpc
,
&
epSet
,
pMsg
,
NULL
);
tsem_wait
(
&
pClient
->
sem
);
...
...
source/dnode/mgmt/impl/test/sut/deploy.h
浏览文件 @
c0fc32f6
...
...
@@ -31,13 +31,15 @@ typedef struct {
}
SServer
;
typedef
struct
{
char
fqdn
[
TSDB_FQDN_LEN
];
uint16_t
port
;
void
*
clientRpc
;
SRpcMsg
*
pRsp
;
tsem_t
sem
;
}
SClient
;
SServer
*
createServer
(
c
har
*
path
,
char
*
fqdn
,
uint16_t
port
);
SServer
*
createServer
(
c
onst
char
*
path
,
const
char
*
fqdn
,
uint16_t
port
);
void
dropServer
(
SServer
*
pServer
);
SClient
*
createClient
(
c
har
*
user
,
char
*
pass
);
SClient
*
createClient
(
c
onst
char
*
user
,
const
char
*
pass
,
const
char
*
fqdn
,
uint16_t
port
);
void
dropClient
(
SClient
*
pClient
);
void
sendMsg
(
SClient
*
pClient
,
SRpcMsg
*
pMsg
);
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
c0fc32f6
...
...
@@ -296,7 +296,6 @@ typedef struct SShowObj {
void
*
pIter
;
void
*
pVgIter
;
SMnode
*
pMnode
;
SShowObj
**
ppShow
;
char
db
[
TSDB_FULL_DB_NAME_LEN
];
int16_t
offset
[
TSDB_MAX_COLUMNS
];
int32_t
bytes
[
TSDB_MAX_COLUMNS
];
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
c0fc32f6
...
...
@@ -172,9 +172,9 @@ static SConnObj *mndAcquireConn(SMnode *pMnode, int32_t connId, char *newUser, u
}
static
void
mndReleaseConn
(
SMnode
*
pMnode
,
SConnObj
*
pConn
)
{
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
if
(
pConn
==
NULL
)
return
;
SProfileMgmt
*
pMgmt
=
&
pMnode
->
profileMgmt
;
taosCacheRelease
(
pMgmt
->
cache
,
(
void
**
)
&
pConn
,
false
);
}
...
...
@@ -316,6 +316,11 @@ static int32_t mndProcessHeartBeatMsg(SMnodeMsg *pMsg) {
}
else
{
mDebug
(
"user:%s, conn:%d is freed and create a new conn:%d"
,
pMsg
->
user
,
pReq
->
connId
,
pConn
->
connId
);
}
}
else
if
(
pConn
->
killed
)
{
mDebug
(
"user:%s, conn:%d is already killed"
,
pMsg
->
user
,
pReq
->
connId
,
pConn
->
connId
);
terrno
=
TSDB_CODE_TSC_INVALID_CONNECTION
;
return
-
1
;
}
else
{
}
SHeartBeatRsp
*
pRsp
=
rpcMallocCont
(
sizeof
(
SHeartBeatRsp
));
...
...
@@ -368,7 +373,7 @@ static int32_t mndProcessKillQueryMsg(SMnodeMsg *pMsg) {
SKillQueryMsg
*
pKill
=
pMsg
->
rpcMsg
.
pCont
;
int32_t
connId
=
htonl
(
pKill
->
connId
);
int32_t
queryId
=
htonl
(
pKill
->
queryId
);
mInfo
(
"kill query msg is received, queryId:%
s
"
,
pKill
->
queryId
);
mInfo
(
"kill query msg is received, queryId:%
d
"
,
pKill
->
queryId
);
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
pMgmt
->
cache
,
&
connId
,
sizeof
(
int32_t
));
if
(
pConn
==
NULL
)
{
...
...
@@ -399,7 +404,7 @@ static int32_t mndProcessKillStreamMsg(SMnodeMsg *pMsg) {
SKillStreamMsg
*
pKill
=
pMsg
->
rpcMsg
.
pCont
;
int32_t
connId
=
htonl
(
pKill
->
connId
);
int32_t
streamId
=
htonl
(
pKill
->
streamId
);
mDebug
(
"kill stream msg is received, streamId:%
s
"
,
streamId
);
mDebug
(
"kill stream msg is received, streamId:%
d
"
,
streamId
);
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
pMgmt
->
cache
,
&
connId
,
sizeof
(
int32_t
));
if
(
pConn
==
NULL
)
{
...
...
@@ -432,11 +437,11 @@ static int32_t mndProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SConnObj
*
pConn
=
taosCacheAcquireByKey
(
pMgmt
->
cache
,
&
connId
,
sizeof
(
int32_t
));
if
(
pConn
==
NULL
)
{
mError
(
"connId:%
s
, failed to kill connection, conn not exist"
,
connId
);
mError
(
"connId:%
d
, failed to kill connection, conn not exist"
,
connId
);
terrno
=
TSDB_CODE_MND_INVALID_CONN_ID
;
return
-
1
;
}
else
{
mInfo
(
"connId:%
s
, is killed by user:%s"
,
connId
,
pMsg
->
user
);
mInfo
(
"connId:%
d
, is killed by user:%s"
,
connId
,
pMsg
->
user
);
pConn
->
killed
=
1
;
taosCacheRelease
(
pMgmt
->
cache
,
(
void
**
)
&
pConn
,
false
);
return
TSDB_CODE_SUCCESS
;
...
...
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
c0fc32f6
...
...
@@ -16,19 +16,19 @@
#define _DEFAULT_SOURCE
#include "mndShow.h"
static
int32_t
mndProcessShowMsg
(
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mndProcessRetrieveMsg
(
SMnodeMsg
*
pMsg
);
static
bool
mndCheckRetrieveFinished
(
SShowObj
*
pShow
);
static
int32_t
mndAcquireShowObj
(
SMnode
*
pMnode
,
SShowObj
*
pShow
);
static
void
mndReleaseShowObj
(
SShowObj
*
pShow
,
bool
forceRemove
);
static
int32_t
mndPutShowObj
(
SMnode
*
pMnode
,
SShowObj
*
pShow
);
static
void
mndFreeShowObj
(
void
*
ppShow
);
static
char
*
mndShowStr
(
int32_t
showType
);
static
int32_t
mndProcessShowMsg
(
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mndProcessRetrieveMsg
(
SMnodeMsg
*
pMsg
);
static
bool
mndCheckRetrieveFinished
(
SShowObj
*
pShow
);
static
SShowObj
*
mndCreateShowObj
(
SMnode
*
pMnode
,
SShowMsg
*
pMsg
);
static
void
mndFreeShowObj
(
SShowObj
*
pShow
);
static
SShowObj
*
mndAcquireShowObj
(
SMnode
*
pMnode
,
int32_t
showId
);
static
void
mndReleaseShowObj
(
SShowObj
*
pShow
,
bool
forceRemove
);
static
char
*
mndShowStr
(
int32_t
showType
);
int32_t
mndInitShow
(
SMnode
*
pMnode
)
{
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
pMgmt
->
cache
=
taosCacheInit
(
TSDB_
CACHE_PTR_KEY
,
5
,
true
,
mndFreeShowObj
,
"show"
);
pMgmt
->
cache
=
taosCacheInit
(
TSDB_
DATA_TYPE_INT
,
5
,
true
,
(
__cache_free_fn_t
)
mndFreeShowObj
,
"show"
);
if
(
pMgmt
->
cache
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"failed to alloc show cache since %s"
,
terrstr
());
...
...
@@ -48,47 +48,41 @@ void mndCleanupShow(SMnode *pMnode) {
}
}
static
int32_t
mndAcquireShowObj
(
SMnode
*
pMnode
,
SShowObj
*
pShow
)
{
TSDB_CACHE_PTR_TYPE
handleVal
=
(
TSDB_CACHE_PTR_TYPE
)
pShow
;
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
SShowObj
**
ppShow
=
taosCacheAcquireByKey
(
pMgmt
->
cache
,
&
handleVal
,
sizeof
(
TSDB_CACHE_PTR_TYPE
));
if
(
ppShow
)
{
mTrace
(
"show:%d, data:%p acquired from cache"
,
pShow
->
id
,
ppShow
);
return
0
;
}
return
-
1
;
}
static
void
mndReleaseShowObj
(
SShowObj
*
pShow
,
bool
forceRemove
)
{
SMnode
*
pMnode
=
pShow
->
pMnode
;
static
SShowObj
*
mndCreateShowObj
(
SMnode
*
pMnode
,
SShowMsg
*
pMsg
)
{
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
SShowObj
**
ppShow
=
(
SShowObj
**
)
pShow
->
ppShow
;
taosCacheRelease
(
pMgmt
->
cache
,
(
void
**
)(
&
ppShow
),
forceRemove
);
mDebug
(
"show:%d, data:%p released from cache, force:%d"
,
pShow
->
id
,
ppShow
,
forceRemove
);
}
static
int32_t
mndPutShowObj
(
SMnode
*
pMnode
,
SShowObj
*
pShow
)
{
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
int32_t
lifeSpan
=
pMnode
->
shellActivityTimer
*
6
*
1000
;
int32_t
showId
=
atomic_add_fetch_32
(
&
pMgmt
->
showId
,
1
);
if
(
showId
==
0
)
atomic_add_fetch_32
(
&
pMgmt
->
showId
,
1
);
TSDB_CACHE_PTR_TYPE
val
=
(
TSDB_CACHE_PTR_TYPE
)
pShow
;
pShow
->
id
=
atomic_add_fetch_32
(
&
pMgmt
->
showId
,
1
);
SShowObj
**
ppShow
=
taosCachePut
(
pMgmt
->
cache
,
&
val
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
&
pShow
,
sizeof
(
TSDB_CACHE_PTR_TYPE
),
lifeSpan
);
if
(
ppShow
==
NULL
)
{
int32_t
size
=
sizeof
(
SShowObj
)
+
pMsg
->
payloadLen
;
SShowObj
*
pShow
=
calloc
(
1
,
size
);
if
(
pShow
!=
NULL
)
{
pShow
->
id
=
showId
;
pShow
->
pMnode
=
pMnode
;
pShow
->
type
=
pMsg
->
type
;
pShow
->
payloadLen
=
pMsg
->
payloadLen
;
memcpy
(
pShow
->
db
,
pMsg
->
db
,
TSDB_FULL_DB_NAME_LEN
);
memcpy
(
pShow
->
payload
,
pMsg
->
payload
,
pMsg
->
payloadLen
);
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"
show:%d, failed to put into cache"
,
pShow
->
id
);
return
-
1
;
mError
(
"
failed to process show-meta msg:%s since %s"
,
mndShowStr
(
pMsg
->
type
),
terrstr
()
);
return
NULL
;
}
mTrace
(
"show:%d, data:%p put into cache"
,
pShow
->
id
,
ppShow
);
return
0
;
int32_t
keepTime
=
pMnode
->
shellActivityTimer
*
6
*
1000
;
SShowObj
*
pShowRet
=
taosCachePut
(
pMgmt
->
cache
,
&
showId
,
sizeof
(
int32_t
),
pShow
,
size
,
keepTime
);
free
(
pShow
);
if
(
pShowRet
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"show:%d, failed to put into cache since %s"
,
showId
,
terrstr
());
return
NULL
;
}
else
{
mTrace
(
"show:%d, data:%p created"
,
showId
,
pShowRet
);
return
pShowRet
;
}
}
static
void
mndFreeShowObj
(
void
*
ppShow
)
{
SShowObj
*
pShow
=
*
(
SShowObj
**
)
ppShow
;
static
void
mndFreeShowObj
(
SShowObj
*
pShow
)
{
SMnode
*
pMnode
=
pShow
->
pMnode
;
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
...
...
@@ -103,8 +97,29 @@ static void mndFreeShowObj(void *ppShow) {
}
}
mDebug
(
"show:%d, data:%p destroyed"
,
pShow
->
id
,
ppShow
);
tfree
(
pShow
);
mTrace
(
"show:%d, data:%p destroyed"
,
pShow
->
id
,
pShow
);
}
static
SShowObj
*
mndAcquireShowObj
(
SMnode
*
pMnode
,
int32_t
showId
)
{
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
SShowObj
*
pShow
=
taosCacheAcquireByKey
(
pMgmt
->
cache
,
&
showId
,
sizeof
(
int32_t
));
if
(
pShow
==
NULL
)
{
mError
(
"show:%d, already destroyed"
,
showId
);
return
NULL
;
}
mTrace
(
"show:%d, data:%p acquired from cache"
,
pShow
->
id
,
pShow
);
return
pShow
;
}
static
void
mndReleaseShowObj
(
SShowObj
*
pShow
,
bool
forceRemove
)
{
if
(
pShow
==
NULL
)
return
;
mTrace
(
"show:%d, data:%p released from cache, force:%d"
,
pShow
->
id
,
pShow
,
forceRemove
);
SMnode
*
pMnode
=
pShow
->
pMnode
;
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
taosCacheRelease
(
pMgmt
->
cache
,
(
void
**
)(
&
pShow
),
forceRemove
);
}
static
int32_t
mndProcessShowMsg
(
SMnodeMsg
*
pMnodeMsg
)
{
...
...
@@ -112,7 +127,7 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
SShowMgmt
*
pMgmt
=
&
pMnode
->
showMgmt
;
SShowMsg
*
pMsg
=
pMnodeMsg
->
rpcMsg
.
pCont
;
int8_t
type
=
pMsg
->
type
;
uint16_t
payloadLen
=
htonl
(
pMsg
->
payloadLen
);
int16_t
payloadLen
=
htonl
(
pMsg
->
payloadLen
);
if
(
type
<=
TSDB_MGMT_TABLE_START
||
type
>=
TSDB_MGMT_TABLE_MAX
)
{
terrno
=
TSDB_CODE_MND_INVALID_MSG_TYPE
;
...
...
@@ -127,27 +142,13 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
return
-
1
;
}
int32_t
size
=
sizeof
(
SShowObj
)
+
payloadLen
;
SShowObj
*
pShow
=
calloc
(
1
,
size
);
if
(
pShow
!=
NULL
)
{
pShow
->
pMnode
=
pMnode
;
pShow
->
type
=
type
;
pShow
->
payloadLen
=
payloadLen
;
memcpy
(
pShow
->
db
,
pMsg
->
db
,
TSDB_FULL_DB_NAME_LEN
);
memcpy
(
pShow
->
payload
,
pMsg
->
payload
,
payloadLen
);
}
else
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
SShowObj
*
pShow
=
mndCreateShowObj
(
pMnode
,
pMsg
);
if
(
pShow
==
NULL
)
{
mError
(
"failed to process show-meta msg:%s since %s"
,
mndShowStr
(
type
),
terrstr
());
return
-
1
;
}
if
(
mndPutShowObj
(
pMnode
,
pShow
)
==
0
)
{
mError
(
"failed to process show-meta msg:%s since %s"
,
mndShowStr
(
type
),
terrstr
());
free
(
pShow
);
return
-
1
;
}
size
=
sizeof
(
SShowRsp
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
+
TSDB_EXTRA_PAYLOAD_SIZE
;
int32_t
size
=
sizeof
(
SShowRsp
)
+
sizeof
(
SSchema
)
*
TSDB_MAX_COLUMNS
+
TSDB_EXTRA_PAYLOAD_SIZE
;
SShowRsp
*
pRsp
=
rpcMallocCont
(
size
);
if
(
pRsp
==
NULL
)
{
mndReleaseShowObj
(
pShow
,
true
);
...
...
@@ -156,15 +157,14 @@ static int32_t mndProcessShowMsg(SMnodeMsg *pMnodeMsg) {
return
-
1
;
}
pRsp
->
qhandle
=
htobe64
((
uint64_t
)
pShow
);
int32_t
code
=
(
*
metaFp
)(
pMnodeMsg
,
pShow
,
&
pRsp
->
tableMeta
);
mDebug
(
"show:%d, type:%s, get meta finished, numOfRows:%d cols:%d result:%s"
,
pShow
->
id
,
mndShowStr
(
type
),
pShow
->
numOfRows
,
pShow
->
numOfColumns
,
tstrerror
(
code
));
int32_t
code
=
(
*
metaFp
)(
pMnodeMsg
,
pShow
,
&
pRsp
->
tableMeta
);
mDebug
(
"show:%d, data:%p get meta finished, numOfRows:%d cols:%d type:%s result:%s"
,
pShow
->
id
,
pShow
,
pShow
->
numOfRows
,
pShow
->
numOfColumns
,
mndShowStr
(
type
),
tstrerror
(
code
));
if
(
code
==
TSDB_CODE_SUCCESS
)
{
pMnodeMsg
->
contLen
=
sizeof
(
SShowRsp
)
+
sizeof
(
SSchema
)
*
pShow
->
numOfColumns
;
pMnodeMsg
->
pCont
=
pRsp
;
pRsp
->
showId
=
htonl
(
pShow
->
id
);
mndReleaseShowObj
(
pShow
,
false
);
return
TSDB_CODE_SUCCESS
;
}
else
{
...
...
@@ -182,14 +182,10 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
int32_t
rowsRead
=
0
;
SRetrieveTableMsg
*
pRetrieve
=
pMnodeMsg
->
rpcMsg
.
pCont
;
pRetrieve
->
qhandle
=
htobe64
(
pRetrieve
->
qhandle
);
SShowObj
*
pShow
=
(
SShowObj
*
)
pRetrieve
->
qhandle
;
int32_t
showId
=
htonl
(
pRetrieve
->
showId
);
/*
* in case of server restart, apps may hold qhandle created by server before
* restart, which is actually invalid, therefore, signature check is required.
*/
if
(
mndAcquireShowObj
(
pMnode
,
pShow
)
!=
0
)
{
SShowObj
*
pShow
=
mndAcquireShowObj
(
pMnode
,
showId
);
if
(
pShow
==
NULL
)
{
terrno
=
TSDB_CODE_MND_INVALID_SHOWOBJ
;
mError
(
"failed to process show-retrieve msg:%p since %s"
,
pShow
,
terrstr
());
return
-
1
;
...
...
@@ -199,15 +195,16 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if
(
retrieveFp
==
NULL
)
{
mndReleaseShowObj
(
pShow
,
false
);
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
mError
(
"show:%d,
failed to retrieve data since %s"
,
pShow
->
id
,
terrstr
());
mError
(
"show:%d,
data:%p failed to retrieve data since %s"
,
pShow
->
id
,
pShow
,
terrstr
());
return
-
1
;
}
mDebug
(
"show:%d,
type:%s, start retrieve data, numOfReads:%d numOfRows:%d"
,
pShow
->
id
,
mndShowStr
(
pShow
->
type
)
,
pShow
->
numOfReads
,
pShow
->
numOfRows
);
mDebug
(
"show:%d,
data:%p start retrieve data, numOfReads:%d numOfRows:%d type:%s"
,
pShow
->
id
,
pShow
,
pShow
->
numOfReads
,
pShow
->
numOfRows
,
mndShowStr
(
pShow
->
type
)
);
if
(
mndCheckRetrieveFinished
(
pShow
))
{
mDebug
(
"show:%d, read finished, numOfReads:%d numOfRows:%d"
,
pShow
->
id
,
pShow
->
numOfReads
,
pShow
->
numOfRows
);
mDebug
(
"show:%d, data:%p read finished, numOfReads:%d numOfRows:%d"
,
pShow
->
id
,
pShow
,
pShow
->
numOfReads
,
pShow
->
numOfRows
);
pShow
->
numOfReads
=
pShow
->
numOfRows
;
}
...
...
@@ -230,7 +227,7 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
if
(
pRsp
==
NULL
)
{
mndReleaseShowObj
(
pShow
,
false
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"show:%d,
failed to retrieve data since %s"
,
pShow
->
id
,
terrstr
());
mError
(
"show:%d,
data:%p failed to retrieve data since %s"
,
pShow
->
id
,
pShow
,
terrstr
());
return
-
1
;
}
...
...
@@ -239,20 +236,20 @@ static int32_t mndProcessRetrieveMsg(SMnodeMsg *pMnodeMsg) {
rowsRead
=
(
*
retrieveFp
)(
pMnodeMsg
,
pShow
,
pRsp
->
data
,
rowsToRead
);
}
mDebug
(
"show:%d,
stop retrieve data, rowsRead:%d rowsToRead:%d"
,
pShow
->
id
,
rowsRead
,
rowsToRead
);
mDebug
(
"show:%d,
data:%p stop retrieve data, rowsRead:%d rowsToRead:%d"
,
pShow
->
id
,
pShow
,
rowsRead
,
rowsToRead
);
pRsp
->
numOfRows
=
htonl
(
rowsRead
);
pRsp
->
precision
=
(
int16_t
)
htonl
(
TSDB_TIME_PRECISION_MILLI
)
;
// millisecond time precision
pRsp
->
precision
=
TSDB_TIME_PRECISION_MILLI
;
// millisecond time precision
pMnodeMsg
->
pCont
=
pRsp
;
pMnodeMsg
->
contLen
=
size
;
if
(
rowsToRead
==
0
||
(
rowsRead
==
rowsToRead
&&
pShow
->
numOfRows
==
pShow
->
numOfReads
))
{
pRsp
->
completed
=
1
;
mDebug
(
"
%p, retrieve completed"
,
pShow
);
mDebug
(
"
show:%d, data:%p retrieve completed"
,
pShow
->
id
,
pShow
);
mndReleaseShowObj
(
pShow
,
true
);
}
else
{
mDebug
(
"
%p, retrieve not completed yet"
,
pShow
);
mDebug
(
"
show:%d, data:%p retrieve not completed yet"
,
pShow
->
id
,
pShow
);
mndReleaseShowObj
(
pShow
,
false
);
}
...
...
@@ -307,7 +304,7 @@ static char *mndShowStr(int32_t showType) {
static
bool
mndCheckRetrieveFinished
(
SShowObj
*
pShow
)
{
if
(
pShow
->
pIter
==
NULL
&&
pShow
->
numOfReads
!=
0
)
{
return
true
;
}
}
return
false
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录