From 951ee6136cb470f61033866bb3c148e2674d8b32 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 8 Apr 2022 20:30:36 +0800 Subject: [PATCH] feat[cluster]: send monitor information in multi-process mode --- source/dnode/mgmt/bm/bmHandle.c | 27 +++- source/dnode/mgmt/bm/bmWorker.c | 52 ++++++- source/dnode/mgmt/dm/dmHandle.c | 8 - source/dnode/mgmt/dm/dmMonitor.c | 246 +++++++----------------------- source/dnode/mgmt/dm/dmWorker.c | 18 --- source/dnode/mgmt/inc/bmInt.h | 4 + source/dnode/mgmt/inc/dmInt.h | 10 +- source/dnode/mgmt/inc/mmInt.h | 3 + source/dnode/mgmt/inc/qmInt.h | 3 + source/dnode/mgmt/inc/smInt.h | 3 + source/dnode/mgmt/inc/vmInt.h | 4 + source/dnode/mgmt/mm/mmHandle.c | 25 +++ source/dnode/mgmt/mm/mmWorker.c | 28 +++- source/dnode/mgmt/qm/qmHandle.c | 25 +++ source/dnode/mgmt/qm/qmWorker.c | 46 ++++++ source/dnode/mgmt/sm/smHandle.c | 25 +++ source/dnode/mgmt/sm/smWorker.c | 46 ++++++ source/dnode/mgmt/vm/vmHandle.c | 49 ++++++ source/dnode/mgmt/vm/vmWorker.c | 25 +++ source/libs/monitor/src/monMain.c | 3 + source/util/src/tconfig.c | 2 +- 21 files changed, 414 insertions(+), 238 deletions(-) diff --git a/source/dnode/mgmt/bm/bmHandle.c b/source/dnode/mgmt/bm/bmHandle.c index 4ab000cb4e..3021c2798f 100644 --- a/source/dnode/mgmt/bm/bmHandle.c +++ b/source/dnode/mgmt/bm/bmHandle.c @@ -23,6 +23,29 @@ void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) { } } +int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonBmInfo bmInfo = {0}; + bmGetMonitorInfo(pWrapper, &bmInfo); + + int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonBmInfo(&bmInfo); + return 0; +} + int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -61,4 +84,6 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } } -void bmInitMsgHandle(SMgmtWrapper *pWrapper) {} +void bmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, DEFAULT_HANDLE); +} diff --git a/source/dnode/mgmt/bm/bmWorker.c b/source/dnode/mgmt/bm/bmWorker.c index a5a97f6af0..a08d390d43 100644 --- a/source/dnode/mgmt/bm/bmWorker.c +++ b/source/dnode/mgmt/bm/bmWorker.c @@ -33,7 +33,34 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num } } -static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { +static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SBnodeMgmt *pMgmt = pInfo->ahandle; + + dTrace("msg:%p, get from bnode monitor queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; + + if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) { + code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg); + } + + if (pRpc->msgType & 1U) { + if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { + code = terrno; + dError("msg:%p, failed to process since %s", pMsg, terrstr()); + } + SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; + tmsgSendRsp(&rsp); + } + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + +static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SBnodeMgmt *pMgmt = pInfo->ahandle; SMgmtWrapper *pWrapper = pMgmt->pWrapper; @@ -72,18 +99,37 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SBnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + int32_t bmStartWorker(SBnodeMgmt *pMgmt) { - SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; + SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt}; if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { - dError("failed to start bnode write worker since %s", terrstr()); + dError("failed to start bnode-write worker since %s", terrstr()); return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg sCfg = { + .min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) { + dError("failed to start bnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("bnode workers are initialized"); return 0; } void bmStopWorker(SBnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tMultiWorkerCleanup(&pMgmt->writeWorker); dDebug("bnode workers are closed"); } diff --git a/source/dnode/mgmt/dm/dmHandle.c b/source/dnode/mgmt/dm/dmHandle.c index c0175fed10..685958aaf0 100644 --- a/source/dnode/mgmt/dm/dmHandle.c +++ b/source/dnode/mgmt/dm/dmHandle.c @@ -212,12 +212,4 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); - - // Monitor info exchange between processes - dndSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE); - dndSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, dmProcessMonitorMsg, DEFAULT_HANDLE); } diff --git a/source/dnode/mgmt/dm/dmMonitor.c b/source/dnode/mgmt/dm/dmMonitor.c index b9199efac2..ff5225e580 100644 --- a/source/dnode/mgmt/dm/dmMonitor.c +++ b/source/dnode/mgmt/dm/dmMonitor.c @@ -59,74 +59,92 @@ void dmSendMonitorReport(SDnode *pDnode) { tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); epset.eps[0].port = tsServerPort; + SMgmtWrapper *pWrapper = NULL; dmGetMonitorInfo(pDnode, &dmInfo); - SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE); - if (pWrapper != NULL) { - if (!tsMultiProcess) { + pWrapper = &pDnode->wrappers[MNODE]; + if (!tsMultiProcess) { + if (dndMarkWrapper(pWrapper) != 0) { mmGetMonitorInfo(pWrapper, &mmInfo); - } else { + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { req.msgType = TDMT_MON_MM_INFO; dndSendRecv(pDnode, &epset, &req, &rsp); - tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo); + if (rsp.code == 0) { + tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo); + } rpcFreeCont(rsp.pCont); } - dndReleaseWrapper(pWrapper); } - pWrapper = dndAcquireWrapper(pDnode, VNODES); - if (pWrapper != NULL) { - if (!tsMultiProcess) { + pWrapper = &pDnode->wrappers[VNODES]; + if (!tsMultiProcess) { + if (dndMarkWrapper(pWrapper) != 0) { vmGetMonitorInfo(pWrapper, &vmInfo); - } else { + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { req.msgType = TDMT_MON_VM_INFO; dndSendRecv(pDnode, &epset, &req, &rsp); - dndReleaseWrapper(pWrapper); - tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo); + if (rsp.code == 0) { + tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo); + } rpcFreeCont(rsp.pCont); } } - pWrapper = dndAcquireWrapper(pDnode, QNODE); - if (pWrapper != NULL) { - if (!tsMultiProcess) { + pWrapper = &pDnode->wrappers[QNODE]; + if (!tsMultiProcess) { + if (dndMarkWrapper(pWrapper) != 0) { qmGetMonitorInfo(pWrapper, &qmInfo); - } else { + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { req.msgType = TDMT_MON_QM_INFO; dndSendRecv(pDnode, &epset, &req, &rsp); - dndReleaseWrapper(pWrapper); - tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo); + if (rsp.code == 0) { + tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo); + } rpcFreeCont(rsp.pCont); } - dndReleaseWrapper(pWrapper); } - pWrapper = dndAcquireWrapper(pDnode, SNODE); - if (pWrapper != NULL) { - if (!tsMultiProcess) { + pWrapper = &pDnode->wrappers[SNODE]; + if (!tsMultiProcess) { + if (dndMarkWrapper(pWrapper) != 0) { smGetMonitorInfo(pWrapper, &smInfo); - } else { + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { req.msgType = TDMT_MON_SM_INFO; dndSendRecv(pDnode, &epset, &req, &rsp); - dndReleaseWrapper(pWrapper); - tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo); + if (rsp.code == 0) { + tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo); + } rpcFreeCont(rsp.pCont); } - dndReleaseWrapper(pWrapper); } - pWrapper = dndAcquireWrapper(pDnode, BNODE); - if (pWrapper != NULL) { - if (!tsMultiProcess) { + pWrapper = &pDnode->wrappers[BNODE]; + if (!tsMultiProcess) { + if (dndMarkWrapper(pWrapper) != 0) { bmGetMonitorInfo(pWrapper, &bmInfo); - } else { + dndReleaseWrapper(pWrapper); + } + } else { + if (pWrapper->required) { req.msgType = TDMT_MON_BM_INFO; dndSendRecv(pDnode, &epset, &req, &rsp); - dndReleaseWrapper(pWrapper); - tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo); + if (rsp.code == 0) { + tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo); + } rpcFreeCont(rsp.pCont); } - dndReleaseWrapper(pWrapper); } monSetDmInfo(&dmInfo); @@ -143,168 +161,6 @@ void dmSendMonitorReport(SDnode *pDnode) { monSendReport(); } -int32_t dmProcessGetMonMmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) { - SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, MNODE); - if (pWrapper == NULL) return -1; - - SMonMmInfo mmInfo = {0}; - mmGetMonitorInfo(pWrapper, &mmInfo); - dndReleaseWrapper(pWrapper); - - int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; - tFreeSMonMmInfo(&mmInfo); - return 0; -} - -int32_t dmProcessGetMonVmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) { - SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, VNODES); - if (pWrapper == NULL) return -1; - - SMonVmInfo vmInfo = {0}; - vmGetMonitorInfo(pWrapper, &vmInfo); - dndReleaseWrapper(pWrapper); - - int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; - tFreeSMonVmInfo(&vmInfo); - return 0; -} - -int32_t dmProcessGetMonQmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) { - SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, QNODE); - if (pWrapper == NULL) return -1; - - SMonQmInfo qmInfo = {0}; - qmGetMonitorInfo(pWrapper, &qmInfo); - dndReleaseWrapper(pWrapper); - - int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; - tFreeSMonQmInfo(&qmInfo); - return 0; -} - -int32_t dmProcessGetMonSmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) { - SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, SNODE); - if (pWrapper == NULL) return -1; - - SMonSmInfo smInfo = {0}; - smGetMonitorInfo(pWrapper, &smInfo); - dndReleaseWrapper(pWrapper); - - int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; - tFreeSMonSmInfo(&smInfo); - return 0; -} - -int32_t dmProcessGetMonBmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) { - SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, BNODE); - if (pWrapper == NULL) return -1; - - SMonBmInfo bmInfo = {0}; - bmGetMonitorInfo(pWrapper, &bmInfo); - dndReleaseWrapper(pWrapper); - - int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; - tFreeSMonBmInfo(&bmInfo); - return 0; -} - -int32_t dmProcessGetVnodeLoadsReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) { - SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, VNODES); - if (pWrapper == NULL) return -1; - - SMonVloadInfo vloads = {0}; - vmGetVnodeLoads(pWrapper, &vloads); - dndReleaseWrapper(pWrapper); - - int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); - if (rspLen < 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - void *pRsp = rpcMallocCont(rspLen); - if (pRsp == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); - pReq->pRsp = pRsp; - pReq->rspLen = rspLen; - tFreeSMonVloadInfo(&vloads); - return 0; -} - void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { if (!tsMultiProcess) { vmGetVnodeLoads(pWrapper, pInfo); diff --git a/source/dnode/mgmt/dm/dmWorker.c b/source/dnode/mgmt/dm/dmWorker.c index a6d0f4491a..41b38c8bb7 100644 --- a/source/dnode/mgmt/dm/dmWorker.c +++ b/source/dnode/mgmt/dm/dmWorker.c @@ -78,24 +78,6 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { case TDMT_MND_GRANT_RSP: code = dmProcessGrantRsp(pMgmt, pMsg); break; - case TDMT_MON_MM_INFO: - code = dmProcessGetMonMmInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_VM_INFO: - code = dmProcessGetMonVmInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_QM_INFO: - code = dmProcessGetMonQmInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_SM_INFO: - code = dmProcessGetMonSmInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_BM_INFO: - code = dmProcessGetMonBmInfoReq(pMgmt, pMsg); - break; - case TDMT_MON_VM_LOAD: - code = dmProcessGetVnodeLoadsReq(pMgmt, pMsg); - break; default: code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg); break; diff --git a/source/dnode/mgmt/inc/bmInt.h b/source/dnode/mgmt/inc/bmInt.h index 919b1d2c7c..84a6a53e99 100644 --- a/source/dnode/mgmt/inc/bmInt.h +++ b/source/dnode/mgmt/inc/bmInt.h @@ -17,6 +17,7 @@ #define _TD_DND_BNODE_INT_H_ #include "dndInt.h" + #include "bnode.h" #ifdef __cplusplus @@ -29,6 +30,7 @@ typedef struct SBnodeMgmt { SMgmtWrapper *pWrapper; const char *path; SMultiWorker writeWorker; + SSingleWorker monitorWorker; } SBnodeMgmt; // bmInt.c @@ -39,11 +41,13 @@ int32_t bmDrop(SMgmtWrapper *pWrapper); void bmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // bmWorker.c int32_t bmStartWorker(SBnodeMgmt *pMgmt); void bmStopWorker(SBnodeMgmt *pMgmt); int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/dmInt.h b/source/dnode/mgmt/inc/dmInt.h index 6d37f59595..a671368f06 100644 --- a/source/dnode/mgmt/inc/dmInt.h +++ b/source/dnode/mgmt/inc/dmInt.h @@ -54,14 +54,8 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); // dmMonitor.c -int32_t dmProcessGetVnodeLoadsReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t dmProcessGetMonMmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t dmProcessGetMonVmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t dmProcessGetMonQmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t dmProcessGetMonSmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); -int32_t dmProcessGetMonBmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); -void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); -void dmSendMonitorReport(SDnode *pDnode); +void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +void dmSendMonitorReport(SDnode *pDnode); // dmWorker.c int32_t dmStartThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/inc/mmInt.h b/source/dnode/mgmt/inc/mmInt.h index d09d15255d..df63e22059 100644 --- a/source/dnode/mgmt/inc/mmInt.h +++ b/source/dnode/mgmt/inc/mmInt.h @@ -32,6 +32,7 @@ typedef struct SMnodeMgmt { SSingleWorker readWorker; SSingleWorker writeWorker; SSingleWorker syncWorker; + SSingleWorker monitorWorker; SReplica replicas[TSDB_MAX_REPLICA]; int8_t replica; int8_t selfIndex; @@ -51,6 +52,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); @@ -59,6 +61,7 @@ int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); diff --git a/source/dnode/mgmt/inc/qmInt.h b/source/dnode/mgmt/inc/qmInt.h index 02905413ab..012869d637 100644 --- a/source/dnode/mgmt/inc/qmInt.h +++ b/source/dnode/mgmt/inc/qmInt.h @@ -30,6 +30,7 @@ typedef struct SQnodeMgmt { const char *path; SSingleWorker queryWorker; SSingleWorker fetchWorker; + SSingleWorker monitorWorker; } SQnodeMgmt; // qmInt.c @@ -40,6 +41,7 @@ int32_t qmDrop(SMgmtWrapper *pWrapper); void qmInitMsgHandle(SMgmtWrapper *pWrapper); int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // qmWorker.c int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); @@ -50,6 +52,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt); int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/smInt.h b/source/dnode/mgmt/inc/smInt.h index 285ec84942..039dea2491 100644 --- a/source/dnode/mgmt/inc/smInt.h +++ b/source/dnode/mgmt/inc/smInt.h @@ -32,6 +32,7 @@ typedef struct SSnodeMgmt { int8_t uniqueWorkerInUse; SArray *uniqueWorkers; // SArray SSingleWorker sharedWorker; + SSingleWorker monitorWorker; } SSnodeMgmt; // smInt.c @@ -42,6 +43,7 @@ int32_t smDrop(SMgmtWrapper *pWrapper); void smInitMsgHandle(SMgmtWrapper *pWrapper); int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); @@ -50,6 +52,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/inc/vmInt.h b/source/dnode/mgmt/inc/vmInt.h index ca210d62c6..f8466fe4f2 100644 --- a/source/dnode/mgmt/inc/vmInt.h +++ b/source/dnode/mgmt/inc/vmInt.h @@ -39,6 +39,7 @@ typedef struct SVnodesMgmt { SDnode *pDnode; SMgmtWrapper *pWrapper; SSingleWorker mgmtWorker; + SSingleWorker monitorWorker; } SVnodesMgmt; typedef struct { @@ -92,6 +93,8 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); +int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); +int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq); // vmFile.c int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); @@ -115,6 +118,7 @@ int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg); +int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mm/mmHandle.c b/source/dnode/mgmt/mm/mmHandle.c index 1ebcfdcda9..b5aff12992 100644 --- a/source/dnode/mgmt/mm/mmHandle.c +++ b/source/dnode/mgmt/mm/mmHandle.c @@ -25,6 +25,29 @@ void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) { } } +int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonMmInfo mmInfo = {0}; + mmGetMonitorInfo(pWrapper, &mmInfo); + + int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonMmInfo(&mmInfo); + return 0; +} + int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -83,6 +106,8 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { } void mmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by DNODE dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/mm/mmWorker.c b/source/dnode/mgmt/mm/mmWorker.c index 735ef53b37..85aa265904 100644 --- a/source/dnode/mgmt/mm/mmWorker.c +++ b/source/dnode/mgmt/mm/mmWorker.c @@ -23,11 +23,13 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; - if (pMsg->rpcMsg.msgType != TDMT_DND_ALTER_MNODE) { + if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) { + code = mmProcessAlterReq(pMgmt, pMsg); + } else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) { + code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg); + } else { pMsg->pNode = pMgmt->pMnode; code = mndProcessMsg(pMsg); - } else { - code = mmProcessAlterReq(pMgmt, pMsg); } if (pRpc->msgType & 1U) { @@ -98,6 +100,15 @@ int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SMnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) return -1; @@ -157,15 +168,24 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { - dError("failed to start mnode sync-worker since %s", terrstr()); + dError("failed to start mnode mnode-sync worker since %s", terrstr()); return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) { + dError("failed to start mnode mnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("mnode workers are initialized"); return 0; } void mmStopWorker(SMnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->writeWorker); diff --git a/source/dnode/mgmt/qm/qmHandle.c b/source/dnode/mgmt/qm/qmHandle.c index 3c29bd717b..c96b396691 100644 --- a/source/dnode/mgmt/qm/qmHandle.c +++ b/source/dnode/mgmt/qm/qmHandle.c @@ -23,6 +23,29 @@ void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) { } } +int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonQmInfo qmInfo = {0}; + qmGetMonitorInfo(pWrapper, &qmInfo); + + int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonQmInfo(&qmInfo); + return 0; +} + int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -62,6 +85,8 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void qmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE); diff --git a/source/dnode/mgmt/qm/qmWorker.c b/source/dnode/mgmt/qm/qmWorker.c index db0752949d..e9d1173f20 100644 --- a/source/dnode/mgmt/qm/qmWorker.c +++ b/source/dnode/mgmt/qm/qmWorker.c @@ -21,6 +21,33 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } +static void qmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pInfo->ahandle; + + dTrace("msg:%p, get from qnode monitor queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; + + if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { + code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg); + } + + if (pRpc->msgType & 1U) { + if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { + code = terrno; + dError("msg:%p, failed to process since %s", pMsg, terrstr()); + } + SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; + tmsgSendRsp(&rsp); + } + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { SQnodeMgmt *pMgmt = pInfo->ahandle; @@ -66,6 +93,15 @@ int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg == NULL) { @@ -128,11 +164,21 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg sCfg = { + .min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) { + dError("failed to start qnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("qnode workers are initialized"); return 0; } void qmStopWorker(SQnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker); dDebug("qnode workers are closed"); diff --git a/source/dnode/mgmt/sm/smHandle.c b/source/dnode/mgmt/sm/smHandle.c index 214dc581b8..5500db4513 100644 --- a/source/dnode/mgmt/sm/smHandle.c +++ b/source/dnode/mgmt/sm/smHandle.c @@ -23,6 +23,29 @@ void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) { } } +int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonSmInfo smInfo = {0}; + smGetMonitorInfo(pWrapper, &smInfo); + + int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonSmInfo(&smInfo); + return 0; +} + int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnode *pDnode = pWrapper->pDnode; SRpcMsg *pReq = &pMsg->rpcMsg; @@ -62,6 +85,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } void smInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by SNODE dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/sm/smWorker.c b/source/dnode/mgmt/sm/smWorker.c index afa843953b..33ae289733 100644 --- a/source/dnode/mgmt/sm/smWorker.c +++ b/source/dnode/mgmt/sm/smWorker.c @@ -16,6 +16,33 @@ #define _DEFAULT_SOURCE #include "smInt.h" +static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pInfo->ahandle; + + dTrace("msg:%p, get from snode monitor queue", pMsg); + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; + + if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) { + code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg); + } + + if (pRpc->msgType & 1U) { + if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { + if (code != 0) { + code = terrno; + dError("msg:%p, failed to process since %s", pMsg, terrstr()); + } + SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; + tmsgSendRsp(&rsp); + } + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SSnodeMgmt *pMgmt = pInfo->ahandle; @@ -80,11 +107,21 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg sCfg = { + .min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) { + dError("failed to start snode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("snode workers are initialized"); return 0; } void smStopWorker(SSnodeMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); tMultiWorkerCleanup(pWorker); @@ -120,6 +157,15 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SSnodeMgmt *pMgmt = pWrapper->pMgmt; int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); diff --git a/source/dnode/mgmt/vm/vmHandle.c b/source/dnode/mgmt/vm/vmHandle.c index 0433846ca1..f003d8c58a 100644 --- a/source/dnode/mgmt/vm/vmHandle.c +++ b/source/dnode/mgmt/vm/vmHandle.c @@ -38,6 +38,52 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) { } } +int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonVmInfo vmInfo = {0}; + vmGetMonitorInfo(pWrapper, &vmInfo); + + int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonVmInfo(&vmInfo); + return 0; +} + +int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) { + SMonVloadInfo vloads = {0}; + vmGetVnodeLoads(pWrapper, &vloads); + + int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads); + if (rspLen < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + void *pRsp = rpcMallocCont(rspLen); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); + pReq->pRsp = pRsp; + pReq->rspLen = rspLen; + tFreeSMonVloadInfo(&vloads); + return 0; +} + static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; pCfg->wsize = pCreate->cacheBlockSize; @@ -261,6 +307,9 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } void vmInitMsgHandle(SMgmtWrapper *pWrapper) { + dndSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE); + dndSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE); + // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); diff --git a/source/dnode/mgmt/vm/vmWorker.c b/source/dnode/mgmt/vm/vmWorker.c index ed1a4ca2f4..82c45f172d 100644 --- a/source/dnode/mgmt/vm/vmWorker.c +++ b/source/dnode/mgmt/vm/vmWorker.c @@ -29,6 +29,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); switch (msgType) { + case TDMT_MON_VM_INFO: + code = vmProcessGetMonVmInfoReq(pMgmt->pWrapper, pMsg); + break; + case TDMT_MON_VM_LOAD: + code = vmProcessGetVnodeLoadsReq(pMgmt->pWrapper, pMsg); + break; case TDMT_DND_CREATE_VNODE: code = vmProcessCreateVnodeReq(pMgmt, pMsg); break; @@ -255,6 +261,15 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { return 0; } +int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + SSingleWorker *pWorker = &pMgmt->monitorWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + taosWriteQitem(pWorker->queue, pMsg); + return 0; +} + static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { SVnodesMgmt *pMgmt = pWrapper->pMgmt; SMsgHead *pHead = pRpc->pCont; @@ -412,11 +427,21 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { return -1; } + if (tsMultiProcess) { + SSingleWorkerCfg sCfg = { + .min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; + if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) { + dError("failed to start mnode vnode-monitor worker since %s", terrstr()); + return -1; + } + } + dDebug("vnode workers are initialized"); return 0; } void vmStopWorker(SVnodesMgmt *pMgmt) { + tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); diff --git a/source/libs/monitor/src/monMain.c b/source/libs/monitor/src/monMain.c index c012911e8b..af7799cd7c 100644 --- a/source/libs/monitor/src/monMain.c +++ b/source/libs/monitor/src/monMain.c @@ -187,6 +187,7 @@ static void monGenBasicJson(SMonInfo *pMonitor) { static void monGenClusterJson(SMonInfo *pMonitor) { SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; @@ -239,6 +240,7 @@ static void monGenClusterJson(SMonInfo *pMonitor) { static void monGenVgroupJson(SMonInfo *pMonitor) { SMonVgroupInfo *pInfo = &pMonitor->mmInfo.vgroup; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; SJson *pJson = tjsonAddArrayToObject(pMonitor->pJson, "vgroup_infos"); if (pJson == NULL) return; @@ -277,6 +279,7 @@ static void monGenVgroupJson(SMonInfo *pMonitor) { static void monGenGrantJson(SMonInfo *pMonitor) { SMonGrantInfo *pInfo = &pMonitor->mmInfo.grant; + if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return; SJson *pJson = tjsonCreateObject(); if (pJson == NULL) return; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index e7e870e998..74d7c15e78 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -348,7 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { } } - uError("name:%s, cfg not found", name); + // uError("name:%s, cfg not found", name); terrno = TSDB_CODE_CFG_NOT_FOUND; return NULL; } -- GitLab