Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b66718a4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b66718a4
编写于
4月 19, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat: report service status
上级
a3ed0703
变更
28
隐藏空白更改
内联
并排
Showing
28 changed file
with
274 addition
and
177 deletion
+274
-177
include/common/tmsg.h
include/common/tmsg.h
+6
-4
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-1
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+1
-0
include/libs/monitor/monitor.h
include/libs/monitor/monitor.h
+8
-0
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+0
-2
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-17
source/dnode/mgmt/implement/inc/dmImp.h
source/dnode/mgmt/implement/inc/dmImp.h
+2
-0
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+17
-3
source/dnode/mgmt/implement/src/dmMonitor.c
source/dnode/mgmt/implement/src/dmMonitor.c
+26
-0
source/dnode/mgmt/interface/inc/dmDef.h
source/dnode/mgmt/interface/inc/dmDef.h
+5
-0
source/dnode/mgmt/interface/src/dmInt.c
source/dnode/mgmt/interface/src/dmInt.c
+14
-17
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+2
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+29
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+2
-0
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+0
-1
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+35
-14
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+3
-32
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+0
-16
source/dnode/mnode/impl/inc/mndMnode.h
source/dnode/mnode/impl/inc/mndMnode.h
+7
-8
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+3
-3
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+1
-14
source/dnode/mnode/impl/src/mndTelem.c
source/dnode/mnode/impl/src/mndTelem.c
+53
-13
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+2
-2
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+7
-27
source/dnode/vnode/src/vnd/vnodeQuery.c
source/dnode/vnode/src/vnd/vnodeQuery.c
+1
-1
source/libs/monitor/src/monMsg.c
source/libs/monitor/src/monMsg.c
+29
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+13
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
b66718a4
...
...
@@ -723,7 +723,7 @@ typedef struct {
typedef
struct
{
int32_t
vgId
;
int
8_t
rol
e
;
int
32_t
syncStat
e
;
int64_t
numOfTables
;
int64_t
numOfTimeSeries
;
int64_t
totalStorage
;
...
...
@@ -736,6 +736,10 @@ typedef struct {
int64_t
numOfBatchInsertSuccessReqs
;
}
SVnodeLoad
;
typedef
struct
{
int32_t
syncState
;
}
SMnodeLoad
;
typedef
struct
{
int32_t
sver
;
// software version
int64_t
dnodeVer
;
// dnode table version in sdb
...
...
@@ -1072,13 +1076,11 @@ int32_t tDeserializeSAuthReq(void* buf, int32_t bufLen, SAuthReq* pReq);
typedef
struct
{
int32_t
statusCode
;
int32_t
detailLen
;
char
*
details
;
char
details
[
1024
];
}
SServerStatusRsp
;
int32_t
tSerializeSServerStatusRsp
(
void
*
buf
,
int32_t
bufLen
,
SServerStatusRsp
*
pRsp
);
int32_t
tDeserializeSServerStatusRsp
(
void
*
buf
,
int32_t
bufLen
,
SServerStatusRsp
*
pRsp
);
void
tFreeSServerStatusRsp
(
SServerStatusRsp
*
pRsp
);
/**
* The layout of the query message payload is as following:
...
...
include/common/tmsgdef.h
浏览文件 @
b66718a4
...
...
@@ -228,7 +228,8 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MON_SM_INFO
,
"monitor-sinfo"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_BM_INFO
,
"monitor-binfo"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_VM_LOAD
,
"monitor-vload"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MON_MM_LOAD
,
"monitor-mload"
,
NULL
,
NULL
)
#if defined(TD_MSG_NUMBER_)
TDMT_MAX
#endif
...
...
include/dnode/mnode/mnode.h
浏览文件 @
b66718a4
...
...
@@ -80,6 +80,7 @@ void mndStop(SMnode *pMnode);
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
mndGetMonitorInfo
(
SMnode
*
pMnode
,
SMonClusterInfo
*
pCluster
,
SMonVgroupInfo
*
pVgroup
,
SMonGrantInfo
*
pGrant
);
int32_t
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
);
/**
* @brief Process the read, write, sync request.
...
...
include/libs/monitor/monitor.h
浏览文件 @
b66718a4
...
...
@@ -202,6 +202,14 @@ int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo)
int32_t
tDeserializeSMonVloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonVloadInfo
*
pInfo
);
void
tFreeSMonVloadInfo
(
SMonVloadInfo
*
pInfo
);
typedef
struct
{
int8_t
isMnode
;
SMnodeLoad
load
;
}
SMonMloadInfo
;
int32_t
tSerializeSMonMloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonMloadInfo
*
pInfo
);
int32_t
tDeserializeSMonMloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonMloadInfo
*
pInfo
);
typedef
struct
{
const
char
*
server
;
uint16_t
port
;
...
...
include/libs/sync/sync.h
浏览文件 @
b66718a4
...
...
@@ -172,6 +172,8 @@ int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak);
extern
int32_t
sDebugFlag
;
const
char
*
syncStr
(
ESyncState
state
);
#ifdef __cplusplus
}
#endif
...
...
source/client/src/clientImpl.c
浏览文件 @
b66718a4
...
...
@@ -878,7 +878,5 @@ _OVER:
if
(
rpcRsp
.
pCont
!=
NULL
)
{
rpcFreeCont
(
rpcRsp
.
pCont
);
}
tFreeSServerStatusRsp
(
&
statusRsp
);
return
code
;
}
source/common/src/tmsg.c
浏览文件 @
b66718a4
...
...
@@ -1003,7 +1003,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
for
(
int32_t
i
=
0
;
i
<
vlen
;
++
i
)
{
SVnodeLoad
*
pload
=
taosArrayGet
(
pReq
->
pVloads
,
i
);
if
(
tEncodeI32
(
&
encoder
,
pload
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI
8
(
&
encoder
,
pload
->
rol
e
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
&
encoder
,
pload
->
syncStat
e
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pload
->
numOfTables
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pload
->
numOfTimeSeries
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pload
->
totalStorage
)
<
0
)
return
-
1
;
...
...
@@ -1054,7 +1054,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
for
(
int32_t
i
=
0
;
i
<
vlen
;
++
i
)
{
SVnodeLoad
vload
=
{
0
};
if
(
tDecodeI32
(
&
decoder
,
&
vload
.
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI
8
(
&
decoder
,
&
vload
.
rol
e
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
&
decoder
,
&
vload
.
syncStat
e
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
vload
.
numOfTables
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
vload
.
numOfTimeSeries
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
vload
.
totalStorage
)
<
0
)
return
-
1
;
...
...
@@ -3105,10 +3105,7 @@ int32_t tSerializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp *
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
statusCode
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pRsp
->
detailLen
)
<
0
)
return
-
1
;
if
(
pRsp
->
detailLen
>
0
)
{
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
details
)
<
0
)
return
-
1
;
}
if
(
tEncodeCStr
(
&
encoder
,
pRsp
->
details
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
...
...
@@ -3123,23 +3120,13 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
statusCode
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pRsp
->
detailLen
)
<
0
)
return
-
1
;
if
(
pRsp
->
detailLen
>
0
)
{
pRsp
->
details
=
taosMemoryCalloc
(
1
,
pRsp
->
detailLen
);
if
(
pRsp
->
details
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
details
)
<
0
)
return
-
1
;
}
if
(
tDecodeCStrTo
(
&
decoder
,
pRsp
->
details
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
void
tFreeSServerStatusRsp
(
SServerStatusRsp
*
pRsp
)
{
taosMemoryFree
(
pRsp
->
details
);
}
int32_t
tEncodeSMqOffset
(
SCoder
*
encoder
,
const
SMqOffset
*
pOffset
)
{
if
(
tEncodeI32
(
encoder
,
pOffset
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
encoder
,
pOffset
->
offset
)
<
0
)
return
-
1
;
...
...
source/dnode/mgmt/implement/inc/dmImp.h
浏览文件 @
b66718a4
...
...
@@ -49,6 +49,7 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
// dmMonitor.c
void
dmGetVnodeLoads
(
SDnode
*
pDnode
,
SMonVloadInfo
*
pInfo
);
void
dmGetMnodeLoads
(
SDnode
*
pDnode
,
SMonMloadInfo
*
pInfo
);
void
dmSendMonitorReport
(
SDnode
*
pDnode
);
// dmWorker.c
...
...
@@ -70,6 +71,7 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper);
void
mmSetMgmtFp
(
SMgmtWrapper
*
pWrapper
);
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
);
void
mmGetMnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonMloadInfo
*
pInfo
);
void
mmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonMmInfo
*
mmInfo
);
void
vmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonVmInfo
*
vmInfo
);
void
qmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonQmInfo
*
qmInfo
);
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
b66718a4
...
...
@@ -72,9 +72,23 @@ void dmSendStatusReq(SDnode *pDnode) {
memcpy
(
req
.
clusterCfg
.
charset
,
tsCharset
,
TD_LOCALE_LEN
);
taosRUnLockLatch
(
&
pDnode
->
data
.
latch
);
SMonVloadInfo
info
=
{
0
};
dmGetVnodeLoads
(
pDnode
,
&
info
);
req
.
pVloads
=
info
.
pVloads
;
SMonVloadInfo
vinfo
=
{
0
};
dmGetVnodeLoads
(
pDnode
,
&
vinfo
);
req
.
pVloads
=
vinfo
.
pVloads
;
pDnode
->
data
.
unsyncedVgId
=
0
;
pDnode
->
data
.
vndState
=
TAOS_SYNC_STATE_LEADER
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
req
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
req
.
pVloads
,
i
);
if
(
pLoad
->
syncState
!=
TAOS_SYNC_STATE_LEADER
&&
pLoad
->
syncState
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
pDnode
->
data
.
unsyncedVgId
=
pLoad
->
vgId
;
pDnode
->
data
.
vndState
=
pLoad
->
syncState
;
}
}
SMonMloadInfo
minfo
=
{
0
};
dmGetMnodeLoads
(
pDnode
,
&
minfo
);
pDnode
->
data
.
isMnode
=
minfo
.
isMnode
;
pDnode
->
data
.
mndState
=
minfo
.
load
.
syncState
;
int32_t
contLen
=
tSerializeSStatusReq
(
NULL
,
0
,
&
req
);
void
*
pHead
=
rpcMallocCont
(
contLen
);
...
...
source/dnode/mgmt/implement/src/dmMonitor.c
浏览文件 @
b66718a4
...
...
@@ -183,3 +183,29 @@ void dmGetVnodeLoads(SDnode *pDnode, SMonVloadInfo *pInfo) {
}
dmReleaseWrapper
(
pWrapper
);
}
void
dmGetMnodeLoads
(
SDnode
*
pDnode
,
SMonMloadInfo
*
pInfo
)
{
SMgmtWrapper
*
pWrapper
=
dmAcquireWrapper
(
pDnode
,
MNODE
);
if
(
pWrapper
==
NULL
)
{
pInfo
->
isMnode
=
0
;
return
;
}
bool
getFromAPI
=
!
tsMultiProcess
;
if
(
getFromAPI
)
{
mmGetMnodeLoads
(
pWrapper
,
pInfo
);
}
else
{
SRpcMsg
req
=
{.
msgType
=
TDMT_MON_MM_LOAD
};
SRpcMsg
rsp
=
{
0
};
SEpSet
epset
=
{.
inUse
=
0
,
.
numOfEps
=
1
};
tstrncpy
(
epset
.
eps
[
0
].
fqdn
,
pDnode
->
data
.
localFqdn
,
TSDB_FQDN_LEN
);
epset
.
eps
[
0
].
port
=
tsServerPort
;
dmSendRecv
(
pDnode
,
&
epset
,
&
req
,
&
rsp
);
if
(
rsp
.
code
==
0
&&
rsp
.
contLen
>
0
)
{
tDeserializeSMonMloadInfo
(
rsp
.
pCont
,
rsp
.
contLen
,
pInfo
);
}
rpcFreeCont
(
rsp
.
pCont
);
}
dmReleaseWrapper
(
pWrapper
);
}
source/dnode/mgmt/interface/inc/dmDef.h
浏览文件 @
b66718a4
...
...
@@ -38,6 +38,7 @@
#include "dnode.h"
#include "mnode.h"
#include "monitor.h"
#include "sync.h"
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -110,6 +111,10 @@ typedef struct {
int64_t
dnodeVer
;
int64_t
updateTime
;
int64_t
rebootTime
;
int32_t
unsyncedVgId
;
ESyncState
vndState
;
ESyncState
mndState
;
bool
isMnode
;
bool
dropped
;
SEpSet
mnodeEps
;
SArray
*
dnodeEps
;
...
...
source/dnode/mgmt/interface/src/dmInt.c
浏览文件 @
b66718a4
...
...
@@ -148,29 +148,27 @@ void dmReportStartupByWrapper(SMgmtWrapper *pWrapper, const char *pName, const c
}
static
void
dmGetServerStatus
(
SDnode
*
pDnode
,
SServerStatusRsp
*
pStatus
)
{
pStatus
->
details
[
0
]
=
0
;
if
(
pDnode
->
status
==
DND_STAT_INIT
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_NETWORK_OK
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"%s: %s"
,
pDnode
->
startup
.
name
,
pDnode
->
startup
.
desc
);
}
else
if
(
pDnode
->
status
==
DND_STAT_STOPPED
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_EXTING
;
}
else
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_OK
;
}
if
(
pStatus
->
statusCode
==
TSDB_SRV_STATUS_NETWORK_OK
)
{
SStartupInfo
*
pStartup
=
&
pDnode
->
startup
;
int32_t
len
=
strlen
(
pStartup
->
name
)
+
strlen
(
pStartup
->
desc
);
if
(
len
>
0
)
{
pStatus
->
details
=
taosMemoryCalloc
(
1
,
len
+
24
);
if
(
pStatus
->
details
!=
NULL
)
{
pStatus
->
detailLen
=
snprintf
(
pStatus
->
details
,
len
+
20
,
"%s: %s"
,
pStartup
->
name
,
pStartup
->
desc
)
+
1
;
}
SDnodeData
*
pData
=
&
pDnode
->
data
;
if
(
pData
->
isMnode
&&
pData
->
mndState
!=
TAOS_SYNC_STATE_FOLLOWER
&&
pData
->
mndState
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_DEGRADED
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"mnode sync state is %s"
,
syncStr
(
pData
->
mndState
));
}
else
if
(
pData
->
unsyncedVgId
!=
0
&&
pData
->
vndState
!=
TAOS_SYNC_STATE_FOLLOWER
&&
pData
->
vndState
!=
TAOS_SYNC_STATE_FOLLOWER
)
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_DEGRADED
;
snprintf
(
pStatus
->
details
,
sizeof
(
pStatus
->
details
),
"vnode:%d sync state is %s"
,
pData
->
unsyncedVgId
,
syncStr
(
pData
->
vndState
));
}
else
{
pStatus
->
statusCode
=
TSDB_SRV_STATUS_SERVICE_OK
;
}
}
if
(
pStatus
->
statusCode
==
TSDB_SRV_STATUS_SERVICE_OK
)
{
// check the status of mnode and vnode
}
}
void
dmProcessServerStatusReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pReq
)
{
...
...
@@ -198,7 +196,6 @@ void dmProcessServerStatusReq(SDnode *pDnode, SRpcMsg *pReq) {
_OVER:
rpcSendResponse
(
&
rspMsg
);
tFreeSServerStatusRsp
(
&
statusRsp
);
}
void
dmGetMonitorSysInfo
(
SMonSysInfo
*
pInfo
)
{
...
...
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
浏览文件 @
b66718a4
...
...
@@ -52,6 +52,8 @@ int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t
mmProcessDropReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
int32_t
mmProcessAlterReq
(
SMnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
mmProcessGetMonMmInfoReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pReq
);
int32_t
mmProcessGetMnodeLoadsReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pReq
);
void
mmGetMnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonMloadInfo
*
pInfo
);
// mmWorker.c
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
b66718a4
...
...
@@ -46,6 +46,34 @@ int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
return
0
;
}
void
mmGetMnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonMloadInfo
*
pInfo
)
{
SMnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
pInfo
->
isMnode
=
1
;
mndGetLoad
(
pMgmt
->
pMnode
,
&
pInfo
->
load
);
}
int32_t
mmProcessGetMnodeLoadsReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pReq
)
{
SMonMloadInfo
mloads
=
{
0
};
mmGetMnodeLoads
(
pWrapper
,
&
mloads
);
int32_t
rspLen
=
tSerializeSMonMloadInfo
(
NULL
,
0
,
&
mloads
);
if
(
rspLen
<
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
void
*
pRsp
=
rpcMallocCont
(
rspLen
);
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
tSerializeSMonMloadInfo
(
pRsp
,
rspLen
,
&
mloads
);
pReq
->
pRsp
=
pRsp
;
pReq
->
rspLen
=
rspLen
;
return
0
;
}
int32_t
mmProcessCreateReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
...
...
@@ -117,6 +145,7 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
void
mmInitMsgHandle
(
SMgmtWrapper
*
pWrapper
)
{
dmSetMsgHandle
(
pWrapper
,
TDMT_MON_MM_INFO
,
mmProcessMonitorMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MON_MM_LOAD
,
mmProcessMonitorMsg
,
DEFAULT_HANDLE
);
// Requests handled by DNODE
dmSetMsgHandle
(
pWrapper
,
TDMT_DND_CREATE_MNODE_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
b66718a4
...
...
@@ -36,6 +36,8 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
code
=
mmProcessAlterReq
(
pMgmt
,
pMsg
);
}
else
if
(
pMsg
->
rpcMsg
.
msgType
==
TDMT_MON_MM_INFO
)
{
code
=
mmProcessGetMonMmInfoReq
(
pMgmt
->
pWrapper
,
pMsg
);
}
else
if
(
pMsg
->
rpcMsg
.
msgType
==
TDMT_MON_MM_LOAD
)
{
code
=
mmProcessGetMnodeLoadsReq
(
pMgmt
->
pWrapper
,
pMsg
);
}
else
{
pMsg
->
pNode
=
pMgmt
->
pMnode
;
code
=
mndProcessMsg
(
pMsg
);
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
b66718a4
...
...
@@ -29,7 +29,6 @@ typedef struct SVnodesMgmt {
SHashObj
*
hash
;
SRWLatch
latch
;
SVnodesStat
state
;
SVnodesStat
lastState
;
STfs
*
pTfs
;
SQWorkerPool
queryPool
;
SQWorkerPool
fetchPool
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
b66718a4
...
...
@@ -16,21 +16,42 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
void
vmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonVmInfo
*
vm
Info
)
{
void
vmGetMonitorInfo
(
SMgmtWrapper
*
pWrapper
,
SMonVmInfo
*
p
Info
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
tfsGetMonitorInfo
(
pMgmt
->
pTfs
,
&
vmInfo
->
tfs
);
taosWLockLatch
(
&
pMgmt
->
latch
);
vmInfo
->
vstat
.
totalVnodes
=
pMgmt
->
state
.
totalVnodes
;
vmInfo
->
vstat
.
masterNum
=
pMgmt
->
state
.
masterNum
;
vmInfo
->
vstat
.
numOfSelectReqs
=
pMgmt
->
state
.
numOfSelectReqs
-
pMgmt
->
lastState
.
numOfSelectReqs
;
vmInfo
->
vstat
.
numOfInsertReqs
=
pMgmt
->
state
.
numOfInsertReqs
-
pMgmt
->
lastState
.
numOfInsertReqs
;
vmInfo
->
vstat
.
numOfInsertSuccessReqs
=
pMgmt
->
state
.
numOfInsertSuccessReqs
-
pMgmt
->
lastState
.
numOfInsertSuccessReqs
;
vmInfo
->
vstat
.
numOfBatchInsertReqs
=
pMgmt
->
state
.
numOfBatchInsertReqs
-
pMgmt
->
lastState
.
numOfBatchInsertReqs
;
vmInfo
->
vstat
.
numOfBatchInsertSuccessReqs
=
pMgmt
->
state
.
numOfBatchInsertSuccessReqs
-
pMgmt
->
lastState
.
numOfBatchInsertSuccessReqs
;
pMgmt
->
lastState
=
pMgmt
->
state
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
SMonVloadInfo
vloads
=
{
0
};
vmGetVnodeLoads
(
pWrapper
,
&
vloads
);
if
(
vloads
.
pVloads
==
NULL
)
return
;
int32_t
totalVnodes
=
0
;
int32_t
masterNum
=
0
;
int64_t
numOfSelectReqs
=
0
;
int64_t
numOfInsertReqs
=
0
;
int64_t
numOfInsertSuccessReqs
=
0
;
int64_t
numOfBatchInsertReqs
=
0
;
int64_t
numOfBatchInsertSuccessReqs
=
0
;
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
vloads
.
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
vloads
.
pVloads
,
i
);
numOfSelectReqs
+=
pLoad
->
numOfSelectReqs
;
numOfInsertReqs
+=
pLoad
->
numOfInsertReqs
;
numOfInsertSuccessReqs
+=
pLoad
->
numOfInsertSuccessReqs
;
numOfBatchInsertReqs
+=
pLoad
->
numOfBatchInsertReqs
;
numOfBatchInsertSuccessReqs
+=
pLoad
->
numOfBatchInsertSuccessReqs
;
if
(
pLoad
->
syncState
==
TAOS_SYNC_STATE_LEADER
)
masterNum
++
;
totalVnodes
++
;
}
pInfo
->
vstat
.
totalVnodes
=
totalVnodes
;
pInfo
->
vstat
.
masterNum
=
masterNum
;
pInfo
->
vstat
.
numOfSelectReqs
=
numOfSelectReqs
-
pMgmt
->
state
.
numOfSelectReqs
;
pInfo
->
vstat
.
numOfInsertReqs
=
numOfInsertReqs
-
pMgmt
->
state
.
numOfInsertReqs
;
pInfo
->
vstat
.
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
-
pMgmt
->
state
.
numOfInsertSuccessReqs
;
pInfo
->
vstat
.
numOfBatchInsertReqs
=
numOfBatchInsertReqs
-
pMgmt
->
state
.
numOfBatchInsertReqs
;
pInfo
->
vstat
.
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
-
pMgmt
->
state
.
numOfBatchInsertSuccessReqs
;
pMgmt
->
state
=
pInfo
->
vstat
;
taosArrayDestroy
(
vloads
.
pVloads
);
}
int32_t
vmProcessGetMonVmInfoReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pReq
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
b66718a4
...
...
@@ -356,19 +356,9 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
void
vmGetVnodeLoads
(
SMgmtWrapper
*
pWrapper
,
SMonVloadInfo
*
pInfo
)
{
SVnodesMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
SVnodesStat
*
pStat
=
&
pMgmt
->
state
;
SArray
*
pLoads
=
taosArrayInit
(
pMgmt
->
state
.
totalVnodes
,
sizeof
(
SVnodeLoad
));
int32_t
totalVnodes
=
0
;
int32_t
masterNum
=
0
;
int64_t
numOfSelectReqs
=
0
;
int64_t
numOfInsertReqs
=
0
;
int64_t
numOfInsertSuccessReqs
=
0
;
int64_t
numOfBatchInsertReqs
=
0
;
int64_t
numOfBatchInsertSuccessReqs
=
0
;
pInfo
->
pVloads
=
pLoads
;
if
(
pLoads
==
NULL
)
return
;
pInfo
->
pVloads
=
taosArrayInit
(
pMgmt
->
state
.
totalVnodes
,
sizeof
(
SVnodeLoad
));
if
(
pInfo
->
pVloads
==
NULL
)
return
;
taosRLockLatch
(
&
pMgmt
->
latch
);
...
...
@@ -380,28 +370,9 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
SVnodeObj
*
pVnode
=
*
ppVnode
;
SVnodeLoad
vload
=
{
0
};
vnodeGetLoad
(
pVnode
->
pImpl
,
&
vload
);
taosArrayPush
(
pLoads
,
&
vload
);
numOfSelectReqs
+=
vload
.
numOfSelectReqs
;
numOfInsertReqs
+=
vload
.
numOfInsertReqs
;
numOfInsertSuccessReqs
+=
vload
.
numOfInsertSuccessReqs
;
numOfBatchInsertReqs
+=
vload
.
numOfBatchInsertReqs
;
numOfBatchInsertSuccessReqs
+=
vload
.
numOfBatchInsertSuccessReqs
;
totalVnodes
++
;
if
(
vload
.
role
==
TAOS_SYNC_STATE_LEADER
)
masterNum
++
;
taosArrayPush
(
pInfo
->
pVloads
,
&
vload
);
pIter
=
taosHashIterate
(
pMgmt
->
hash
,
pIter
);
}
taosRUnLockLatch
(
&
pMgmt
->
latch
);
taosWLockLatch
(
&
pMgmt
->
latch
);
pStat
->
totalVnodes
=
totalVnodes
;
pStat
->
masterNum
=
masterNum
;
pStat
->
numOfSelectReqs
=
numOfSelectReqs
;
pStat
->
numOfInsertReqs
=
numOfInsertReqs
;
pStat
->
numOfInsertSuccessReqs
=
numOfInsertSuccessReqs
;
pStat
->
numOfBatchInsertReqs
=
numOfBatchInsertReqs
;
pStat
->
numOfBatchInsertSuccessReqs
=
numOfBatchInsertSuccessReqs
;
taosWUnLockLatch
(
&
pMgmt
->
latch
);
}
\ No newline at end of file
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
b66718a4
...
...
@@ -47,21 +47,6 @@ typedef int32_t (*MndInitFp)(SMnode *pMnode);
typedef
void
(
*
MndCleanupFp
)(
SMnode
*
pMnode
);
typedef
int32_t
(
*
ShowRetrieveFp
)(
SNodeMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
typedef
void
(
*
ShowFreeIterFp
)(
SMnode
*
pMnode
,
void
*
pIter
);
typedef
struct
SMnodeLoad
{
int64_t
numOfDnode
;
int64_t
numOfMnode
;
int64_t
numOfVgroup
;
int64_t
numOfDatabase
;
int64_t
numOfSuperTable
;
int64_t
numOfChildTable
;
int64_t
numOfNormalTable
;
int64_t
numOfColumn
;
int64_t
totalPoints
;
int64_t
totalStorage
;
int64_t
compStorage
;
}
SMnodeLoad
;
typedef
struct
SQWorkerMgmt
SQHandle
;
typedef
struct
{
...
...
@@ -129,7 +114,6 @@ struct SMnode {
void
mndSetMsgHandle
(
SMnode
*
pMnode
,
tmsg_t
msgType
,
MndMsgFp
fp
);
int64_t
mndGenerateUid
(
char
*
name
,
int32_t
len
);
void
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/inc/mndMnode.h
浏览文件 @
b66718a4
...
...
@@ -22,14 +22,13 @@
extern
"C"
{
#endif
int32_t
mndInitMnode
(
SMnode
*
pMnode
);
void
mndCleanupMnode
(
SMnode
*
pMnode
);
SMnodeObj
*
mndAcquireMnode
(
SMnode
*
pMnode
,
int32_t
mnodeId
);
void
mndReleaseMnode
(
SMnode
*
pMnode
,
SMnodeObj
*
pObj
);
bool
mndIsMnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
void
mndGetMnodeEpSet
(
SMnode
*
pMnode
,
SEpSet
*
pEpSet
);
void
mndUpdateMnodeRole
(
SMnode
*
pMnode
);
const
char
*
mndGetRoleStr
(
int32_t
role
);
int32_t
mndInitMnode
(
SMnode
*
pMnode
);
void
mndCleanupMnode
(
SMnode
*
pMnode
);
SMnodeObj
*
mndAcquireMnode
(
SMnode
*
pMnode
,
int32_t
mnodeId
);
void
mndReleaseMnode
(
SMnode
*
pMnode
,
SMnodeObj
*
pObj
);
bool
mndIsMnode
(
SMnode
*
pMnode
,
int32_t
dnodeId
);
void
mndGetMnodeEpSet
(
SMnode
*
pMnode
,
SEpSet
*
pEpSet
);
void
mndUpdateMnodeRole
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
b66718a4
...
...
@@ -326,7 +326,7 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) {
SVgObj
*
pVgroup
=
mndAcquireVgroup
(
pMnode
,
pVload
->
vgId
);
if
(
pVgroup
!=
NULL
)
{
if
(
pVload
->
rol
e
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pVload
->
syncStat
e
==
TAOS_SYNC_STATE_LEADER
)
{
pVgroup
->
numOfTables
=
pVload
->
numOfTables
;
pVgroup
->
numOfTimeSeries
=
pVload
->
numOfTimeSeries
;
pVgroup
->
totalStorage
=
pVload
->
totalStorage
;
...
...
@@ -335,10 +335,10 @@ static int32_t mndProcessStatusReq(SNodeMsg *pReq) {
}
bool
roleChanged
=
false
;
for
(
int32_t
vg
=
0
;
vg
<
pVgroup
->
replica
;
++
vg
)
{
if
(
pVgroup
->
vnodeGid
[
vg
].
role
!=
pVload
->
rol
e
)
{
if
(
pVgroup
->
vnodeGid
[
vg
].
role
!=
pVload
->
syncStat
e
)
{
roleChanged
=
true
;
}
pVgroup
->
vnodeGid
[
vg
].
role
=
pVload
->
rol
e
;
pVgroup
->
vnodeGid
[
vg
].
role
=
pVload
->
syncStat
e
;
}
if
(
roleChanged
)
{
// notify scheduler role has changed
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
b66718a4
...
...
@@ -75,19 +75,6 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
sdbRelease
(
pMnode
->
pSdb
,
pObj
);
}
const
char
*
mndGetRoleStr
(
int32_t
showType
)
{
switch
(
showType
)
{
case
TAOS_SYNC_STATE_FOLLOWER
:
return
"FOLLOWER"
;
case
TAOS_SYNC_STATE_CANDIDATE
:
return
"CANDIDATE"
;
case
TAOS_SYNC_STATE_LEADER
:
return
"LEADER"
;
default:
return
"ERROR"
;
}
}
void
mndUpdateMnodeRole
(
SMnode
*
pMnode
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
...
...
@@ -637,7 +624,7 @@ static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
b1
,
false
);
const
char
*
roles
=
mndGetRole
Str
(
pObj
->
role
);
const
char
*
roles
=
sync
Str
(
pObj
->
role
);
char
*
b2
=
taosMemoryCalloc
(
1
,
strlen
(
roles
)
+
VARSTR_HEADER_SIZE
);
STR_WITH_MAXSIZE_TO_VARSTR
(
b2
,
roles
,
pShow
->
bytes
[
cols
]);
...
...
source/dnode/mnode/impl/src/mndTelem.c
浏览文件 @
b66718a4
...
...
@@ -24,20 +24,60 @@
#define TELEMETRY_SERVER "telemetry.taosdata.com"
#define TELEMETRY_PORT 80
typedef
struct
{
int64_t
numOfDnode
;
int64_t
numOfMnode
;
int64_t
numOfVgroup
;
int64_t
numOfDatabase
;
int64_t
numOfSuperTable
;
int64_t
numOfChildTable
;
int64_t
numOfNormalTable
;
int64_t
numOfColumn
;
int64_t
totalPoints
;
int64_t
totalStorage
;
int64_t
compStorage
;
}
SMnodeStat
;
static
void
mndGetStat
(
SMnode
*
pMnode
,
SMnodeStat
*
pStat
)
{
memset
(
pStat
,
0
,
sizeof
(
SMnodeStat
));
SSdb
*
pSdb
=
pMnode
->
pSdb
;
pStat
->
numOfDnode
=
sdbGetSize
(
pSdb
,
SDB_DNODE
);
pStat
->
numOfMnode
=
sdbGetSize
(
pSdb
,
SDB_MNODE
);
pStat
->
numOfVgroup
=
sdbGetSize
(
pSdb
,
SDB_VGROUP
);
pStat
->
numOfDatabase
=
sdbGetSize
(
pSdb
,
SDB_DB
);
pStat
->
numOfSuperTable
=
sdbGetSize
(
pSdb
,
SDB_STB
);
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
pStat
->
numOfChildTable
+=
pVgroup
->
numOfTables
;
pStat
->
numOfColumn
+=
pVgroup
->
numOfTimeSeries
;
pStat
->
totalPoints
+=
pVgroup
->
pointsWritten
;
pStat
->
totalStorage
+=
pVgroup
->
totalStorage
;
pStat
->
compStorage
+=
pVgroup
->
compStorage
;
sdbRelease
(
pSdb
,
pVgroup
);
}
}
static
void
mndBuildRuntimeInfo
(
SMnode
*
pMnode
,
SJson
*
pJson
)
{
SMnode
Load
load
=
{
0
};
mndGet
Load
(
pMnode
,
&
load
);
tjsonAddDoubleToObject
(
pJson
,
"numOfDnode"
,
load
.
numOfDnode
);
tjsonAddDoubleToObject
(
pJson
,
"numOfMnode"
,
load
.
numOfMnode
);
tjsonAddDoubleToObject
(
pJson
,
"numOfVgroup"
,
load
.
numOfVgroup
);
tjsonAddDoubleToObject
(
pJson
,
"numOfDatabase"
,
load
.
numOfDatabase
);
tjsonAddDoubleToObject
(
pJson
,
"numOfSuperTable"
,
load
.
numOfSuperTable
);
tjsonAddDoubleToObject
(
pJson
,
"numOfChildTable"
,
load
.
numOfChildTable
);
tjsonAddDoubleToObject
(
pJson
,
"numOfColumn"
,
load
.
numOfColumn
);
tjsonAddDoubleToObject
(
pJson
,
"numOfPoint"
,
load
.
totalPoints
);
tjsonAddDoubleToObject
(
pJson
,
"totalStorage"
,
load
.
totalStorage
);
tjsonAddDoubleToObject
(
pJson
,
"compStorage"
,
load
.
compStorage
);
SMnode
Stat
mstat
=
{
0
};
mndGet
Stat
(
pMnode
,
&
mstat
);
tjsonAddDoubleToObject
(
pJson
,
"numOfDnode"
,
mstat
.
numOfDnode
);
tjsonAddDoubleToObject
(
pJson
,
"numOfMnode"
,
mstat
.
numOfMnode
);
tjsonAddDoubleToObject
(
pJson
,
"numOfVgroup"
,
mstat
.
numOfVgroup
);
tjsonAddDoubleToObject
(
pJson
,
"numOfDatabase"
,
mstat
.
numOfDatabase
);
tjsonAddDoubleToObject
(
pJson
,
"numOfSuperTable"
,
mstat
.
numOfSuperTable
);
tjsonAddDoubleToObject
(
pJson
,
"numOfChildTable"
,
mstat
.
numOfChildTable
);
tjsonAddDoubleToObject
(
pJson
,
"numOfColumn"
,
mstat
.
numOfColumn
);
tjsonAddDoubleToObject
(
pJson
,
"numOfPoint"
,
mstat
.
totalPoints
);
tjsonAddDoubleToObject
(
pJson
,
"totalStorage"
,
mstat
.
totalStorage
);
tjsonAddDoubleToObject
(
pJson
,
"compStorage"
,
mstat
.
compStorage
);
}
static
char
*
mndBuildTelemetryReport
(
SMnode
*
pMnode
)
{
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
b66718a4
...
...
@@ -545,7 +545,7 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pVgroup
->
vnodeGid
[
i
].
dnodeId
,
false
);
char
buf1
[
20
]
=
{
0
};
const
char
*
role
=
mndGetRole
Str
(
pVgroup
->
vnodeGid
[
i
].
role
);
const
char
*
role
=
sync
Str
(
pVgroup
->
vnodeGid
[
i
].
role
);
STR_WITH_MAXSIZE_TO_VARSTR
(
buf1
,
role
,
pShow
->
bytes
[
cols
]);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
...
...
@@ -636,7 +636,7 @@ static int32_t mndRetrieveVnodes(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* p
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
val
,
false
);
char
buf
[
20
]
=
{
0
};
STR_TO_VARSTR
(
buf
,
mndGetRole
Str
(
pVgid
->
role
));
STR_TO_VARSTR
(
buf
,
sync
Str
(
pVgid
->
role
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
buf
,
false
);
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
b66718a4
...
...
@@ -414,31 +414,6 @@ int64_t mndGenerateUid(char *name, int32_t len) {
}
while
(
true
);
}
void
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
)
{
memset
(
pLoad
,
0
,
sizeof
(
SMnodeLoad
));
SSdb
*
pSdb
=
pMnode
->
pSdb
;
pLoad
->
numOfDnode
=
sdbGetSize
(
pSdb
,
SDB_DNODE
);
pLoad
->
numOfMnode
=
sdbGetSize
(
pSdb
,
SDB_MNODE
);
pLoad
->
numOfVgroup
=
sdbGetSize
(
pSdb
,
SDB_VGROUP
);
pLoad
->
numOfDatabase
=
sdbGetSize
(
pSdb
,
SDB_DB
);
pLoad
->
numOfSuperTable
=
sdbGetSize
(
pSdb
,
SDB_STB
);
void
*
pIter
=
NULL
;
while
(
1
)
{
SVgObj
*
pVgroup
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
if
(
pIter
==
NULL
)
break
;
pLoad
->
numOfChildTable
+=
pVgroup
->
numOfTables
;
pLoad
->
numOfColumn
+=
pVgroup
->
numOfTimeSeries
;
pLoad
->
totalPoints
+=
pVgroup
->
pointsWritten
;
pLoad
->
totalStorage
+=
pVgroup
->
totalStorage
;
pLoad
->
compStorage
+=
pVgroup
->
compStorage
;
sdbRelease
(
pSdb
,
pVgroup
);
}
}
int32_t
mndGetMonitorInfo
(
SMnode
*
pMnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonGrantInfo
*
pGrantInfo
)
{
...
...
@@ -486,7 +461,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonMnodeDesc
desc
=
{
0
};
desc
.
mnode_id
=
pObj
->
id
;
tstrncpy
(
desc
.
mnode_ep
,
pObj
->
pDnode
->
ep
,
sizeof
(
desc
.
mnode_ep
));
tstrncpy
(
desc
.
role
,
mndGetRole
Str
(
pObj
->
role
),
sizeof
(
desc
.
role
));
tstrncpy
(
desc
.
role
,
sync
Str
(
pObj
->
role
),
sizeof
(
desc
.
role
));
taosArrayPush
(
pClusterInfo
->
mnodes
,
&
desc
);
sdbRelease
(
pSdb
,
pObj
);
...
...
@@ -520,7 +495,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SVnodeGid
*
pVgid
=
&
pVgroup
->
vnodeGid
[
i
];
SMonVnodeDesc
*
pVnDesc
=
&
desc
.
vnodes
[
i
];
pVnDesc
->
dnode_id
=
pVgid
->
dnodeId
;
tstrncpy
(
pVnDesc
->
vnode_role
,
mndGetRole
Str
(
pVgid
->
role
),
sizeof
(
pVnDesc
->
vnode_role
));
tstrncpy
(
pVnDesc
->
vnode_role
,
sync
Str
(
pVgid
->
role
),
sizeof
(
pVnDesc
->
vnode_role
));
if
(
pVgid
->
role
==
TAOS_SYNC_STATE_LEADER
)
{
tstrncpy
(
desc
.
status
,
"ready"
,
sizeof
(
desc
.
status
));
pClusterInfo
->
vgroups_alive
++
;
...
...
@@ -545,3 +520,8 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
return
0
;
}
int32_t
mndGetLoad
(
SMnode
*
pMnode
,
SMnodeLoad
*
pLoad
)
{
pLoad
->
syncState
=
pMnode
->
syncMgmt
.
state
;
return
0
;
}
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeQuery.c
浏览文件 @
b66718a4
...
...
@@ -152,7 +152,7 @@ _exit:
int32_t
vnodeGetLoad
(
SVnode
*
pVnode
,
SVnodeLoad
*
pLoad
)
{
pLoad
->
vgId
=
TD_VID
(
pVnode
);
pLoad
->
rol
e
=
TAOS_SYNC_STATE_LEADER
;
pLoad
->
syncStat
e
=
TAOS_SYNC_STATE_LEADER
;
pLoad
->
numOfTables
=
metaGetTbNum
(
pVnode
->
pMeta
);
pLoad
->
numOfTimeSeries
=
400
;
pLoad
->
totalStorage
=
300
;
...
...
source/libs/monitor/src/monMsg.c
浏览文件 @
b66718a4
...
...
@@ -473,7 +473,7 @@ int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo)
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pInfo
->
pVloads
);
++
i
)
{
SVnodeLoad
*
pLoad
=
taosArrayGet
(
pInfo
->
pVloads
,
i
);
if
(
tEncodeI32
(
&
encoder
,
pLoad
->
vgId
)
<
0
)
return
-
1
;
if
(
tEncodeI
8
(
&
encoder
,
pLoad
->
rol
e
)
<
0
)
return
-
1
;
if
(
tEncodeI
32
(
&
encoder
,
pLoad
->
syncStat
e
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pLoad
->
numOfTables
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pLoad
->
numOfTimeSeries
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pLoad
->
totalStorage
)
<
0
)
return
-
1
;
...
...
@@ -507,7 +507,7 @@ int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInf
for
(
int32_t
i
=
0
;
i
<
arraySize
;
++
i
)
{
SVnodeLoad
load
=
{
0
};
if
(
tDecodeI32
(
&
decoder
,
&
load
.
vgId
)
<
0
)
return
-
1
;
if
(
tDecodeI
8
(
&
decoder
,
&
load
.
rol
e
)
<
0
)
return
-
1
;
if
(
tDecodeI
32
(
&
decoder
,
&
load
.
syncStat
e
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
load
.
numOfTables
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
load
.
numOfTimeSeries
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
&
decoder
,
&
load
.
totalStorage
)
<
0
)
return
-
1
;
...
...
@@ -530,3 +530,30 @@ void tFreeSMonVloadInfo(SMonVloadInfo *pInfo) {
taosArrayDestroy
(
pInfo
->
pVloads
);
pInfo
->
pVloads
=
NULL
;
}
int32_t
tSerializeSMonMloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonMloadInfo
*
pInfo
)
{
SCoder
encoder
=
{
0
};
tCoderInit
(
&
encoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_ENCODER
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pInfo
->
isMnode
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pInfo
->
load
.
syncState
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tCoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSMonMloadInfo
(
void
*
buf
,
int32_t
bufLen
,
SMonMloadInfo
*
pInfo
)
{
SCoder
decoder
=
{
0
};
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
buf
,
bufLen
,
TD_DECODER
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pInfo
->
isMnode
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pInfo
->
load
.
syncState
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tCoderClear
(
&
decoder
);
return
0
;
}
\ No newline at end of file
source/libs/sync/src/syncMain.c
浏览文件 @
b66718a4
...
...
@@ -929,4 +929,17 @@ static void syncFreeNode(void* param) {
syncNodePrint2
((
char
*
)
"==syncFreeNode=="
,
pNode
);
taosMemoryFree
(
pNode
);
}
const
char
*
syncStr
(
ESyncState
state
)
{
switch
(
state
)
{
case
TAOS_SYNC_STATE_FOLLOWER
:
return
"FOLLOWER"
;
case
TAOS_SYNC_STATE_CANDIDATE
:
return
"CANDIDATE"
;
case
TAOS_SYNC_STATE_LEADER
:
return
"LEADER"
;
default:
return
"ERROR"
;
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录