提交 a41dc27e 编写于 作者: S Shengliang Guan

refactor: dnode monitor

上级 e0f53dfc
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "bmInt.h"
static void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {}
int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonBmInfo bmInfo = {0};
......
......@@ -32,7 +32,9 @@ typedef struct SDnodeMgmt {
SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
IsNodeRequiredFp isNodeRequiredFp;
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
} SDnodeMgmt;
// dmHandle.c
......@@ -43,11 +45,6 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmMonitor.c
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo);
void dmSendMonitorReport(SDnodeMgmt *pMgmt);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
......
......@@ -72,11 +72,11 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
taosRUnLockLatch(&pMgmt->pData->latch);
SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo);
(*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads;
SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo);
(*pMgmt->getMnodeLoadsFp)(&minfo);
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
......@@ -115,7 +115,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
SServerStatusRsp statusRsp = {0};
SMonMloadInfo minfo = {0};
dmGetMnodeLoads(pMgmt, &minfo);
(*pMgmt->getMnodeLoadsFp)(&minfo);
if (minfo.isMnode && minfo.load.syncState == TAOS_SYNC_STATE_ERROR) {
pStatus->statusCode = TSDB_SRV_STATUS_SERVICE_DEGRADED;
snprintf(pStatus->details, sizeof(pStatus->details), "mnode sync state is %s", syncStr(minfo.load.syncState));
......@@ -123,7 +123,7 @@ static void dmGetServerRunStatus(SDnodeMgmt *pMgmt, SServerStatusRsp *pStatus) {
}
SMonVloadInfo vinfo = {0};
dmGetVnodeLoads(pMgmt, &vinfo);
(*pMgmt->getVnodeLoadsFp)(&vinfo);
for (int32_t i = 0; i < taosArrayGetSize(vinfo.pVloads); ++i) {
SVnodeLoad *pLoad = taosArrayGet(vinfo.pVloads, i);
if (pLoad->syncState == TAOS_SYNC_STATE_ERROR) {
......
......@@ -45,7 +45,9 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->name = pInput->name;
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->isNodeRequiredFp = pInput->isNodeRequiredFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;
if (dmStartWorker(pMgmt) != 0) {
return -1;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmInt.h"
#define dmSendLocalRecv(pMgmt, mtype, func, pInfo) \
if (!tsMultiProcess) { \
SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = tsServerPort; \
rpcSendRecv(pMgmt->msgCb.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \
} \
rpcFreeCont(rsp.pCont); \
}
static void dmGetMonitorBasicInfo(SDnodeMgmt *pMgmt, SMonBasicInfo *pInfo) {
pInfo->protocol = 1;
pInfo->dnode_id = pMgmt->pData->dnodeId;
pInfo->cluster_id = pMgmt->pData->clusterId;
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
}
static void dmGetMonitorDnodeInfo(SDnodeMgmt *pMgmt, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pMgmt->pData->rebootTime) / (86400000.0f);
pInfo->has_mnode = (*pMgmt->isNodeRequiredFp)(MNODE);
pInfo->has_qnode = (*pMgmt->isNodeRequiredFp)(QNODE);
pInfo->has_snode = (*pMgmt->isNodeRequiredFp)(SNODE);
pInfo->has_bnode = (*pMgmt->isNodeRequiredFp)(BNODE);
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
}
static void dmGetMonitorInfo(SDnodeMgmt *pMgmt, SMonDmInfo *pInfo) {
dmGetMonitorBasicInfo(pMgmt, &pInfo->basic);
dmGetMonitorDnodeInfo(pMgmt, &pInfo->dnode);
dmGetMonitorSystemInfo(&pInfo->sys);
}
void dmSendMonitorReport(SDnodeMgmt *pMgmt) {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SMonDmInfo dmInfo = {0};
SMonMmInfo mmInfo = {0};
SMonVmInfo vmInfo = {0};
SMonQmInfo qmInfo = {0};
SMonSmInfo smInfo = {0};
SMonBmInfo bmInfo = {0};
dmGetMonitorInfo(pMgmt, &dmInfo);
dmSendLocalRecv(pMgmt, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
if (dmInfo.dnode.has_mnode) {
dmSendLocalRecv(pMgmt, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
}
if (dmInfo.dnode.has_qnode) {
dmSendLocalRecv(pMgmt, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
}
if (dmInfo.dnode.has_snode) {
dmSendLocalRecv(pMgmt, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
}
if (dmInfo.dnode.has_bnode) {
dmSendLocalRecv(pMgmt, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
}
monSetDmInfo(&dmInfo);
monSetMmInfo(&mmInfo);
monSetVmInfo(&vmInfo);
monSetQmInfo(&qmInfo);
monSetSmInfo(&smInfo);
monSetBmInfo(&bmInfo);
tFreeSMonMmInfo(&mmInfo);
tFreeSMonVmInfo(&vmInfo);
tFreeSMonQmInfo(&qmInfo);
tFreeSMonSmInfo(&smInfo);
tFreeSMonBmInfo(&bmInfo);
monSendReport();
}
void dmGetVnodeLoads(SDnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
dmSendLocalRecv(pMgmt, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
}
void dmGetMnodeLoads(SDnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
dmSendLocalRecv(pMgmt, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
}
......@@ -50,7 +50,7 @@ static void *dmMonitorThreadFp(void *param) {
int64_t curTime = taosGetTimestampMs();
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsMonitorInterval) {
dmSendMonitorReport(pMgmt);
(*pMgmt->sendMonitorReportFp)();
lastTime = curTime;
}
}
......
......@@ -16,8 +16,13 @@
#define _DEFAULT_SOURCE
#include "mmInt.h"
static void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *mmInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
mndGetMonitorInfo(pMgmt->pMnode, &pInfo->cluster, &pInfo->vgroup, &pInfo->grant);
}
void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
......@@ -45,11 +50,6 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
return 0;
}
static void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) {
pInfo->isMnode = 1;
mndGetLoad(pMgmt->pMnode, &pInfo->load);
}
int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonMloadInfo mloads = {0};
mmGetMnodeLoads(pMgmt, &mloads);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
static void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {}
int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonQmInfo qmInfo = {0};
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
static void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) {
SMonSmInfo smInfo = {0};
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
if (pInfo->pVloads == NULL) return;
......@@ -37,7 +37,7 @@ static void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
taosRUnLockLatch(&pMgmt->latch);
}
static void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pMgmt, &vloads);
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_IMP_H_
#define _TD_DND_IMP_H_
#ifndef _TD_DND_MGMT_H_
#define _TD_DND_MGMT_H_
// tobe deleted
#include "uv.h"
......@@ -165,16 +165,13 @@ SMsgCb dmGetMsgcb(SDnode *pDnode);
int32_t dmInitMsgHandle(SDnode *pDnode);
int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// mgmt nodes
SMgmtFunc dmGetMgmtFunc();
SMgmtFunc bmGetMgmtFunc();
SMgmtFunc qmGetMgmtFunc();
SMgmtFunc smGetMgmtFunc();
SMgmtFunc vmGetMgmtFunc();
SMgmtFunc mmGetMgmtFunc();
// dmMonitor.c
void dmSendMonitorReport();
void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_IMP_H_*/
\ No newline at end of file
#endif /*_TD_DND_MGMT_H_*/
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_NODES_H_
#define _TD_DND_NODES_H_
#include "dmInt.h"
#ifdef __cplusplus
extern "C" {
#endif
SMgmtFunc dmGetMgmtFunc();
SMgmtFunc bmGetMgmtFunc();
SMgmtFunc qmGetMgmtFunc();
SMgmtFunc smGetMgmtFunc();
SMgmtFunc vmGetMgmtFunc();
SMgmtFunc mmGetMgmtFunc();
void mmGetMonitorInfo(void *pMgmt, SMonMmInfo *pInfo);
void vmGetMonitorInfo(void *pMgmt, SMonVmInfo *pInfo);
void qmGetMonitorInfo(void *pMgmt, SMonQmInfo *pInfo);
void smGetMonitorInfo(void *pMgmt, SMonSmInfo *pInfo);
void bmGetMonitorInfo(void *pMgmt, SMonBmInfo *pInfo);
void vmGetVnodeLoads(void *pMgmt, SMonVloadInfo *pInfo);
void mmGetMnodeLoads(void *pMgmt, SMonMloadInfo *pInfo);
#ifdef __cplusplus
}
#endif
#endif /*_TD_DND_NODES_H_*/
\ No newline at end of file
......@@ -168,11 +168,6 @@ static int32_t dmProcessDropNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
return code;
}
static bool dmIsNodeRequired(EDndNodeType ntype) {
SDnode *pDnode = dmInstance();
return pDnode->wrappers[ntype].required;
}
SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
SMgmtInputOpt opt = {
.path = pWrapper->path,
......@@ -180,7 +175,9 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.pData = &pWrapper->pDnode->data,
.processCreateNodeFp = dmProcessCreateNodeReq,
.processDropNodeFp = dmProcessDropNodeReq,
.isNodeRequiredFp = dmIsNodeRequired,
.sendMonitorReportFp = dmSendMonitorReport,
.getVnodeLoadsFp = dmGetVnodeLoads,
.getMnodeLoadsFp = dmGetMnodeLoads,
};
opt.msgCb = dmGetMsgcb(pWrapper->pDnode);
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmNodes.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "dmMgmt.h"
#include "dmNodes.h"
#define dmSendLocalRecv(pDnode, mtype, func, pInfo) \
SRpcMsg rsp = {0}; \
SRpcMsg req = {.msgType = mtype}; \
SEpSet epset = {.inUse = 0, .numOfEps = 1}; \
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); \
epset.eps[0].port = tsServerPort; \
rpcSendRecv(pDnode->trans.clientRpc, &epset, &req, &rsp); \
if (rsp.code == 0 && rsp.contLen > 0) { \
func(rsp.pCont, rsp.contLen, pInfo); \
} \
rpcFreeCont(rsp.pCont);
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->protocol = 1;
pInfo->dnode_id = pDnode->data.dnodeId;
pInfo->cluster_id = pDnode->data.clusterId;
tstrncpy(pInfo->dnode_ep, tsLocalEp, TSDB_EP_LEN);
}
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pDnode->data.rebootTime) / (86400000.0f);
pInfo->has_mnode = pDnode->wrappers[MNODE].required;
pInfo->has_qnode = pDnode->wrappers[QNODE].required;
pInfo->has_snode = pDnode->wrappers[SNODE].required;
pInfo->has_bnode = pDnode->wrappers[BNODE].required;
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
}
static void dmGetDmMonitorInfo(SDnode *pDnode) {
SMonDmInfo dmInfo = {0};
dmGetMonitorBasicInfo(pDnode, &dmInfo.basic);
dmGetMonitorDnodeInfo(pDnode, &dmInfo.dnode);
dmGetMonitorSystemInfo(&dmInfo.sys);
monSetDmInfo(&dmInfo);
}
static void dmGetMmMonitorInfo(SDnode *pDnode) {
SMonMmInfo mmInfo = {0};
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_INFO, tDeserializeSMonMmInfo, &mmInfo);
} else if (pWrapper->pMgmt != NULL) {
mmGetMonitorInfo(pWrapper->pMgmt, &mmInfo);
}
}
dmReleaseWrapper(pWrapper);
monSetMmInfo(&mmInfo);
tFreeSMonMmInfo(&mmInfo);
}
static void dmGetVmMonitorInfo(SDnode *pDnode) {
SMonVmInfo vmInfo = {0};
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, VNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_VM_INFO, tDeserializeSMonVmInfo, &vmInfo);
} else if (pWrapper->pMgmt != NULL) {
vmGetMonitorInfo(pWrapper->pMgmt, &vmInfo);
}
}
dmReleaseWrapper(pWrapper);
monSetVmInfo(&vmInfo);
tFreeSMonVmInfo(&vmInfo);
}
static void dmGetQmMonitorInfo(SDnode *pDnode) {
SMonQmInfo qmInfo = {0};
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, VNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_QM_INFO, tDeserializeSMonQmInfo, &qmInfo);
} else if (pWrapper->pMgmt != NULL) {
qmGetMonitorInfo(pWrapper->pMgmt, &qmInfo);
}
}
dmReleaseWrapper(pWrapper);
monSetQmInfo(&qmInfo);
tFreeSMonQmInfo(&qmInfo);
}
static void dmGetSmMonitorInfo(SDnode *pDnode) {
SMonSmInfo smInfo = {0};
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, VNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_SM_INFO, tDeserializeSMonSmInfo, &smInfo);
} else if (pWrapper->pMgmt != NULL) {
smGetMonitorInfo(pWrapper->pMgmt, &smInfo);
}
}
dmReleaseWrapper(pWrapper);
monSetSmInfo(&smInfo);
tFreeSMonSmInfo(&smInfo);
}
static void dmGetBmMonitorInfo(SDnode *pDnode) {
SMonBmInfo bmInfo = {0};
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, VNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_BM_INFO, tDeserializeSMonBmInfo, &bmInfo);
} else if (pWrapper->pMgmt != NULL) {
bmGetMonitorInfo(pWrapper->pMgmt, &bmInfo);
}
}
dmReleaseWrapper(pWrapper);
monSetBmInfo(&bmInfo);
tFreeSMonBmInfo(&bmInfo);
}
void dmSendMonitorReport() {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SDnode *pDnode = dmInstance();
dmGetDmMonitorInfo(pDnode);
dmGetMmMonitorInfo(pDnode);
dmGetVmMonitorInfo(pDnode);
dmGetQmMonitorInfo(pDnode);
dmGetSmMonitorInfo(pDnode);
dmGetBmMonitorInfo(pDnode);
monSendReport();
}
void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, VNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_VM_LOAD, tDeserializeSMonVloadInfo, pInfo);
} else if (pWrapper->pMgmt != NULL) {
vmGetVnodeLoads(pWrapper->pMgmt, pInfo);
}
}
dmReleaseWrapper(pWrapper);
}
void dmGetMnodeLoads(SMonMloadInfo *pInfo) {
SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = dmAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL && pWrapper->required) {
if (tsMultiProcess) {
dmSendLocalRecv(pDnode, TDMT_MON_MM_LOAD, tDeserializeSMonMloadInfo, pInfo);
} else if (pWrapper->pMgmt != NULL) {
mmGetMnodeLoads(pWrapper->pMgmt, pInfo);
}
}
dmReleaseWrapper(pWrapper);
}
......@@ -89,21 +89,23 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef bool (*IsNodeRequiredFp)(EDndNodeType ntype);
typedef void (*SendMonitorReportFp)();
typedef void (*GetVnodeLoadsFp)();
typedef void (*GetMnodeLoadsFp)();
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
int32_t dnodeId;
int64_t clusterId;
int64_t dnodeVer;
int64_t updateTime;
int64_t rebootTime;
bool dropped;
bool stopped;
SEpSet mnodeEps;
SArray *dnodeEps;
SHashObj *dnodeHash;
SRWLatch latch;
SMsgCb msgCb;
} SDnodeData;
typedef struct {
......@@ -113,7 +115,9 @@ typedef struct {
SMsgCb msgCb;
ProcessCreateNodeFp processCreateNodeFp;
ProcessDropNodeFp processDropNodeFp;
IsNodeRequiredFp isNodeRequiredFp;
SendMonitorReportFp sendMonitorReportFp;
GetVnodeLoadsFp getVnodeLoadsFp;
GetMnodeLoadsFp getMnodeLoadsFp;
} SMgmtInputOpt;
typedef struct {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册