Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7dc72d3f
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7dc72d3f
编写于
4月 23, 2020
作者:
S
slguan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-91] make monitor work
上级
6dbca070
变更
5
显示空白变更内容
内联
并排
Showing
5 changed file
with
180 addition
and
197 deletion
+180
-197
src/dnode/src/dnodeModule.c
src/dnode/src/dnodeModule.c
+2
-2
src/dnode/src/dnodeShell.c
src/dnode/src/dnodeShell.c
+6
-5
src/mnode/src/mgmtSdb.c
src/mnode/src/mgmtSdb.c
+2
-2
src/plugins/monitor/inc/monitorSystem.h
src/plugins/monitor/inc/monitorSystem.h
+28
-11
src/plugins/monitor/src/monitorSystem.c
src/plugins/monitor/src/monitorSystem.c
+142
-177
未找到文件。
src/dnode/src/dnodeModule.c
浏览文件 @
7dc72d3f
...
@@ -90,7 +90,7 @@ void dnodeCleanUpModules() {
...
@@ -90,7 +90,7 @@ void dnodeCleanUpModules() {
int32_t
dnodeInitModules
()
{
int32_t
dnodeInitModules
()
{
dnodeAllocModules
();
dnodeAllocModules
();
for
(
int32_t
module
=
0
;
module
<
TSDB_MOD_MAX
;
++
module
)
{
for
(
EModuleType
module
=
0
;
module
<
TSDB_MOD_MAX
;
++
module
)
{
if
(
tsModule
[
module
].
initFp
)
{
if
(
tsModule
[
module
].
initFp
)
{
if
((
*
tsModule
[
module
].
initFp
)()
!=
0
)
{
if
((
*
tsModule
[
module
].
initFp
)()
!=
0
)
{
dError
(
"failed to init module:%s"
,
tsModule
[
module
].
name
);
dError
(
"failed to init module:%s"
,
tsModule
[
module
].
name
);
...
@@ -103,7 +103,7 @@ int32_t dnodeInitModules() {
...
@@ -103,7 +103,7 @@ int32_t dnodeInitModules() {
}
}
void
dnodeStartModules
()
{
void
dnodeStartModules
()
{
for
(
int32_t
module
=
1
;
module
<
TSDB_MOD_MAX
;
++
module
)
{
for
(
EModuleType
module
=
1
;
module
<
TSDB_MOD_MAX
;
++
module
)
{
if
(
tsModule
[
module
].
enable
&&
tsModule
[
module
].
startFp
)
{
if
(
tsModule
[
module
].
enable
&&
tsModule
[
module
].
startFp
)
{
if
((
*
tsModule
[
module
].
startFp
)()
!=
0
)
{
if
((
*
tsModule
[
module
].
startFp
)()
!=
0
)
{
dError
(
"failed to start module:%s"
,
tsModule
[
module
].
name
);
dError
(
"failed to start module:%s"
,
tsModule
[
module
].
name
);
...
...
src/dnode/src/dnodeShell.c
浏览文件 @
7dc72d3f
...
@@ -20,6 +20,7 @@
...
@@ -20,6 +20,7 @@
#include "taosmsg.h"
#include "taosmsg.h"
#include "trpc.h"
#include "trpc.h"
#include "tglobal.h"
#include "tglobal.h"
#include "http.h"
#include "dnode.h"
#include "dnode.h"
#include "dnodeLog.h"
#include "dnodeLog.h"
#include "dnodeRead.h"
#include "dnodeRead.h"
...
@@ -29,7 +30,7 @@
...
@@ -29,7 +30,7 @@
static
void
(
*
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
(
*
dnodeProcessShellMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SRpcMsg
*
);
static
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
);
static
void
dnodeProcessMsgFromShell
(
SRpcMsg
*
pMsg
);
static
int
dnodeRetrieveUserAuthInfo
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
int
dnodeRetrieveUserAuthInfo
(
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
static
void
*
tsDnodeShellRpc
=
NULL
;
static
void
*
tsDnodeShellRpc
=
NULL
;
static
int32_t
tsDnodeQueryReqNum
=
0
;
static
int32_t
tsDnodeQueryReqNum
=
0
;
static
int32_t
tsDnodeSubmitReqNum
=
0
;
static
int32_t
tsDnodeSubmitReqNum
=
0
;
...
@@ -110,7 +111,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
...
@@ -110,7 +111,7 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
SDnodeStatisInfo
dnodeGetStatisInfo
()
{
SDnodeStatisInfo
dnodeGetStatisInfo
()
{
SDnodeStatisInfo
info
=
{
0
};
SDnodeStatisInfo
info
=
{
0
};
if
(
dnodeGetRunStatus
()
==
TSDB_DNODE_RUN_STATUS_RUNING
)
{
if
(
dnodeGetRunStatus
()
==
TSDB_DNODE_RUN_STATUS_RUNING
)
{
//
info.httpReqNum = httpGetReqCount();
info
.
httpReqNum
=
httpGetReqCount
();
info
.
queryReqNum
=
atomic_exchange_32
(
&
tsDnodeQueryReqNum
,
0
);
info
.
queryReqNum
=
atomic_exchange_32
(
&
tsDnodeQueryReqNum
,
0
);
info
.
submitReqNum
=
atomic_exchange_32
(
&
tsDnodeSubmitReqNum
,
0
);
info
.
submitReqNum
=
atomic_exchange_32
(
&
tsDnodeSubmitReqNum
,
0
);
}
}
...
...
src/mnode/src/mgmtSdb.c
浏览文件 @
7dc72d3f
...
@@ -333,7 +333,7 @@ void sdbIncRef(void *handle, void *pRow) {
...
@@ -333,7 +333,7 @@ void sdbIncRef(void *handle, void *pRow) {
SSdbTable
*
pTable
=
handle
;
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
atomic_add_fetch_32
(
pRefCount
,
1
);
atomic_add_fetch_32
(
pRefCount
,
1
);
if
(
1
&&
strcmp
(
pTable
->
tableName
,
"accounts"
)
==
0
)
{
if
(
0
&&
strcmp
(
pTable
->
tableName
,
"accounts"
)
==
0
)
{
sdbTrace
(
"table:%s, add ref to record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
sdbTrace
(
"table:%s, add ref to record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
*
pRefCount
);
}
}
...
@@ -345,7 +345,7 @@ void sdbDecRef(void *handle, void *pRow) {
...
@@ -345,7 +345,7 @@ void sdbDecRef(void *handle, void *pRow) {
SSdbTable
*
pTable
=
handle
;
SSdbTable
*
pTable
=
handle
;
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
*
pRefCount
=
(
int32_t
*
)(
pRow
+
pTable
->
refCountPos
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
int32_t
refCount
=
atomic_sub_fetch_32
(
pRefCount
,
1
);
if
(
1
&&
strcmp
(
pTable
->
tableName
,
"accounts"
)
==
0
)
{
if
(
0
&&
strcmp
(
pTable
->
tableName
,
"accounts"
)
==
0
)
{
sdbTrace
(
"table:%s, def ref of record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
sdbTrace
(
"table:%s, def ref of record:%s:%s:%d"
,
pTable
->
tableName
,
pTable
->
tableName
,
sdbGetkeyStr
(
pTable
,
pRow
),
*
pRefCount
);
*
pRefCount
);
}
}
...
...
src/plugins/monitor/inc/monitorSystem.h
浏览文件 @
7dc72d3f
...
@@ -23,17 +23,34 @@ extern "C" {
...
@@ -23,17 +23,34 @@ extern "C" {
#include <stdbool.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdint.h>
int32_t
monitorInitSystem
();
typedef
struct
{
int32_t
monitorStartSystem
();
char
*
acctId
;
void
monitorStopSystem
();
int64_t
currentPointsPerSecond
;
void
monitorCleanUpSystem
();
int64_t
maxPointsPerSecond
;
void
monitorSaveAcctLog
(
char
*
acctId
,
int64_t
currentPointsPerSecond
,
int64_t
maxPointsPerSecond
,
int64_t
totalTimeSeries
;
int64_t
totalTimeSeries
,
int64_t
maxTimeSeries
,
int64_t
totalStorage
,
int64_t
maxStorage
,
int64_t
maxTimeSeries
;
int64_t
totalQueryTime
,
int64_t
maxQueryTime
,
int64_t
totalInbound
,
int64_t
maxInbound
,
int64_t
totalStorage
;
int64_t
totalOutbound
,
int64_t
maxOutbound
,
int64_t
totalDbs
,
int64_t
maxDbs
,
int64_t
maxStorage
;
int64_t
totalUsers
,
int64_t
maxUsers
,
int64_t
totalStreams
,
int64_t
maxStreams
,
int64_t
totalQueryTime
;
int64_t
totalConns
,
int64_t
maxConns
,
int8_t
accessState
);
int64_t
maxQueryTime
;
void
monitorSaveLog
(
int
level
,
const
char
*
const
format
,
...);
int64_t
totalInbound
;
int64_t
maxInbound
;
int64_t
totalOutbound
;
int64_t
maxOutbound
;
int64_t
totalDbs
;
int64_t
maxDbs
;
int64_t
totalUsers
;
int64_t
maxUsers
;
int64_t
totalStreams
;
int64_t
maxStreams
;
int64_t
totalConns
;
int64_t
maxConns
;
int8_t
accessState
;
}
SAcctMonitorObj
;
void
monitorSaveAcctLog
(
SAcctMonitorObj
*
pMonObj
);
void
monitorSaveLog
(
int32_t
level
,
const
char
*
const
format
,
...);
void
monitorExecuteSQL
(
char
*
sql
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/plugins/monitor/src/monitorSystem.c
浏览文件 @
7dc72d3f
...
@@ -15,15 +15,16 @@
...
@@ -15,15 +15,16 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "os.h"
#include "os.h"
#include "tlog.h"
#include "monitor.h"
#include "dnode.h"
#include "tsclient.h"
#include "taosdef.h"
#include "taosdef.h"
#include "tsystem.h"
#include "taoserror.h"
#include "tlog.h"
#include "ttime.h"
#include "ttime.h"
#include "ttimer.h"
#include "ttimer.h"
#include "tutil.h"
#include "tutil.h"
#include "tsystem.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "dnode.h"
#include "monitorSystem.h"
#include "monitorSystem.h"
#define monitorError(...) \
#define monitorError(...) \
...
@@ -41,10 +42,6 @@
...
@@ -41,10 +42,6 @@
#define monitorPrint(...) \
#define monitorPrint(...) \
{ taosPrintLog("MON ", 255, __VA_ARGS__); }
{ taosPrintLog("MON ", 255, __VA_ARGS__); }
#define monitorLError(...) monitorError(__VA_ARGS__)
#define monitorLWarn(...) monitorWarn(__VA_ARGS__)
#define monitorLPrint(...) monitorPrint(__VA_ARGS__)
#define SQL_LENGTH 1024
#define SQL_LENGTH 1024
#define LOG_LEN_STR 80
#define LOG_LEN_STR 80
#define IP_LEN_STR 15
#define IP_LEN_STR 15
...
@@ -59,14 +56,14 @@ typedef enum {
...
@@ -59,14 +56,14 @@ typedef enum {
MONITOR_CMD_CREATE_TB_ACCT_ROOT
,
MONITOR_CMD_CREATE_TB_ACCT_ROOT
,
MONITOR_CMD_CREATE_TB_SLOWQUERY
,
MONITOR_CMD_CREATE_TB_SLOWQUERY
,
MONITOR_CMD_MAX
MONITOR_CMD_MAX
}
MonitorCommand
;
}
E
MonitorCommand
;
typedef
enum
{
typedef
enum
{
MONITOR_STATE_UN_INIT
,
MONITOR_STATE_UN_INIT
,
MONITOR_STATE_INITIALIZING
,
MONITOR_STATE_INITIALIZING
,
MONITOR_STATE_INITIALIZED
,
MONITOR_STATE_INITIALIZED
,
MONITOR_STATE_STOPPED
MONITOR_STATE_STOPPED
}
MonitorState
;
}
E
MonitorState
;
typedef
struct
{
typedef
struct
{
void
*
conn
;
void
*
conn
;
...
@@ -77,89 +74,75 @@ typedef struct {
...
@@ -77,89 +74,75 @@ typedef struct {
char
sql
[
SQL_LENGTH
];
char
sql
[
SQL_LENGTH
];
void
*
initTimer
;
void
*
initTimer
;
void
*
diskTimer
;
void
*
diskTimer
;
}
MonitorConn
;
}
SMonitorConn
;
MonitorConn
*
monitor
=
NULL
;
static
SMonitorConn
tsMonitorConn
;
static
void
monitorInitConn
(
void
*
para
,
void
*
unused
);
TAOS
*
taos_connect_a
(
char
*
ip
,
char
*
user
,
char
*
pass
,
char
*
db
,
uint16_t
port
,
void
(
*
fp
)(
void
*
,
TAOS_RES
*
,
int
),
static
void
monitorInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
*
param
,
void
**
taos
);
static
void
monitorInitDatabase
();
void
monitorInitConn
(
void
*
para
,
void
*
unused
);
static
void
monitorInitDatabaseCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_t
code
);
void
monitorInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int
code
);
static
void
monitorStartTimer
();
void
monitorInitDatabase
();
static
void
monitorSaveSystemInfo
();
void
monitorInitDatabaseCb
(
void
*
param
,
TAOS_RES
*
result
,
int
code
);
void
monitorStartTimer
();
static
void
monitorCheckDiskUsage
(
void
*
para
,
void
*
unused
)
{
void
monitorSaveSystemInfo
();
void
monitorSaveLog
(
int
level
,
const
char
*
const
format
,
...);
void
monitorSaveAcctLog
(
char
*
acctId
,
int64_t
currentPointsPerSecond
,
int64_t
maxPointsPerSecond
,
int64_t
totalTimeSeries
,
int64_t
maxTimeSeries
,
int64_t
totalStorage
,
int64_t
maxStorage
,
int64_t
totalQueryTime
,
int64_t
maxQueryTime
,
int64_t
totalInbound
,
int64_t
maxInbound
,
int64_t
totalOutbound
,
int64_t
maxOutbound
,
int64_t
totalDbs
,
int64_t
maxDbs
,
int64_t
totalUsers
,
int64_t
maxUsers
,
int64_t
totalStreams
,
int64_t
maxStreams
,
int64_t
totalConns
,
int64_t
maxConns
,
int8_t
accessState
);
void
(
*
mnodeCountRequestFp
)(
SDnodeStatisInfo
*
info
)
=
NULL
;
void
monitorExecuteSQL
(
char
*
sql
);
void
monitorCheckDiskUsage
(
void
*
para
,
void
*
unused
)
{
taosGetDisk
();
taosGetDisk
();
taosTmrReset
(
monitorCheckDiskUsage
,
CHECK_INTERVAL
,
NULL
,
tscTmr
,
&
monitor
->
diskTimer
);
taosTmrReset
(
monitorCheckDiskUsage
,
CHECK_INTERVAL
,
NULL
,
tscTmr
,
&
tsMonitorConn
.
diskTimer
);
}
}
int
monitorInitSystem
()
{
int32_t
monitorInitSystem
()
{
monitor
=
(
MonitorConn
*
)
malloc
(
sizeof
(
MonitorConn
));
taos_init
();
memset
(
monitor
,
0
,
sizeof
(
MonitorConn
));
taosTmrReset
(
monitorCheckDiskUsage
,
CHECK_INTERVAL
,
NULL
,
tscTmr
,
&
tsMonitorConn
.
diskTimer
);
taosTmrReset
(
monitorCheckDiskUsage
,
CHECK_INTERVAL
,
NULL
,
tscTmr
,
&
monitor
->
diskTimer
);
return
0
;
return
0
;
}
}
int
monitorStartSystem
()
{
int
32_t
monitorStartSystem
()
{
if
(
monitor
==
NULL
)
{
monitorPrint
(
"start monitor module"
);
monitorInitSystem
();
monitorInitSystem
();
}
taosTmrReset
(
monitorInitConn
,
10
,
NULL
,
tscTmr
,
&
tsMonitorConn
.
initTimer
);
taosTmrReset
(
monitorInitConn
,
10
,
NULL
,
tscTmr
,
&
monitor
->
initTimer
);
return
0
;
return
0
;
}
}
void
monitorStartSystemRetry
()
{
static
void
monitorStartSystemRetry
()
{
if
(
monitor
->
initTimer
!=
NULL
)
{
if
(
tsMonitorConn
.
initTimer
!=
NULL
)
{
taosTmrReset
(
monitorInitConn
,
3000
,
NULL
,
tscTmr
,
&
monitor
->
initTimer
);
taosTmrReset
(
monitorInitConn
,
3000
,
NULL
,
tscTmr
,
&
tsMonitorConn
.
initTimer
);
}
}
}
}
void
monitorInitConn
(
void
*
para
,
void
*
unused
)
{
static
void
monitorInitConn
(
void
*
para
,
void
*
unused
)
{
monitorPrint
(
"starting to initialize monitor service .."
);
monitorPrint
(
"starting to initialize monitor service .."
);
monitor
->
state
=
MONITOR_STATE_INITIALIZING
;
tsMonitorConn
.
state
=
MONITOR_STATE_INITIALIZING
;
if
(
monitor
->
privateIpStr
[
0
]
==
0
)
{
if
(
tsMonitorConn
.
privateIpStr
[
0
]
==
0
)
{
strcpy
(
monitor
->
privateIpStr
,
tsPrivateIp
);
strcpy
(
tsMonitorConn
.
privateIpStr
,
tsPrivateIp
);
for
(
int
i
=
0
;
i
<
TSDB_IPv4ADDR_LEN
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
TSDB_IPv4ADDR_LEN
;
++
i
)
{
if
(
monitor
->
privateIpStr
[
i
]
==
'.'
)
{
if
(
tsMonitorConn
.
privateIpStr
[
i
]
==
'.'
)
{
monitor
->
privateIpStr
[
i
]
=
'_'
;
tsMonitorConn
.
privateIpStr
[
i
]
=
'_'
;
}
}
}
}
}
}
if
(
monitor
->
conn
==
NULL
)
{
if
(
tsMonitorConn
.
conn
==
NULL
)
{
taos_connect_a
(
NULL
,
"monitor"
,
tsInternalPass
,
""
,
0
,
monitorInitConnCb
,
monitor
,
&
(
monitor
->
conn
));
taos_connect_a
(
NULL
,
"monitor"
,
tsInternalPass
,
""
,
0
,
monitorInitConnCb
,
&
tsMonitorConn
,
&
(
tsMonitorConn
.
conn
));
}
else
{
}
else
{
monitorInitDatabase
();
monitorInitDatabase
();
}
}
}
}
void
monitorInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
in
t
code
)
{
static
void
monitorInitConnCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_
t
code
)
{
if
(
code
<
0
)
{
if
(
code
<
0
)
{
monitorError
(
"monitor:%p, connect to database failed,
code:%d"
,
monitor
->
conn
,
code
);
monitorError
(
"monitor:%p, connect to database failed,
reason:%s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
)
);
taos_close
(
monitor
->
conn
);
taos_close
(
tsMonitorConn
.
conn
);
monitor
->
conn
=
NULL
;
tsMonitorConn
.
conn
=
NULL
;
monitor
->
state
=
MONITOR_STATE_UN_INIT
;
tsMonitorConn
.
state
=
MONITOR_STATE_UN_INIT
;
monitorStartSystemRetry
();
monitorStartSystemRetry
();
return
;
return
;
}
}
monitorTrace
(
"monitor:%p, connect to database success,
code:%d"
,
monitor
->
conn
,
code
);
monitorTrace
(
"monitor:%p, connect to database success,
reason:%s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
)
);
monitorInitDatabase
();
monitorInitDatabase
();
}
}
void
dnodeBuildMonitorSql
(
char
*
sql
,
in
t
cmd
)
{
static
void
dnodeBuildMonitorSql
(
char
*
sql
,
int32_
t
cmd
)
{
memset
(
sql
,
0
,
SQL_LENGTH
);
memset
(
sql
,
0
,
SQL_LENGTH
);
if
(
cmd
==
MONITOR_CMD_CREATE_DB
)
{
if
(
cmd
==
MONITOR_CMD_CREATE_DB
)
{
...
@@ -180,7 +163,7 @@ void dnodeBuildMonitorSql(char *sql, int cmd) {
...
@@ -180,7 +163,7 @@ void dnodeBuildMonitorSql(char *sql, int cmd) {
tsMonitorDbName
,
IP_LEN_STR
+
1
);
tsMonitorDbName
,
IP_LEN_STR
+
1
);
}
else
if
(
cmd
==
MONITOR_CMD_CREATE_TB_DN
)
{
}
else
if
(
cmd
==
MONITOR_CMD_CREATE_TB_DN
)
{
snprintf
(
sql
,
SQL_LENGTH
,
"create table if not exists %s.dn_%s using %s.dn tags('%s')"
,
tsMonitorDbName
,
snprintf
(
sql
,
SQL_LENGTH
,
"create table if not exists %s.dn_%s using %s.dn tags('%s')"
,
tsMonitorDbName
,
monitor
->
privateIpStr
,
tsMonitorDbName
,
tsPrivateIp
);
tsMonitorConn
.
privateIpStr
,
tsMonitorDbName
,
tsPrivateIp
);
}
else
if
(
cmd
==
MONITOR_CMD_CREATE_MT_ACCT
)
{
}
else
if
(
cmd
==
MONITOR_CMD_CREATE_MT_ACCT
)
{
snprintf
(
sql
,
SQL_LENGTH
,
snprintf
(
sql
,
SQL_LENGTH
,
"create table if not exists %s.acct(ts timestamp "
"create table if not exists %s.acct(ts timestamp "
...
@@ -215,111 +198,107 @@ void dnodeBuildMonitorSql(char *sql, int cmd) {
...
@@ -215,111 +198,107 @@ void dnodeBuildMonitorSql(char *sql, int cmd) {
sql
[
SQL_LENGTH
]
=
0
;
sql
[
SQL_LENGTH
]
=
0
;
}
}
void
monitorInitDatabase
()
{
static
void
monitorInitDatabase
()
{
if
(
monitor
->
cmdIndex
<
MONITOR_CMD_MAX
)
{
if
(
tsMonitorConn
.
cmdIndex
<
MONITOR_CMD_MAX
)
{
dnodeBuildMonitorSql
(
monitor
->
sql
,
monitor
->
cmdIndex
);
dnodeBuildMonitorSql
(
tsMonitorConn
.
sql
,
tsMonitorConn
.
cmdIndex
);
taos_query_a
(
monitor
->
conn
,
monitor
->
sql
,
monitorInitDatabaseCb
,
NULL
);
taos_query_a
(
tsMonitorConn
.
conn
,
tsMonitorConn
.
sql
,
monitorInitDatabaseCb
,
NULL
);
}
else
{
}
else
{
monitor
->
state
=
MONITOR_STATE_INITIALIZED
;
tsMonitorConn
.
state
=
MONITOR_STATE_INITIALIZED
;
monitorPrint
(
"monitor service init success"
);
monitorPrint
(
"monitor service init success"
);
monitorStartTimer
();
monitorStartTimer
();
}
}
}
}
void
monitorInitDatabaseCb
(
void
*
param
,
TAOS_RES
*
result
,
in
t
code
)
{
static
void
monitorInitDatabaseCb
(
void
*
param
,
TAOS_RES
*
result
,
int32_
t
code
)
{
if
(
-
code
==
TSDB_CODE_TABLE_ALREADY_EXIST
||
-
code
==
TSDB_CODE_DB_ALREADY_EXIST
||
code
>=
0
)
{
if
(
-
code
==
TSDB_CODE_TABLE_ALREADY_EXIST
||
-
code
==
TSDB_CODE_DB_ALREADY_EXIST
||
code
>=
0
)
{
monitorTrace
(
"monitor:%p, sql success,
code:%d, %s"
,
monitor
->
conn
,
code
,
monitor
->
sql
);
monitorTrace
(
"monitor:%p, sql success,
reason:%d, %s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
),
tsMonitorConn
.
sql
);
if
(
monitor
->
cmdIndex
==
MONITOR_CMD_CREATE_TB_LOG
)
{
if
(
tsMonitorConn
.
cmdIndex
==
MONITOR_CMD_CREATE_TB_LOG
)
{
monitor
L
Print
(
"dnode:%s is started"
,
tsPrivateIp
);
monitorPrint
(
"dnode:%s is started"
,
tsPrivateIp
);
}
}
monitor
->
cmdIndex
++
;
tsMonitorConn
.
cmdIndex
++
;
monitorInitDatabase
();
monitorInitDatabase
();
}
else
{
}
else
{
monitorError
(
"monitor:%p, sql failed,
code:%d, %s"
,
monitor
->
conn
,
code
,
monitor
->
sql
);
monitorError
(
"monitor:%p, sql failed,
reason:%s, %s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
),
tsMonitorConn
.
sql
);
monitor
->
state
=
MONITOR_STATE_UN_INIT
;
tsMonitorConn
.
state
=
MONITOR_STATE_UN_INIT
;
monitorStartSystemRetry
();
monitorStartSystemRetry
();
}
}
}
}
void
monitorStopSystem
()
{
void
monitorStopSystem
()
{
if
(
monitor
==
NULL
)
{
monitorPrint
(
"monitor module is stopped"
);
return
;
tsMonitorConn
.
state
=
MONITOR_STATE_STOPPED
;
}
monitorLPrint
(
"dnode:%s monitor module is stopped"
,
tsPrivateIp
);
monitor
->
state
=
MONITOR_STATE_STOPPED
;
// taosLogFp = NULL;
// taosLogFp = NULL;
if
(
monitor
->
initTimer
!=
NULL
)
{
if
(
tsMonitorConn
.
initTimer
!=
NULL
)
{
taosTmrStopA
(
&
(
monitor
->
initTimer
));
taosTmrStopA
(
&
(
tsMonitorConn
.
initTimer
));
}
}
if
(
monitor
->
timer
!=
NULL
)
{
if
(
tsMonitorConn
.
timer
!=
NULL
)
{
taosTmrStopA
(
&
(
monitor
->
timer
));
taosTmrStopA
(
&
(
tsMonitorConn
.
timer
));
}
}
}
}
void
monitorCleanUpSystem
()
{
void
monitorCleanUpSystem
()
{
monitorPrint
(
"monitor service cleanup"
);
monitorStopSystem
();
monitorStopSystem
();
monitorPrint
(
"monitor module cleanup"
);
}
}
void
monitorStartTimer
()
{
static
void
monitorStartTimer
()
{
taosTmrReset
(
monitorSaveSystemInfo
,
tsMonitorInterval
*
1000
,
NULL
,
tscTmr
,
&
monitor
->
timer
);
taosTmrReset
(
monitorSaveSystemInfo
,
tsMonitorInterval
*
1000
,
NULL
,
tscTmr
,
&
tsMonitorConn
.
timer
);
}
}
void
dnodeMontiorInsertAcctCallback
(
void
*
param
,
TAOS_RES
*
result
,
in
t
code
)
{
static
void
dnodeMontiorInsertAcctCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_
t
code
)
{
if
(
code
<
0
)
{
if
(
code
<
0
)
{
monitorError
(
"monitor:%p, save account info failed, code:%
d"
,
monitor
->
conn
,
code
);
monitorError
(
"monitor:%p, save account info failed, code:%
s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
)
);
}
else
if
(
code
==
0
)
{
}
else
if
(
code
==
0
)
{
monitorError
(
"monitor:%p, save account info failed, affect rows:%d"
,
monitor
->
conn
,
code
);
monitorError
(
"monitor:%p, save account info failed, affect rows:%d"
,
tsMonitorConn
.
conn
,
code
);
}
else
{
}
else
{
monitorTrace
(
"monitor:%p, save account info success, code:%
d"
,
monitor
->
conn
,
code
);
monitorTrace
(
"monitor:%p, save account info success, code:%
s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
)
);
}
}
}
}
void
dnodeMontiorInsertSysCallback
(
void
*
param
,
TAOS_RES
*
result
,
in
t
code
)
{
static
void
dnodeMontiorInsertSysCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_
t
code
)
{
if
(
code
<
0
)
{
if
(
code
<
0
)
{
monitorError
(
"monitor:%p, save system info failed, code:%
d %s"
,
monitor
->
conn
,
code
,
monitor
->
sql
);
monitorError
(
"monitor:%p, save system info failed, code:%
s %s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
),
tsMonitorConn
.
sql
);
}
else
if
(
code
==
0
)
{
}
else
if
(
code
==
0
)
{
monitorError
(
"monitor:%p, save system info failed, affect rows:%d %s"
,
monitor
->
conn
,
code
,
monitor
->
sql
);
monitorError
(
"monitor:%p, save system info failed, affect rows:%d %s"
,
tsMonitorConn
.
conn
,
code
,
tsMonitorConn
.
sql
);
}
else
{
}
else
{
monitorTrace
(
"monitor:%p, save system info success, code:%
d %s"
,
monitor
->
conn
,
code
,
monitor
->
sql
);
monitorTrace
(
"monitor:%p, save system info success, code:%
s %s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
),
tsMonitorConn
.
sql
);
}
}
}
}
void
dnodeMontiorInsertLogCallback
(
void
*
param
,
TAOS_RES
*
result
,
in
t
code
)
{
static
void
dnodeMontiorInsertLogCallback
(
void
*
param
,
TAOS_RES
*
result
,
int32_
t
code
)
{
if
(
code
<
0
)
{
if
(
code
<
0
)
{
monitorError
(
"monitor:%p, save log failed, code:%
d"
,
monitor
->
conn
,
code
);
monitorError
(
"monitor:%p, save log failed, code:%
s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
)
);
}
else
if
(
code
==
0
)
{
}
else
if
(
code
==
0
)
{
monitorError
(
"monitor:%p, save log failed, affect rows:%d"
,
monitor
->
conn
,
code
);
monitorError
(
"monitor:%p, save log failed, affect rows:%d"
,
tsMonitorConn
.
conn
,
code
);
}
else
{
}
else
{
monitorTrace
(
"monitor:%p, save log info success, code:%
d"
,
monitor
->
conn
,
code
);
monitorTrace
(
"monitor:%p, save log info success, code:%
s"
,
tsMonitorConn
.
conn
,
tstrerror
(
code
)
);
}
}
}
}
// unit is MB
// unit is MB
in
t
monitorBuildMemorySql
(
char
*
sql
)
{
static
int32_
t
monitorBuildMemorySql
(
char
*
sql
)
{
float
sysMemoryUsedMB
=
0
;
float
sysMemoryUsedMB
=
0
;
bool
suc
=
taosGetSysMemory
(
&
sysMemoryUsedMB
);
bool
suc
=
taosGetSysMemory
(
&
sysMemoryUsedMB
);
if
(
!
suc
)
{
if
(
!
suc
)
{
monitorError
(
"monitor:%p, get sys memory info failed."
,
monitor
->
conn
);
monitorError
(
"monitor:%p, get sys memory info failed."
,
tsMonitorConn
.
conn
);
}
}
float
procMemoryUsedMB
=
0
;
float
procMemoryUsedMB
=
0
;
suc
=
taosGetProcMemory
(
&
procMemoryUsedMB
);
suc
=
taosGetProcMemory
(
&
procMemoryUsedMB
);
if
(
!
suc
)
{
if
(
!
suc
)
{
monitorError
(
"monitor:%p, get proc memory info failed."
,
monitor
->
conn
);
monitorError
(
"monitor:%p, get proc memory info failed."
,
tsMonitorConn
.
conn
);
}
}
return
sprintf
(
sql
,
", %f, %f, %d"
,
procMemoryUsedMB
,
sysMemoryUsedMB
,
tsTotalMemoryMB
);
return
sprintf
(
sql
,
", %f, %f, %d"
,
procMemoryUsedMB
,
sysMemoryUsedMB
,
tsTotalMemoryMB
);
}
}
// unit is %
// unit is %
in
t
monitorBuildCpuSql
(
char
*
sql
)
{
static
int32_
t
monitorBuildCpuSql
(
char
*
sql
)
{
float
sysCpuUsage
=
0
,
procCpuUsage
=
0
;
float
sysCpuUsage
=
0
,
procCpuUsage
=
0
;
bool
suc
=
taosGetCpuUsage
(
&
sysCpuUsage
,
&
procCpuUsage
);
bool
suc
=
taosGetCpuUsage
(
&
sysCpuUsage
,
&
procCpuUsage
);
if
(
!
suc
)
{
if
(
!
suc
)
{
monitorError
(
"monitor:%p, get cpu usage failed."
,
monitor
->
conn
);
monitorError
(
"monitor:%p, get cpu usage failed."
,
tsMonitorConn
.
conn
);
}
}
if
(
sysCpuUsage
<=
procCpuUsage
)
{
if
(
sysCpuUsage
<=
procCpuUsage
)
{
...
@@ -330,51 +309,45 @@ int monitorBuildCpuSql(char *sql) {
...
@@ -330,51 +309,45 @@ int monitorBuildCpuSql(char *sql) {
}
}
// unit is GB
// unit is GB
in
t
monitorBuildDiskSql
(
char
*
sql
)
{
static
int32_
t
monitorBuildDiskSql
(
char
*
sql
)
{
return
sprintf
(
sql
,
", %f, %d"
,
(
tsTotalDataDirGB
-
tsAvailDataDirGB
),
(
int32_t
)
tsTotalDataDirGB
);
return
sprintf
(
sql
,
", %f, %d"
,
(
tsTotalDataDirGB
-
tsAvailDataDirGB
),
(
int32_t
)
tsTotalDataDirGB
);
}
}
// unit is Kb
// unit is Kb
in
t
monitorBuildBandSql
(
char
*
sql
)
{
static
int32_
t
monitorBuildBandSql
(
char
*
sql
)
{
float
bandSpeedKb
=
0
;
float
bandSpeedKb
=
0
;
bool
suc
=
taosGetBandSpeed
(
&
bandSpeedKb
);
bool
suc
=
taosGetBandSpeed
(
&
bandSpeedKb
);
if
(
!
suc
)
{
if
(
!
suc
)
{
monitorError
(
"monitor:%p, get bandwidth speed failed."
,
monitor
->
conn
);
monitorError
(
"monitor:%p, get bandwidth speed failed."
,
tsMonitorConn
.
conn
);
}
}
return
sprintf
(
sql
,
", %f"
,
bandSpeedKb
);
return
sprintf
(
sql
,
", %f"
,
bandSpeedKb
);
}
}
int
monitorBuildReqSql
(
char
*
sql
)
{
static
int32_t
monitorBuildReqSql
(
char
*
sql
)
{
SDnodeStatisInfo
info
;
SDnodeStatisInfo
info
=
dnodeGetStatisInfo
();
info
.
httpReqNum
=
info
.
submitReqNum
=
info
.
queryReqNum
=
0
;
(
*
mnodeCountRequestFp
)(
&
info
);
return
sprintf
(
sql
,
", %d, %d, %d)"
,
info
.
httpReqNum
,
info
.
queryReqNum
,
info
.
submitReqNum
);
return
sprintf
(
sql
,
", %d, %d, %d)"
,
info
.
httpReqNum
,
info
.
queryReqNum
,
info
.
submitReqNum
);
}
}
in
t
monitorBuildIoSql
(
char
*
sql
)
{
static
int32_
t
monitorBuildIoSql
(
char
*
sql
)
{
float
readKB
=
0
,
writeKB
=
0
;
float
readKB
=
0
,
writeKB
=
0
;
bool
suc
=
taosGetProcIO
(
&
readKB
,
&
writeKB
);
bool
suc
=
taosGetProcIO
(
&
readKB
,
&
writeKB
);
if
(
!
suc
)
{
if
(
!
suc
)
{
monitorError
(
"monitor:%p, get io info failed."
,
monitor
->
conn
);
monitorError
(
"monitor:%p, get io info failed."
,
tsMonitorConn
.
conn
);
}
}
return
sprintf
(
sql
,
", %f, %f"
,
readKB
,
writeKB
);
return
sprintf
(
sql
,
", %f, %f"
,
readKB
,
writeKB
);
}
}
void
monitorSaveSystemInfo
()
{
static
void
monitorSaveSystemInfo
()
{
if
(
monitor
->
state
!=
MONITOR_STATE_INITIALIZED
)
{
if
(
tsMonitorConn
.
state
!=
MONITOR_STATE_INITIALIZED
)
{
return
;
monitorStartTimer
();
}
if
(
mnodeCountRequestFp
==
NULL
)
{
return
;
return
;
}
}
int64_t
ts
=
taosGetTimestampUs
();
int64_t
ts
=
taosGetTimestampUs
();
char
*
sql
=
monitor
->
sql
;
char
*
sql
=
tsMonitorConn
.
sql
;
int
pos
=
snprintf
(
sql
,
SQL_LENGTH
,
"insert into %s.dn_%s values(%"
PRId64
,
tsMonitorDbName
,
monitor
->
privateIpStr
,
ts
);
int
32_t
pos
=
snprintf
(
sql
,
SQL_LENGTH
,
"insert into %s.dn_%s values(%"
PRId64
,
tsMonitorDbName
,
tsMonitorConn
.
privateIpStr
,
ts
);
pos
+=
monitorBuildCpuSql
(
sql
+
pos
);
pos
+=
monitorBuildCpuSql
(
sql
+
pos
);
pos
+=
monitorBuildMemorySql
(
sql
+
pos
);
pos
+=
monitorBuildMemorySql
(
sql
+
pos
);
...
@@ -383,22 +356,16 @@ void monitorSaveSystemInfo() {
...
@@ -383,22 +356,16 @@ void monitorSaveSystemInfo() {
pos
+=
monitorBuildIoSql
(
sql
+
pos
);
pos
+=
monitorBuildIoSql
(
sql
+
pos
);
pos
+=
monitorBuildReqSql
(
sql
+
pos
);
pos
+=
monitorBuildReqSql
(
sql
+
pos
);
monitorTrace
(
"monitor:%p, save system info, sql:%s"
,
monitor
->
conn
,
sql
);
monitorTrace
(
"monitor:%p, save system info, sql:%s"
,
tsMonitorConn
.
conn
,
sql
);
taos_query_a
(
monitor
->
conn
,
sql
,
dnodeMontiorInsertSysCallback
,
"log"
);
taos_query_a
(
tsMonitorConn
.
conn
,
sql
,
dnodeMontiorInsertSysCallback
,
"log"
);
if
(
monitor
->
timer
!=
NULL
&&
monitor
->
state
!=
MONITOR_STATE_STOPPED
)
{
if
(
tsMonitorConn
.
timer
!=
NULL
&&
tsMonitorConn
.
state
!=
MONITOR_STATE_STOPPED
)
{
monitorStartTimer
();
monitorStartTimer
();
}
}
}
}
void
monitorSaveAcctLog
(
char
*
acctId
,
int64_t
currentPointsPerSecond
,
int64_t
maxPointsPerSecond
,
void
monitorSaveAcctLog
(
SAcctMonitorObj
*
pMon
)
{
int64_t
totalTimeSeries
,
int64_t
maxTimeSeries
,
int64_t
totalStorage
,
int64_t
maxStorage
,
if
(
tsMonitorConn
.
state
!=
MONITOR_STATE_INITIALIZED
)
return
;
int64_t
totalQueryTime
,
int64_t
maxQueryTime
,
int64_t
totalInbound
,
int64_t
maxInbound
,
int64_t
totalOutbound
,
int64_t
maxOutbound
,
int64_t
totalDbs
,
int64_t
maxDbs
,
int64_t
totalUsers
,
int64_t
maxUsers
,
int64_t
totalStreams
,
int64_t
maxStreams
,
int64_t
totalConns
,
int64_t
maxConns
,
int8_t
accessState
)
{
if
(
monitor
==
NULL
)
return
;
if
(
monitor
->
state
!=
MONITOR_STATE_INITIALIZED
)
return
;
char
sql
[
1024
]
=
{
0
};
char
sql
[
1024
]
=
{
0
};
sprintf
(
sql
,
sprintf
(
sql
,
...
@@ -414,33 +381,31 @@ void monitorSaveAcctLog(char *acctId, int64_t currentPointsPerSecond, int64_t ma
...
@@ -414,33 +381,31 @@ void monitorSaveAcctLog(char *acctId, int64_t currentPointsPerSecond, int64_t ma
", %"
PRId64
", %"
PRId64
", %"
PRId64
", %"
PRId64
", %"
PRId64
", %"
PRId64
", %"
PRId64
", %"
PRId64
", %d)"
,
", %d)"
,
tsMonitorDbName
,
acctId
,
tsMonitorDbName
,
acctId
,
tsMonitorDbName
,
pMon
->
acctId
,
tsMonitorDbName
,
pMon
->
acctId
,
currentPointsPerSecond
,
maxPointsPerSecond
,
pMon
->
currentPointsPerSecond
,
pMon
->
maxPointsPerSecond
,
totalTimeSeries
,
maxTimeSeries
,
pMon
->
totalTimeSeries
,
pMon
->
maxTimeSeries
,
totalStorage
,
maxStorage
,
pMon
->
totalStorage
,
pMon
->
maxStorage
,
totalQueryTime
,
maxQueryTime
,
pMon
->
totalQueryTime
,
pMon
->
maxQueryTime
,
totalInbound
,
maxInbound
,
pMon
->
totalInbound
,
pMon
->
maxInbound
,
totalOutbound
,
maxOutbound
,
pMon
->
totalOutbound
,
pMon
->
maxOutbound
,
totalDbs
,
maxDbs
,
pMon
->
totalDbs
,
pMon
->
maxDbs
,
totalUsers
,
maxUsers
,
pMon
->
totalUsers
,
pMon
->
maxUsers
,
totalStreams
,
maxStreams
,
pMon
->
totalStreams
,
pMon
->
maxStreams
,
totalConns
,
maxConns
,
pMon
->
totalConns
,
pMon
->
maxConns
,
accessState
);
pMon
->
accessState
);
monitorTrace
(
"monitor:%p, save account info, sql %s"
,
monitor
->
conn
,
sql
);
monitorTrace
(
"monitor:%p, save account info, sql %s"
,
tsMonitorConn
.
conn
,
sql
);
taos_query_a
(
monitor
->
conn
,
sql
,
dnodeMontiorInsertAcctCallback
,
"account"
);
taos_query_a
(
tsMonitorConn
.
conn
,
sql
,
dnodeMontiorInsertAcctCallback
,
"account"
);
}
}
void
monitorSaveLog
(
int
level
,
const
char
*
const
format
,
...)
{
void
monitorSaveLog
(
int
32_t
level
,
const
char
*
const
format
,
...)
{
va_list
argpointer
;
va_list
argpointer
;
char
sql
[
SQL_LENGTH
]
=
{
0
};
char
sql
[
SQL_LENGTH
]
=
{
0
};
int
max_length
=
SQL_LENGTH
-
30
;
int
32_t
max_length
=
SQL_LENGTH
-
30
;
if
(
monitor
->
state
!=
MONITOR_STATE_INITIALIZED
)
{
if
(
tsMonitorConn
.
state
!=
MONITOR_STATE_INITIALIZED
)
return
;
return
;
}
int
len
=
snprintf
(
sql
,
(
size_t
)
max_length
,
"import into %s.log values(%"
PRId64
", %d,'"
,
tsMonitorDbName
,
int
32_t
len
=
snprintf
(
sql
,
(
size_t
)
max_length
,
"import into %s.log values(%"
PRId64
", %d,'"
,
tsMonitorDbName
,
taosGetTimestampUs
(),
level
);
taosGetTimestampUs
(),
level
);
va_start
(
argpointer
,
format
);
va_start
(
argpointer
,
format
);
...
@@ -451,11 +416,11 @@ void monitorSaveLog(int level, const char *const format, ...) {
...
@@ -451,11 +416,11 @@ void monitorSaveLog(int level, const char *const format, ...) {
len
+=
sprintf
(
sql
+
len
,
"', '%s')"
,
tsPrivateIp
);
len
+=
sprintf
(
sql
+
len
,
"', '%s')"
,
tsPrivateIp
);
sql
[
len
++
]
=
0
;
sql
[
len
++
]
=
0
;
monitorTrace
(
"monitor:%p, save log, sql: %s"
,
monitor
->
conn
,
sql
);
monitorTrace
(
"monitor:%p, save log, sql: %s"
,
tsMonitorConn
.
conn
,
sql
);
taos_query_a
(
monitor
->
conn
,
sql
,
dnodeMontiorInsertLogCallback
,
"log"
);
taos_query_a
(
tsMonitorConn
.
conn
,
sql
,
dnodeMontiorInsertLogCallback
,
"log"
);
}
}
void
monitorExecuteSQL
(
char
*
sql
)
{
void
monitorExecuteSQL
(
char
*
sql
)
{
monitorTrace
(
"monitor:%p, execute sql: %s"
,
monitor
->
conn
,
sql
);
monitorTrace
(
"monitor:%p, execute sql: %s"
,
tsMonitorConn
.
conn
,
sql
);
taos_query_a
(
monitor
->
conn
,
sql
,
NULL
,
NULL
);
taos_query_a
(
tsMonitorConn
.
conn
,
sql
,
NULL
,
NULL
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录