提交 f98ceb04 编写于 作者: S Shengliang Guan

feat: send monitor information in multi-process mode

上级 e84abcfb
...@@ -162,7 +162,7 @@ typedef struct { ...@@ -162,7 +162,7 @@ typedef struct {
} SMonVmInfo; } SMonVmInfo;
int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo); int32_t tSerializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo);
int32_t tDeserializeSMonVMmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo); int32_t tDeserializeSMonVmInfo(void *buf, int32_t bufLen, SMonVmInfo *pInfo);
void tFreeSMonVmInfo(SMonVmInfo *pInfo); void tFreeSMonVmInfo(SMonVmInfo *pInfo);
typedef struct { typedef struct {
...@@ -171,7 +171,7 @@ typedef struct { ...@@ -171,7 +171,7 @@ typedef struct {
} SMonQmInfo; } SMonQmInfo;
int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo); int32_t tSerializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo);
int32_t tDeserializeSMonQMmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo); int32_t tDeserializeSMonQmInfo(void *buf, int32_t bufLen, SMonQmInfo *pInfo);
void tFreeSMonQmInfo(SMonQmInfo *pInfo); void tFreeSMonQmInfo(SMonQmInfo *pInfo);
typedef struct { typedef struct {
...@@ -191,6 +191,14 @@ int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo); ...@@ -191,6 +191,14 @@ int32_t tSerializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo);
int32_t tDeserializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo); int32_t tDeserializeSMonBmInfo(void *buf, int32_t bufLen, SMonBmInfo *pInfo);
void tFreeSMonBmInfo(SMonBmInfo *pInfo); void tFreeSMonBmInfo(SMonBmInfo *pInfo);
typedef struct {
SArray *pVloads; // SVnodeLoad
} SMonVloadInfo;
int32_t tSerializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo);
int32_t tDeserializeSMonVloadInfo(void *buf, int32_t bufLen, SMonVloadInfo *pInfo);
void tFreeSMonVloadInfo(SMonVloadInfo *pInfo);
typedef struct { typedef struct {
const char *server; const char *server;
uint16_t port; uint16_t port;
......
...@@ -16,6 +16,13 @@ ...@@ -16,6 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "bmInt.h" #include "bmInt.h"
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) {
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&bmInfo->sys);
monGetLogs(&bmInfo->logs);
}
}
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
......
...@@ -42,8 +42,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { ...@@ -42,8 +42,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) { if (pWrapper != NULL) {
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); SMonVloadInfo info = {0};
vmMonitorVnodeLoads(pWrapper, req.pVloads); dmGetVnodeLoads(pWrapper, &info);
req.pVloads = info.pVloads;
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper);
} }
...@@ -117,7 +118,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -117,7 +118,6 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return TSDB_CODE_OPS_NOT_SUPPORT; return TSDB_CODE_OPS_NOT_SUPPORT;
} }
static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) { static int32_t dmProcessCreateNodeMsg(SDnode *pDnode, EDndType ntype, SNodeMsg *pMsg) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, ntype);
if (pWrapper != NULL) { if (pWrapper != NULL) {
...@@ -209,10 +209,15 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) { ...@@ -209,10 +209,15 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, dmProcessMgmtMsg, DEFAULT_HANDLE);
// Requests handled by MNODE // Requests handled by MNODE
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessStatusMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
// Monitor info exchange between processes // Monitor info exchange between processes
dndSetMsgHandle(pWrapper, TDMT_MON_DISK_INFO, dmProcessStatusMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, dmProcessMonitorMsg, DEFAULT_HANDLE);
} }
...@@ -16,20 +16,6 @@ ...@@ -16,20 +16,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmInt.h" #include "dmInt.h"
static int32_t dmGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) {
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorTfsInfo(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
return 0;
}
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->protocol = 1; pInfo->protocol = 1;
pInfo->dnode_id = pDnode->dnodeId; pInfo->dnode_id = pDnode->dnodeId;
...@@ -39,101 +25,313 @@ static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { ...@@ -39,101 +25,313 @@ static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f); pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f);
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
taosGetCpuCores(&pInfo->cpu_cores);
taosGetProcMemory(&pInfo->mem_engine);
taosGetSysMemory(&pInfo->mem_system);
pInfo->mem_total = tsTotalMemoryKB;
pInfo->disk_engine = 0;
pInfo->disk_used = tsDataSpace.size.used;
pInfo->disk_total = tsDataSpace.size.total;
taosGetCardInfo(&pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, VNODES);
if (pWrapper != NULL) {
vmMonitorVnodeReqs(pWrapper, pInfo);
dndReleaseWrapper(pWrapper);
}
pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) { if (pWrapper != NULL) {
pInfo->has_mnode = pWrapper->required; pInfo->has_mnode = pWrapper->required;
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper);
} }
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
pInfo->tempdir.size = tsTempSpace.size;
}
static void dmGetMonitorInfo(SDnode *pDnode, SMonDmInfo *pInfo) {
dmGetMonitorBasicInfo(pDnode, &pInfo->basic);
dmGetMonitorSysInfo(&pInfo->sys);
dmGetMonitorDnodeInfo(pDnode, &pInfo->dnode);
} }
void dmSendMonitorReport(SDnode *pDnode) { void dmSendMonitorReport(SDnode *pDnode) {
if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return; if (!tsEnableMonitor || tsMonitorFqdn[0] == 0 || tsMonitorPort == 0) return;
dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort); dTrace("send monitor report to %s:%u", tsMonitorFqdn, tsMonitorPort);
SMonInfo *pMonitor = monCreateMonitorInfo(); SMonDmInfo dmInfo = {0};
if (pMonitor == NULL) return; SMonMmInfo mmInfo = {0};
SMonVmInfo vmInfo = {0};
SMonQmInfo qmInfo = {0};
SMonSmInfo smInfo = {0};
SMonBmInfo bmInfo = {0};
SMonBasicInfo basicInfo = {0}; SRpcMsg req = {0};
dmGetMonitorBasicInfo(pDnode, &basicInfo); SRpcMsg rsp;
monSetBasicInfo(pMonitor, &basicInfo); SEpSet epset = {.inUse = 0, .numOfEps = 1};
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
epset.eps[0].port = tsServerPort;
SMonClusterInfo clusterInfo = {0}; dmGetMonitorInfo(pDnode, &dmInfo);
SMonVgroupInfo vgroupInfo = {0};
SMonGrantInfo grantInfo = {0};
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE); SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) { if (pWrapper != NULL) {
if (mmMonitorMnodeInfo(pWrapper, &clusterInfo, &vgroupInfo, &grantInfo) == 0) { if (!tsMultiProcess) {
monSetClusterInfo(pMonitor, &clusterInfo); mmGetMonitorInfo(pWrapper, &mmInfo);
monSetVgroupInfo(pMonitor, &vgroupInfo); } else {
monSetGrantInfo(pMonitor, &grantInfo); req.msgType = TDMT_MON_MM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo);
rpcFreeCont(rsp.pCont);
} }
dndReleaseWrapper(pWrapper); dndReleaseWrapper(pWrapper);
} }
SMonDnodeInfo dnodeInfo = {0}; pWrapper = dndAcquireWrapper(pDnode, VNODES);
dmGetMonitorDnodeInfo(pDnode, &dnodeInfo); if (pWrapper != NULL) {
monSetDnodeInfo(pMonitor, &dnodeInfo); if (!tsMultiProcess) {
vmGetMonitorInfo(pWrapper, &vmInfo);
} else {
req.msgType = TDMT_MON_VM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper);
tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo);
rpcFreeCont(rsp.pCont);
}
}
pWrapper = dndAcquireWrapper(pDnode, QNODE);
if (pWrapper != NULL) {
if (!tsMultiProcess) {
qmGetMonitorInfo(pWrapper, &qmInfo);
} else {
req.msgType = TDMT_MON_QM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper);
tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo);
rpcFreeCont(rsp.pCont);
}
dndReleaseWrapper(pWrapper);
}
pWrapper = dndAcquireWrapper(pDnode, SNODE);
if (pWrapper != NULL) {
if (!tsMultiProcess) {
smGetMonitorInfo(pWrapper, &smInfo);
} else {
req.msgType = TDMT_MON_SM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper);
tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo);
rpcFreeCont(rsp.pCont);
}
dndReleaseWrapper(pWrapper);
}
pWrapper = dndAcquireWrapper(pDnode, BNODE);
if (pWrapper != NULL) {
if (!tsMultiProcess) {
bmGetMonitorInfo(pWrapper, &bmInfo);
} else {
req.msgType = TDMT_MON_BM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper);
tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo);
rpcFreeCont(rsp.pCont);
}
dndReleaseWrapper(pWrapper);
}
monSetDmInfo(&dmInfo);
monSetMmInfo(&mmInfo);
monSetVmInfo(&vmInfo);
monSetQmInfo(&qmInfo);
monSetSmInfo(&smInfo);
monSetBmInfo(&bmInfo);
tFreeSMonMmInfo(&mmInfo);
tFreeSMonVmInfo(&vmInfo);
tFreeSMonQmInfo(&qmInfo);
tFreeSMonSmInfo(&smInfo);
tFreeSMonBmInfo(&bmInfo);
monSendReport();
}
int32_t dmProcessGetMonMmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, MNODE);
if (pWrapper == NULL) return -1;
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pWrapper, &mmInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo);
return 0;
}
int32_t dmProcessGetMonVmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, VNODES);
if (pWrapper == NULL) return -1;
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pWrapper, &vmInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo);
return 0;
}
int32_t dmProcessGetMonQmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, QNODE);
if (pWrapper == NULL) return -1;
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pWrapper, &qmInfo);
dndReleaseWrapper(pWrapper);
SMonDiskInfo diskInfo = {0}; int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo);
if (dmGetMonitorDiskInfo(pDnode, &diskInfo) == 0) { if (rspLen < 0) {
monSetDiskInfo(pMonitor, &diskInfo); terrno = TSDB_CODE_INVALID_MSG;
return -1;
} }
taosArrayDestroy(clusterInfo.dnodes); void *pRsp = rpcMallocCont(rspLen);
taosArrayDestroy(clusterInfo.mnodes); if (pRsp == NULL) {
taosArrayDestroy(vgroupInfo.vgroups); terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(diskInfo.datadirs); return -1;
}
monSendReport(pMonitor); tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
monCleanupMonitorInfo(pMonitor); pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo);
return 0;
} }
int32_t dmSetDiskInfo(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t dmProcessGetMonSmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SDnode *pDnode = pMgmt->pDnode; SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, SNODE);
SMonDiskInfo info = {0}; if (pWrapper == NULL) return -1;
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pWrapper, &smInfo);
dndReleaseWrapper(pWrapper);
if (tDeserializeSMonDiskInfo(pMsg->rpcMsg.pCont, pMsg->rpcMsg.contLen, &info) != 0) { int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo);
dError("failed to parse diskinfo since %s", terrstr()); if (rspLen < 0) {
return 0; terrno = TSDB_CODE_INVALID_MSG;
return -1;
} }
taosWLockLatch(&pMgmt->latch); void *pRsp = rpcMallocCont(rspLen);
memcpy(&pMgmt->diskInfo, &info, sizeof(SMonDiskInfo)); if (pRsp == NULL) {
taosWUnLockLatch(&pMgmt->latch); terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonSmInfo(&smInfo);
return 0; return 0;
} }
int32_t dmSetVnodeStat(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t dmProcessGetMonBmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SDnode *pDnode = pMgmt->pDnode; SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, BNODE);
SVnodesStat info = {0}; if (pWrapper == NULL) return -1;
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pWrapper, &bmInfo);
dndReleaseWrapper(pWrapper);
if (tDeserializeSMonDiskInfo(pMsg->rpcMsg.pCont, pMsg->rpcMsg.contLen, &info) != 0) { int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo);
dError("failed to parse diskinfo since %s", terrstr()); if (rspLen < 0) {
return 0; terrno = TSDB_CODE_INVALID_MSG;
return -1;
} }
taosWLockLatch(&pMgmt->latch); void *pRsp = rpcMallocCont(rspLen);
memcpy(&pMgmt->diskInfo, &info, sizeof(SMonDiskInfo)); if (pRsp == NULL) {
taosWUnLockLatch(&pMgmt->latch); terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo);
return 0; return 0;
} }
\ No newline at end of file
int32_t dmProcessGetVnodeLoadsReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, VNODES);
if (pWrapper == NULL) return -1;
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pWrapper, &vloads);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonVloadInfo(&vloads);
return 0;
}
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
if (!tsMultiProcess) {
vmGetVnodeLoads(pWrapper, pInfo);
} else {
SRpcMsg req = {.msgType = TDMT_MON_VM_LOAD};
SRpcMsg rsp = {0};
SEpSet epset = {.inUse = 0, .numOfEps = 1};
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
epset.eps[0].port = tsServerPort;
dndSendRecv(pWrapper->pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
tDeserializeSMonVloadInfo(rsp.pCont, rsp.contLen, pInfo);
}
rpcFreeCont(rsp.pCont);
}
}
void dmGetMonitorSysInfo(SMonSysInfo *pInfo) {
taosGetCpuUsage(&pInfo->cpu_engine, &pInfo->cpu_system);
taosGetCpuCores(&pInfo->cpu_cores);
taosGetProcMemory(&pInfo->mem_engine);
taosGetSysMemory(&pInfo->mem_system);
pInfo->mem_total = tsTotalMemoryKB;
pInfo->disk_engine = 0;
pInfo->disk_used = tsDataSpace.size.used;
pInfo->disk_total = tsDataSpace.size.total;
taosGetCardInfoDelta(&pInfo->net_in, &pInfo->net_out);
taosGetProcIODelta(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
}
...@@ -78,8 +78,23 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -78,8 +78,23 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
case TDMT_MND_GRANT_RSP: case TDMT_MND_GRANT_RSP:
code = dmProcessGrantRsp(pMgmt, pMsg); code = dmProcessGrantRsp(pMgmt, pMsg);
break; break;
case TDMT_MON_DISK_INFO_RSP: case TDMT_MON_MM_INFO:
code = dmSetDiskInfo(pMgmt, pMsg); code = dmProcessGetMonMmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_VM_INFO:
code = dmProcessGetMonVmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_QM_INFO:
code = dmProcessGetMonQmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_SM_INFO:
code = dmProcessGetMonSmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_BM_INFO:
code = dmProcessGetMonBmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_VM_LOAD:
code = dmProcessGetVnodeLoadsReq(pMgmt, pMsg);
break; break;
default: default:
code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg); code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg);
...@@ -134,7 +149,7 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -134,7 +149,7 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnodeMgmt *pMgmt = pWrapper->pMgmt; SDnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->statusWorker; SSingleWorker *pWorker = &pMgmt->statusWorker;
......
...@@ -37,11 +37,6 @@ typedef struct SDnodeMgmt { ...@@ -37,11 +37,6 @@ typedef struct SDnodeMgmt {
const char *path; const char *path;
SDnode *pDnode; SDnode *pDnode;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
// monitor infos
SMonDiskInfo diskInfo;
SMonVnodesStat vnodesStat;
SMonVnodesLoad vnodesLoad;
} SDnodeMgmt; } SDnodeMgmt;
// dmFile.c // dmFile.c
...@@ -59,9 +54,13 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); ...@@ -59,9 +54,13 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg);
// dmMonitor.c // dmMonitor.c
int32_t dmSetDiskInfo(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGetVnodeLoadsReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmSetVnodesStat(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGetMonMmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmSetVnodesLoad(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessGetMonVmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmProcessGetMonQmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmProcessGetMonSmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmProcessGetMonBmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void dmSendMonitorReport(SDnode *pDnode); void dmSendMonitorReport(SDnode *pDnode);
// dmWorker.c // dmWorker.c
...@@ -69,7 +68,7 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt); ...@@ -69,7 +68,7 @@ int32_t dmStartThread(SDnodeMgmt *pMgmt);
int32_t dmStartWorker(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt);
void dmStopWorker(SDnodeMgmt *pMgmt); void dmStopWorker(SDnodeMgmt *pMgmt);
int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t dmProcessStatusMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -169,6 +169,7 @@ void dndCleanupTrans(SDnode *pDnode); ...@@ -169,6 +169,7 @@ void dndCleanupTrans(SDnode *pDnode);
SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper); SMsgCb dndCreateMsgcb(SMgmtWrapper *pWrapper);
SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
// mgmt // mgmt
void dmSetMgmtFp(SMgmtWrapper *pWrapper); void dmSetMgmtFp(SMgmtWrapper *pWrapper);
...@@ -182,22 +183,13 @@ void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); ...@@ -182,22 +183,13 @@ void dmGetMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet); void dmUpdateMnodeEpSet(SDnodeMgmt *pMgmt, SEpSet *pEpSet);
void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg); void dmSendRedirectRsp(SDnodeMgmt *pMgmt, const SRpcMsg *pMsg);
typedef struct { void dmGetMonitorSysInfo(SMonSysInfo *pInfo);
int32_t openVnodes; void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
int32_t totalVnodes; void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo);
int32_t masterNum; void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo);
int64_t numOfSelectReqs; void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo);
int64_t numOfInsertReqs; void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo);
int64_t numOfInsertSuccessReqs; void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo);
int64_t numOfBatchInsertReqs;
int64_t numOfBatchInsertSuccessReqs;
} SVnodesStat;
void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads);
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo);
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo);
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -28,6 +28,7 @@ typedef struct SVnodesMgmt { ...@@ -28,6 +28,7 @@ typedef struct SVnodesMgmt {
SHashObj *hash; SHashObj *hash;
SRWLatch latch; SRWLatch latch;
SVnodesStat state; SVnodesStat state;
SVnodesStat lastState;
STfs *pTfs; STfs *pTfs;
SQWorkerPool queryPool; SQWorkerPool queryPool;
SQWorkerPool fetchPool; SQWorkerPool fetchPool;
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
...@@ -482,4 +482,8 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { ...@@ -482,4 +482,8 @@ SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
.parent = pWrapper, .parent = pWrapper,
.name = pWrapper->name}; .name = pWrapper->name};
return cfg; return cfg;
}
void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) {
rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp);
} }
\ No newline at end of file
...@@ -16,6 +16,15 @@ ...@@ -16,6 +16,15 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&mmInfo->sys);
monGetLogs(&mmInfo->logs);
}
}
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
...@@ -161,5 +170,4 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) { ...@@ -161,5 +170,4 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, mmProcessQueryMsg, MNODE_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, mmProcessQueryMsg, MNODE_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, mmProcessQueryMsg, MNODE_HANDLE);
} }
...@@ -241,8 +241,3 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) { ...@@ -241,8 +241,3 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper->fp = mgmtFp; pWrapper->fp = mgmtFp;
} }
int32_t mmMonitorMnodeInfo(SMgmtWrapper *pWrapper, SMonClusterInfo *pClusterInfo, SMonVgroupInfo *pVgroupInfo,
SMonGrantInfo *pGrantInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
return mndGetMonitorInfo(pMgmt->pMnode, pClusterInfo, pVgroupInfo, pGrantInfo);
}
...@@ -16,6 +16,13 @@ ...@@ -16,6 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "qmInt.h" #include "qmInt.h"
void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) {
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&qmInfo->sys);
monGetLogs(&qmInfo->logs);
}
}
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
......
...@@ -16,6 +16,13 @@ ...@@ -16,6 +16,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "smInt.h" #include "smInt.h"
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&smInfo->sys);
monGetLogs(&smInfo->logs);
}
}
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
......
...@@ -16,6 +16,28 @@ ...@@ -16,6 +16,28 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vmInt.h" #include "vmInt.h"
void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
tfsGetMonitorInfo(pMgmt->pTfs, &vmInfo->tfs);
taosWLockLatch(&pMgmt->latch);
vmInfo->vstat.totalVnodes = pMgmt->state.totalVnodes;
vmInfo->vstat.masterNum = pMgmt->state.masterNum;
vmInfo->vstat.numOfSelectReqs = pMgmt->state.numOfSelectReqs - pMgmt->lastState.numOfSelectReqs;
vmInfo->vstat.numOfInsertReqs = pMgmt->state.numOfInsertReqs - pMgmt->lastState.numOfInsertReqs;
vmInfo->vstat.numOfInsertSuccessReqs = pMgmt->state.numOfInsertSuccessReqs - pMgmt->lastState.numOfInsertSuccessReqs;
vmInfo->vstat.numOfBatchInsertReqs = pMgmt->state.numOfBatchInsertReqs - pMgmt->lastState.numOfBatchInsertReqs;
vmInfo->vstat.numOfBatchInsertSuccessReqs =
pMgmt->state.numOfBatchInsertSuccessReqs - pMgmt->lastState.numOfBatchInsertSuccessReqs;
pMgmt->lastState = pMgmt->state;
taosWUnLockLatch(&pMgmt->latch);
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&vmInfo->sys);
monGetLogs(&vmInfo->logs);
}
}
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->vgId = pCreate->vgId; pCfg->vgId = pCreate->vgId;
pCfg->wsize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize;
......
...@@ -344,38 +344,21 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) { ...@@ -344,38 +344,21 @@ void vmSetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper->fp = mgmtFp; pWrapper->fp = mgmtFp;
} }
int32_t vmMonitorTfsInfo(SMgmtWrapper *pWrapper, SMonDiskInfo *pInfo) { void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return -1;
return tfsGetMonitorInfo(pMgmt->pTfs, pInfo);
}
void vmMonitorVnodeReqs(SMgmtWrapper *pWrapper, SMonDnodeInfo *pInfo) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
SVnodesStat *pStat = &pMgmt->state; SVnodesStat *pStat = &pMgmt->state;
pInfo->req_select = pStat->numOfSelectReqs; SArray *pLoads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoad));
pInfo->req_insert = pStat->numOfInsertReqs;
pInfo->req_insert_success = pStat->numOfInsertSuccessReqs;
pInfo->req_insert_batch = pStat->numOfBatchInsertReqs;
pInfo->req_insert_batch_success = pStat->numOfBatchInsertSuccessReqs;
pInfo->errors = tsNumOfErrorLogs;
pInfo->vnodes_num = pStat->totalVnodes;
pInfo->masters = pStat->masterNum;
}
void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { int32_t totalVnodes = 0;
SVnodesMgmt *pMgmt = pWrapper->pMgmt; int32_t masterNum = 0;
SVnodesStat *pStat = &pMgmt->state; int64_t numOfSelectReqs = 0;
int32_t totalVnodes = 0; int64_t numOfInsertReqs = 0;
int32_t masterNum = 0; int64_t numOfInsertSuccessReqs = 0;
int64_t numOfSelectReqs = 0; int64_t numOfBatchInsertReqs = 0;
int64_t numOfInsertReqs = 0; int64_t numOfBatchInsertSuccessReqs = 0;
int64_t numOfInsertSuccessReqs = 0;
int64_t numOfBatchInsertReqs = 0; pInfo->pVloads = pLoads;
int64_t numOfBatchInsertSuccessReqs = 0; if (pLoads == NULL) return;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
...@@ -402,6 +385,7 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { ...@@ -402,6 +385,7 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) {
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
taosWLockLatch(&pMgmt->latch);
pStat->totalVnodes = totalVnodes; pStat->totalVnodes = totalVnodes;
pStat->masterNum = masterNum; pStat->masterNum = masterNum;
pStat->numOfSelectReqs = numOfSelectReqs; pStat->numOfSelectReqs = numOfSelectReqs;
...@@ -409,4 +393,5 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) { ...@@ -409,4 +393,5 @@ void vmMonitorVnodeLoads(SMgmtWrapper *pWrapper, SArray *pLoads) {
pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs; pStat->numOfInsertSuccessReqs = numOfInsertSuccessReqs;
pStat->numOfBatchInsertReqs = numOfBatchInsertReqs; pStat->numOfBatchInsertReqs = numOfBatchInsertReqs;
pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs; pStat->numOfBatchInsertSuccessReqs = numOfBatchInsertSuccessReqs;
taosWUnLockLatch(&pMgmt->latch);
} }
\ No newline at end of file
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册