提交 ffaef98b 编写于 作者: S Shengliang Guan

feat[cluster]: send monitor information in multi-process mode

上级 a722d3df
......@@ -17,15 +17,13 @@
#include "bmInt.h"
void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) {
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&bmInfo->sys);
monGetLogs(&bmInfo->log);
}
}
int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pWrapper, &bmInfo);
dmGetMonitorSysInfo(&bmInfo.sys);
monGetLogs(&bmInfo.log);
int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo);
if (rspLen < 0) {
......
......@@ -33,6 +33,15 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num
}
}
static inline void bmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
tmsgSendRsp(&rsp);
}
static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pInfo->ahandle;
......@@ -45,14 +54,8 @@ static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
if (code != 0 && terrno != 0) code = terrno;
bmSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......@@ -116,13 +119,13 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
}
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start bnode-monitor worker since %s", terrstr());
return -1;
}
}
}
dDebug("bnode workers are initialized");
return 0;
......
......@@ -62,8 +62,9 @@ void dmSendMonitorReport(SDnode *pDnode) {
SMgmtWrapper *pWrapper = NULL;
dmGetMonitorInfo(pDnode, &dmInfo);
bool getFromAPI = !tsMultiProcess;
pWrapper = &pDnode->wrappers[MNODE];
if (!tsMultiProcess) {
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
mmGetMonitorInfo(pWrapper, &mmInfo);
dndReleaseWrapper(pWrapper);
......@@ -72,7 +73,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
if (pWrapper->required) {
req.msgType = TDMT_MON_MM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
if (rsp.code == 0 && rsp.contLen > 0) {
tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo);
}
rpcFreeCont(rsp.pCont);
......@@ -80,7 +81,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
}
pWrapper = &pDnode->wrappers[VNODES];
if (!tsMultiProcess) {
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
vmGetMonitorInfo(pWrapper, &vmInfo);
dndReleaseWrapper(pWrapper);
......@@ -89,7 +90,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
if (pWrapper->required) {
req.msgType = TDMT_MON_VM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
if (rsp.code == 0 && rsp.contLen > 0) {
tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo);
}
rpcFreeCont(rsp.pCont);
......@@ -97,7 +98,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
}
pWrapper = &pDnode->wrappers[QNODE];
if (!tsMultiProcess) {
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
qmGetMonitorInfo(pWrapper, &qmInfo);
dndReleaseWrapper(pWrapper);
......@@ -106,7 +107,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
if (pWrapper->required) {
req.msgType = TDMT_MON_QM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
if (rsp.code == 0 && rsp.contLen > 0) {
tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo);
}
rpcFreeCont(rsp.pCont);
......@@ -114,7 +115,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
}
pWrapper = &pDnode->wrappers[SNODE];
if (!tsMultiProcess) {
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
smGetMonitorInfo(pWrapper, &smInfo);
dndReleaseWrapper(pWrapper);
......@@ -123,7 +124,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
if (pWrapper->required) {
req.msgType = TDMT_MON_SM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
if (rsp.code == 0 && rsp.contLen > 0) {
tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo);
}
rpcFreeCont(rsp.pCont);
......@@ -131,7 +132,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
}
pWrapper = &pDnode->wrappers[BNODE];
if (!tsMultiProcess) {
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
bmGetMonitorInfo(pWrapper, &bmInfo);
dndReleaseWrapper(pWrapper);
......@@ -140,7 +141,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
if (pWrapper->required) {
req.msgType = TDMT_MON_BM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
if (rsp.code == 0 && rsp.contLen > 0) {
tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo);
}
rpcFreeCont(rsp.pCont);
......@@ -162,7 +163,8 @@ void dmSendMonitorReport(SDnode *pDnode) {
}
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
if (!tsMultiProcess) {
bool getFromAPI = !tsMultiProcess;
if (getFromAPI) {
vmGetVnodeLoads(pWrapper, pInfo);
} else {
SRpcMsg req = {.msgType = TDMT_MON_VM_LOAD};
......@@ -172,7 +174,7 @@ void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
epset.eps[0].port = tsServerPort;
dndSendRecv(pWrapper->pDnode, &epset, &req, &rsp);
if (rsp.code == 0) {
if (rsp.code == 0 && rsp.contLen > 0) {
tDeserializeSMonVloadInfo(rsp.pCont, rsp.contLen, pInfo);
}
rpcFreeCont(rsp.pCont);
......
......@@ -19,15 +19,13 @@
void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
mndGetMonitorInfo(pMgmt->pMnode, &mmInfo->cluster, &mmInfo->vgroup, &mmInfo->grant);
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&mmInfo->sys);
monGetLogs(&mmInfo->log);
}
}
int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pWrapper, &mmInfo);
dmGetMonitorSysInfo(&mmInfo.sys);
monGetLogs(&mmInfo.log);
int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo);
if (rspLen < 0) {
......
......@@ -34,10 +34,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
......@@ -173,8 +170,8 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
SSingleWorkerCfg mCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode mnode-monitor worker since %s", terrstr());
return -1;
}
......
......@@ -17,15 +17,13 @@
#include "qmInt.h"
void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) {
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&qmInfo->sys);
monGetLogs(&qmInfo->log);
}
}
int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pWrapper, &qmInfo);
dmGetMonitorSysInfo(&qmInfo.sys);
monGetLogs(&qmInfo.log);
int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo);
if (rspLen < 0) {
......
......@@ -16,8 +16,12 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
static inline void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
tmsgSendRsp(&rsp);
}
......@@ -33,14 +37,8 @@ static void qmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
if (code != 0 && terrno != 0) code = terrno;
qmSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......@@ -165,9 +163,9 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
}
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start qnode-monitor worker since %s", terrstr());
return -1;
}
......
......@@ -16,17 +16,14 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&smInfo->sys);
monGetLogs(&smInfo->log);
}
}
void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {}
int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pWrapper, &smInfo);
dmGetMonitorSysInfo(&smInfo.sys);
monGetLogs(&smInfo.log);
int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
......
......@@ -16,6 +16,15 @@
#define _DEFAULT_SOURCE
#include "smInt.h"
static inline void smSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
tmsgSendRsp(&rsp);
}
static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
......@@ -28,14 +37,8 @@ static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
}
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
if (code != 0 && terrno != 0) code = terrno;
smSendRsp(pMgmt->pWrapper, pMsg, code);
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......@@ -108,9 +111,9 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
}
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start snode-monitor worker since %s", terrstr());
return -1;
}
......
......@@ -31,16 +31,13 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) {
pMgmt->state.numOfBatchInsertSuccessReqs - pMgmt->lastState.numOfBatchInsertSuccessReqs;
pMgmt->lastState = pMgmt->state;
taosWUnLockLatch(&pMgmt->latch);
if (pWrapper->procType == PROC_CHILD) {
dmGetMonitorSysInfo(&vmInfo->sys);
monGetLogs(&vmInfo->log);
}
}
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pWrapper, &vmInfo);
dmGetMonitorSysInfo(&vmInfo.sys);
monGetLogs(&vmInfo.log);
int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo);
if (rspLen < 0) {
......
......@@ -16,8 +16,12 @@
#define _DEFAULT_SOURCE
#include "vmInt.h"
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
static inline void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
tmsgSendRsp(&rsp);
}
......@@ -26,7 +30,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t code = -1;
tmsg_t msgType = pMsg->rpcMsg.msgType;
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
dTrace("msg:%p, will be processed in vnode-m queue", pMsg);
switch (msgType) {
case TDMT_MON_VM_INFO:
......@@ -428,9 +432,9 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
}
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
SSingleWorkerCfg mCfg = {
.min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
dError("failed to start mnode vnode-monitor worker since %s", terrstr());
return -1;
}
......
......@@ -237,7 +237,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) {
int64_t interval = TABS(pDnode->lastAccessTime - curMs);
if (interval > 3500 * tsStatusInterval) {
if (interval > 30000 * tsStatusInterval) {
if (pDnode->rebootTime > 0) {
pDnode->offlineReason = DND_REASON_STATUS_MSG_TIMEOUT;
}
......
......@@ -502,7 +502,11 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
SMonVgroupDesc desc = {0};
desc.vgroup_id = pVgroup->vgId;
strncpy(desc.database_name, pVgroup->dbName, sizeof(desc.database_name));
SName name = {0};
tNameFromString(&name, pVgroup->dbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
tNameGetDbName(&name, desc.database_name);
desc.tables_num = pVgroup->numOfTables;
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
......
......@@ -529,7 +529,9 @@ void monSendReport() {
char *pCont = tjsonToString(pMonitor->pJson);
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag);
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
uError("failed to send monitor msg since %s", terrstr());
}
taosMemoryFree(pCont);
}
......
......@@ -117,7 +117,7 @@ _OVER:
static void clientConnCb(uv_connect_t* req, int32_t status) {
if (status < 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("Connection error %s\n", uv_strerror(status));
uError("connection error %s", uv_strerror(status));
uv_close((uv_handle_t*)req->handle, NULL);
return;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册