提交 951ee613 编写于 作者: S Shengliang Guan

feat[cluster]: send monitor information in multi-process mode

上级 b34ea72a
...@@ -23,6 +23,29 @@ void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) { ...@@ -23,6 +23,29 @@ void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo) {
} }
} }
int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pWrapper, &bmInfo);
int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo);
return 0;
}
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
...@@ -61,4 +84,6 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -61,4 +84,6 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
} }
void bmInitMsgHandle(SMgmtWrapper *pWrapper) {} void bmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, bmProcessMonitorMsg, DEFAULT_HANDLE);
}
...@@ -33,7 +33,34 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num ...@@ -33,7 +33,34 @@ static void bmSendErrorRsps(SMgmtWrapper *pWrapper, STaosQall *qall, int32_t num
} }
} }
static void bmProcessQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void bmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from bnode monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_BM_INFO) {
code = bmProcessGetMonBmInfoReq(pMgmt->pWrapper, pMsg);
}
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
static void bmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SBnodeMgmt *pMgmt = pInfo->ahandle; SBnodeMgmt *pMgmt = pInfo->ahandle;
SMgmtWrapper *pWrapper = pMgmt->pWrapper; SMgmtWrapper *pWrapper = pMgmt->pWrapper;
...@@ -72,18 +99,37 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -72,18 +99,37 @@ int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SBnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t bmStartWorker(SBnodeMgmt *pMgmt) { int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessQueue, .param = pMgmt}; SMultiWorkerCfg cfg = {.max = 1, .name = "bnode-write", .fp = (FItems)bmProcessWriteQueue, .param = pMgmt};
if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) { if (tMultiWorkerInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start bnode write worker since %s", terrstr()); dError("failed to start bnode-write worker since %s", terrstr());
return -1; return -1;
} }
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
.min = 1, .max = 1, .name = "bnode-monitor", .fp = (FItem)bmProcessMonQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
dError("failed to start bnode-monitor worker since %s", terrstr());
return -1;
}
}
dDebug("bnode workers are initialized"); dDebug("bnode workers are initialized");
return 0; return 0;
} }
void bmStopWorker(SBnodeMgmt *pMgmt) { void bmStopWorker(SBnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tMultiWorkerCleanup(&pMgmt->writeWorker); tMultiWorkerCleanup(&pMgmt->writeWorker);
dDebug("bnode workers are closed"); dDebug("bnode workers are closed");
} }
...@@ -212,12 +212,4 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) { ...@@ -212,12 +212,4 @@ void dmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, dmProcessMgmtMsg, DEFAULT_HANDLE);
// Monitor info exchange between processes
dndSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_BM_INFO, dmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, dmProcessMonitorMsg, DEFAULT_HANDLE);
} }
...@@ -59,74 +59,92 @@ void dmSendMonitorReport(SDnode *pDnode) { ...@@ -59,74 +59,92 @@ void dmSendMonitorReport(SDnode *pDnode) {
tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN); tstrncpy(epset.eps[0].fqdn, tsLocalFqdn, TSDB_FQDN_LEN);
epset.eps[0].port = tsServerPort; epset.eps[0].port = tsServerPort;
SMgmtWrapper *pWrapper = NULL;
dmGetMonitorInfo(pDnode, &dmInfo); dmGetMonitorInfo(pDnode, &dmInfo);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE); pWrapper = &pDnode->wrappers[MNODE];
if (pWrapper != NULL) { if (!tsMultiProcess) {
if (!tsMultiProcess) { if (dndMarkWrapper(pWrapper) != 0) {
mmGetMonitorInfo(pWrapper, &mmInfo); mmGetMonitorInfo(pWrapper, &mmInfo);
} else { dndReleaseWrapper(pWrapper);
}
} else {
if (pWrapper->required) {
req.msgType = TDMT_MON_MM_INFO; req.msgType = TDMT_MON_MM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp); dndSendRecv(pDnode, &epset, &req, &rsp);
tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo); if (rsp.code == 0) {
tDeserializeSMonMmInfo(rsp.pCont, rsp.contLen, &mmInfo);
}
rpcFreeCont(rsp.pCont); rpcFreeCont(rsp.pCont);
} }
dndReleaseWrapper(pWrapper);
} }
pWrapper = dndAcquireWrapper(pDnode, VNODES); pWrapper = &pDnode->wrappers[VNODES];
if (pWrapper != NULL) { if (!tsMultiProcess) {
if (!tsMultiProcess) { if (dndMarkWrapper(pWrapper) != 0) {
vmGetMonitorInfo(pWrapper, &vmInfo); vmGetMonitorInfo(pWrapper, &vmInfo);
} else { dndReleaseWrapper(pWrapper);
}
} else {
if (pWrapper->required) {
req.msgType = TDMT_MON_VM_INFO; req.msgType = TDMT_MON_VM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp); dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper); if (rsp.code == 0) {
tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo); tDeserializeSMonVmInfo(rsp.pCont, rsp.contLen, &vmInfo);
}
rpcFreeCont(rsp.pCont); rpcFreeCont(rsp.pCont);
} }
} }
pWrapper = dndAcquireWrapper(pDnode, QNODE); pWrapper = &pDnode->wrappers[QNODE];
if (pWrapper != NULL) { if (!tsMultiProcess) {
if (!tsMultiProcess) { if (dndMarkWrapper(pWrapper) != 0) {
qmGetMonitorInfo(pWrapper, &qmInfo); qmGetMonitorInfo(pWrapper, &qmInfo);
} else { dndReleaseWrapper(pWrapper);
}
} else {
if (pWrapper->required) {
req.msgType = TDMT_MON_QM_INFO; req.msgType = TDMT_MON_QM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp); dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper); if (rsp.code == 0) {
tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo); tDeserializeSMonQmInfo(rsp.pCont, rsp.contLen, &qmInfo);
}
rpcFreeCont(rsp.pCont); rpcFreeCont(rsp.pCont);
} }
dndReleaseWrapper(pWrapper);
} }
pWrapper = dndAcquireWrapper(pDnode, SNODE); pWrapper = &pDnode->wrappers[SNODE];
if (pWrapper != NULL) { if (!tsMultiProcess) {
if (!tsMultiProcess) { if (dndMarkWrapper(pWrapper) != 0) {
smGetMonitorInfo(pWrapper, &smInfo); smGetMonitorInfo(pWrapper, &smInfo);
} else { dndReleaseWrapper(pWrapper);
}
} else {
if (pWrapper->required) {
req.msgType = TDMT_MON_SM_INFO; req.msgType = TDMT_MON_SM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp); dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper); if (rsp.code == 0) {
tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo); tDeserializeSMonSmInfo(rsp.pCont, rsp.contLen, &smInfo);
}
rpcFreeCont(rsp.pCont); rpcFreeCont(rsp.pCont);
} }
dndReleaseWrapper(pWrapper);
} }
pWrapper = dndAcquireWrapper(pDnode, BNODE); pWrapper = &pDnode->wrappers[BNODE];
if (pWrapper != NULL) { if (!tsMultiProcess) {
if (!tsMultiProcess) { if (dndMarkWrapper(pWrapper) != 0) {
bmGetMonitorInfo(pWrapper, &bmInfo); bmGetMonitorInfo(pWrapper, &bmInfo);
} else { dndReleaseWrapper(pWrapper);
}
} else {
if (pWrapper->required) {
req.msgType = TDMT_MON_BM_INFO; req.msgType = TDMT_MON_BM_INFO;
dndSendRecv(pDnode, &epset, &req, &rsp); dndSendRecv(pDnode, &epset, &req, &rsp);
dndReleaseWrapper(pWrapper); if (rsp.code == 0) {
tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo); tDeserializeSMonBmInfo(rsp.pCont, rsp.contLen, &bmInfo);
}
rpcFreeCont(rsp.pCont); rpcFreeCont(rsp.pCont);
} }
dndReleaseWrapper(pWrapper);
} }
monSetDmInfo(&dmInfo); monSetDmInfo(&dmInfo);
...@@ -143,168 +161,6 @@ void dmSendMonitorReport(SDnode *pDnode) { ...@@ -143,168 +161,6 @@ void dmSendMonitorReport(SDnode *pDnode) {
monSendReport(); monSendReport();
} }
int32_t dmProcessGetMonMmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, MNODE);
if (pWrapper == NULL) return -1;
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pWrapper, &mmInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo);
return 0;
}
int32_t dmProcessGetMonVmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, VNODES);
if (pWrapper == NULL) return -1;
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pWrapper, &vmInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo);
return 0;
}
int32_t dmProcessGetMonQmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, QNODE);
if (pWrapper == NULL) return -1;
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pWrapper, &qmInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo);
return 0;
}
int32_t dmProcessGetMonSmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, SNODE);
if (pWrapper == NULL) return -1;
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pWrapper, &smInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonSmInfo(&smInfo);
return 0;
}
int32_t dmProcessGetMonBmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, BNODE);
if (pWrapper == NULL) return -1;
SMonBmInfo bmInfo = {0};
bmGetMonitorInfo(pWrapper, &bmInfo);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonBmInfo(NULL, 0, &bmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonBmInfo(&bmInfo);
return 0;
}
int32_t dmProcessGetVnodeLoadsReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq) {
SMgmtWrapper *pWrapper = dndAcquireWrapper(pMgmt->pDnode, VNODES);
if (pWrapper == NULL) return -1;
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pWrapper, &vloads);
dndReleaseWrapper(pWrapper);
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonVloadInfo(&vloads);
return 0;
}
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) {
if (!tsMultiProcess) { if (!tsMultiProcess) {
vmGetVnodeLoads(pWrapper, pInfo); vmGetVnodeLoads(pWrapper, pInfo);
......
...@@ -78,24 +78,6 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -78,24 +78,6 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
case TDMT_MND_GRANT_RSP: case TDMT_MND_GRANT_RSP:
code = dmProcessGrantRsp(pMgmt, pMsg); code = dmProcessGrantRsp(pMgmt, pMsg);
break; break;
case TDMT_MON_MM_INFO:
code = dmProcessGetMonMmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_VM_INFO:
code = dmProcessGetMonVmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_QM_INFO:
code = dmProcessGetMonQmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_SM_INFO:
code = dmProcessGetMonSmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_BM_INFO:
code = dmProcessGetMonBmInfoReq(pMgmt, pMsg);
break;
case TDMT_MON_VM_LOAD:
code = dmProcessGetVnodeLoadsReq(pMgmt, pMsg);
break;
default: default:
code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg); code = dmProcessCDnodeReq(pMgmt->pDnode, pMsg);
break; break;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_DND_BNODE_INT_H_ #define _TD_DND_BNODE_INT_H_
#include "dndInt.h" #include "dndInt.h"
#include "bnode.h" #include "bnode.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -29,6 +30,7 @@ typedef struct SBnodeMgmt { ...@@ -29,6 +30,7 @@ typedef struct SBnodeMgmt {
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
const char *path; const char *path;
SMultiWorker writeWorker; SMultiWorker writeWorker;
SSingleWorker monitorWorker;
} SBnodeMgmt; } SBnodeMgmt;
// bmInt.c // bmInt.c
...@@ -39,11 +41,13 @@ int32_t bmDrop(SMgmtWrapper *pWrapper); ...@@ -39,11 +41,13 @@ int32_t bmDrop(SMgmtWrapper *pWrapper);
void bmInitMsgHandle(SMgmtWrapper *pWrapper); void bmInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t bmProcessGetMonBmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
// bmWorker.c // bmWorker.c
int32_t bmStartWorker(SBnodeMgmt *pMgmt); int32_t bmStartWorker(SBnodeMgmt *pMgmt);
void bmStopWorker(SBnodeMgmt *pMgmt); void bmStopWorker(SBnodeMgmt *pMgmt);
int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t bmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t bmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -54,14 +54,8 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg); ...@@ -54,14 +54,8 @@ int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg);
// dmMonitor.c // dmMonitor.c
int32_t dmProcessGetVnodeLoadsReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
int32_t dmProcessGetMonMmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq); void dmSendMonitorReport(SDnode *pDnode);
int32_t dmProcessGetMonVmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmProcessGetMonQmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmProcessGetMonSmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
int32_t dmProcessGetMonBmInfoReq(SDnodeMgmt *pMgmt, SNodeMsg *pReq);
void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo);
void dmSendMonitorReport(SDnode *pDnode);
// dmWorker.c // dmWorker.c
int32_t dmStartThread(SDnodeMgmt *pMgmt); int32_t dmStartThread(SDnodeMgmt *pMgmt);
......
...@@ -32,6 +32,7 @@ typedef struct SMnodeMgmt { ...@@ -32,6 +32,7 @@ typedef struct SMnodeMgmt {
SSingleWorker readWorker; SSingleWorker readWorker;
SSingleWorker writeWorker; SSingleWorker writeWorker;
SSingleWorker syncWorker; SSingleWorker syncWorker;
SSingleWorker monitorWorker;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;
...@@ -51,6 +52,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper); ...@@ -51,6 +52,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg);
int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
// mmWorker.c // mmWorker.c
int32_t mmStartWorker(SMnodeMgmt *pMgmt); int32_t mmStartWorker(SMnodeMgmt *pMgmt);
...@@ -59,6 +61,7 @@ int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); ...@@ -59,6 +61,7 @@ int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
......
...@@ -30,6 +30,7 @@ typedef struct SQnodeMgmt { ...@@ -30,6 +30,7 @@ typedef struct SQnodeMgmt {
const char *path; const char *path;
SSingleWorker queryWorker; SSingleWorker queryWorker;
SSingleWorker fetchWorker; SSingleWorker fetchWorker;
SSingleWorker monitorWorker;
} SQnodeMgmt; } SQnodeMgmt;
// qmInt.c // qmInt.c
...@@ -40,6 +41,7 @@ int32_t qmDrop(SMgmtWrapper *pWrapper); ...@@ -40,6 +41,7 @@ int32_t qmDrop(SMgmtWrapper *pWrapper);
void qmInitMsgHandle(SMgmtWrapper *pWrapper); void qmInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
// qmWorker.c // qmWorker.c
int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t qmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
...@@ -50,6 +52,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt); ...@@ -50,6 +52,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt);
void qmStopWorker(SQnodeMgmt *pMgmt); void qmStopWorker(SQnodeMgmt *pMgmt);
int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -32,6 +32,7 @@ typedef struct SSnodeMgmt { ...@@ -32,6 +32,7 @@ typedef struct SSnodeMgmt {
int8_t uniqueWorkerInUse; int8_t uniqueWorkerInUse;
SArray *uniqueWorkers; // SArray<SMultiWorker*> SArray *uniqueWorkers; // SArray<SMultiWorker*>
SSingleWorker sharedWorker; SSingleWorker sharedWorker;
SSingleWorker monitorWorker;
} SSnodeMgmt; } SSnodeMgmt;
// smInt.c // smInt.c
...@@ -42,6 +43,7 @@ int32_t smDrop(SMgmtWrapper *pWrapper); ...@@ -42,6 +43,7 @@ int32_t smDrop(SMgmtWrapper *pWrapper);
void smInitMsgHandle(SMgmtWrapper *pWrapper); void smInitMsgHandle(SMgmtWrapper *pWrapper);
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
// smWorker.c // smWorker.c
int32_t smStartWorker(SSnodeMgmt *pMgmt); int32_t smStartWorker(SSnodeMgmt *pMgmt);
...@@ -50,6 +52,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); ...@@ -50,6 +52,7 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessSharedMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t smProcessExecMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -39,6 +39,7 @@ typedef struct SVnodesMgmt { ...@@ -39,6 +39,7 @@ typedef struct SVnodesMgmt {
SDnode *pDnode; SDnode *pDnode;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
SSingleWorker monitorWorker;
} SVnodesMgmt; } SVnodesMgmt;
typedef struct { typedef struct {
...@@ -92,6 +93,8 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); ...@@ -92,6 +93,8 @@ int32_t vmProcessAlterVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessDropVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessSyncVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq); int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pReq);
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq);
// vmFile.c // vmFile.c
int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); int32_t vmGetVnodesFromFile(SVnodesMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes);
...@@ -115,6 +118,7 @@ int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); ...@@ -115,6 +118,7 @@ int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t vmProcessMergeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg); int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrappert, SNodeMsg *pMsg);
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -25,6 +25,29 @@ void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) { ...@@ -25,6 +25,29 @@ void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo) {
} }
} }
int32_t mmProcessGetMonMmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonMmInfo mmInfo = {0};
mmGetMonitorInfo(pWrapper, &mmInfo);
int32_t rspLen = tSerializeSMonMmInfo(NULL, 0, &mmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonMmInfo(&mmInfo);
return 0;
}
int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
...@@ -83,6 +106,8 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -83,6 +106,8 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
} }
void mmInitMsgHandle(SMgmtWrapper *pWrapper) { void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MON_MM_INFO, mmProcessMonitorMsg, DEFAULT_HANDLE);
// Requests handled by DNODE // Requests handled by DNODE
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, mmProcessWriteMsg, DEFAULT_HANDLE);
......
...@@ -23,11 +23,13 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -23,11 +23,13 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
if (pMsg->rpcMsg.msgType != TDMT_DND_ALTER_MNODE) { if (pMsg->rpcMsg.msgType == TDMT_DND_ALTER_MNODE) {
code = mmProcessAlterReq(pMgmt, pMsg);
} else if (pMsg->rpcMsg.msgType == TDMT_MON_MM_INFO) {
code = mmProcessGetMonMmInfoReq(pMgmt->pWrapper, pMsg);
} else {
pMsg->pNode = pMgmt->pMnode; pMsg->pNode = pMgmt->pMnode;
code = mndProcessMsg(pMsg); code = mndProcessMsg(pMsg);
} else {
code = mmProcessAlterReq(pMgmt, pMsg);
} }
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
...@@ -98,6 +100,15 @@ int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -98,6 +100,15 @@ int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) { static int32_t mmPutRpcMsgToWorker(SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) return -1; if (pMsg == NULL) return -1;
...@@ -157,15 +168,24 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -157,15 +168,24 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt}; SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-sync", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) { if (tSingleWorkerInit(&pMgmt->syncWorker, &sCfg) != 0) {
dError("failed to start mnode sync-worker since %s", terrstr()); dError("failed to start mnode mnode-sync worker since %s", terrstr());
return -1; return -1;
} }
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {.min = 1, .max = 1, .name = "mnode-monitor", .fp = (FItem)mmProcessQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
dError("failed to start mnode mnode-monitor worker since %s", terrstr());
return -1;
}
}
dDebug("mnode workers are initialized"); dDebug("mnode workers are initialized");
return 0; return 0;
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->readWorker); tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker); tSingleWorkerCleanup(&pMgmt->writeWorker);
......
...@@ -23,6 +23,29 @@ void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) { ...@@ -23,6 +23,29 @@ void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo) {
} }
} }
int32_t qmProcessGetMonQmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonQmInfo qmInfo = {0};
qmGetMonitorInfo(pWrapper, &qmInfo);
int32_t rspLen = tSerializeSMonQmInfo(NULL, 0, &qmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonQmInfo(&qmInfo);
return 0;
}
int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
...@@ -62,6 +85,8 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -62,6 +85,8 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
void qmInitMsgHandle(SMgmtWrapper *pWrapper) { void qmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MON_QM_INFO, qmProcessMonitorMsg, DEFAULT_HANDLE);
// Requests handled by VNODE // Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, qmProcessQueryMsg, QNODE_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, qmProcessQueryMsg, QNODE_HANDLE);
......
...@@ -21,6 +21,33 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { ...@@ -21,6 +21,33 @@ static void qmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }
static void qmProcessMonQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from qnode monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg);
}
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void qmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pInfo->ahandle; SQnodeMgmt *pMgmt = pInfo->ahandle;
...@@ -66,6 +93,15 @@ int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -66,6 +93,15 @@ int32_t qmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t qmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SQnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) { static int32_t qmPutRpcMsgToWorker(SQnodeMgmt *pMgmt, SSingleWorker *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
...@@ -128,11 +164,21 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { ...@@ -128,11 +164,21 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
.min = 1, .max = 1, .name = "qnode-monitor", .fp = (FItem)qmProcessMonQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
dError("failed to start qnode-monitor worker since %s", terrstr());
return -1;
}
}
dDebug("qnode workers are initialized"); dDebug("qnode workers are initialized");
return 0; return 0;
} }
void qmStopWorker(SQnodeMgmt *pMgmt) { void qmStopWorker(SQnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->queryWorker); tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker); tSingleWorkerCleanup(&pMgmt->fetchWorker);
dDebug("qnode workers are closed"); dDebug("qnode workers are closed");
......
...@@ -23,6 +23,29 @@ void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) { ...@@ -23,6 +23,29 @@ void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo) {
} }
} }
int32_t smProcessGetMonSmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonSmInfo smInfo = {0};
smGetMonitorInfo(pWrapper, &smInfo);
int32_t rspLen = tSerializeSMonSmInfo(NULL, 0, &smInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonSmInfo(pRsp, rspLen, &smInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonSmInfo(&smInfo);
return 0;
}
int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
SRpcMsg *pReq = &pMsg->rpcMsg; SRpcMsg *pReq = &pMsg->rpcMsg;
...@@ -62,6 +85,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -62,6 +85,8 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
void smInitMsgHandle(SMgmtWrapper *pWrapper) { void smInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MON_SM_INFO, smProcessMonitorMsg, DEFAULT_HANDLE);
// Requests handled by SNODE // Requests handled by SNODE
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, smProcessMgmtMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, smProcessExecMsg, DEFAULT_HANDLE);
......
...@@ -16,6 +16,33 @@ ...@@ -16,6 +16,33 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "smInt.h" #include "smInt.h"
static void smProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from snode monitor queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
code = smProcessGetMonSmInfoReq(pMgmt->pWrapper, pMsg);
}
if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0) {
code = terrno;
dError("msg:%p, failed to process since %s", pMsg, terrstr());
}
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp};
tmsgSendRsp(&rsp);
}
}
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pRpc->pCont);
taosFreeQitem(pMsg);
}
static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { static void smProcessUniqueQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SSnodeMgmt *pMgmt = pInfo->ahandle; SSnodeMgmt *pMgmt = pInfo->ahandle;
...@@ -80,11 +107,21 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { ...@@ -80,11 +107,21 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
.min = 1, .max = 1, .name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
dError("failed to start snode-monitor worker since %s", terrstr());
return -1;
}
}
dDebug("snode workers are initialized"); dDebug("snode workers are initialized");
return 0; return 0;
} }
void smStopWorker(SSnodeMgmt *pMgmt) { void smStopWorker(SSnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) { for (int32_t i = 0; i < taosArrayGetSize(pMgmt->uniqueWorkers); i++) {
SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i); SMultiWorker *pWorker = taosArrayGetP(pMgmt->uniqueWorkers, i);
tMultiWorkerCleanup(pWorker); tMultiWorkerCleanup(pWorker);
...@@ -120,6 +157,15 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -120,6 +157,15 @@ int32_t smProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t smProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t smProcessUniqueMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SSnodeMgmt *pMgmt = pWrapper->pMgmt; SSnodeMgmt *pMgmt = pWrapper->pMgmt;
int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg); int32_t index = smGetSWIdFromMsg(&pMsg->rpcMsg);
......
...@@ -38,6 +38,52 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) { ...@@ -38,6 +38,52 @@ void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo) {
} }
} }
int32_t vmProcessGetMonVmInfoReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonVmInfo vmInfo = {0};
vmGetMonitorInfo(pWrapper, &vmInfo);
int32_t rspLen = tSerializeSMonVmInfo(NULL, 0, &vmInfo);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonVmInfo(&vmInfo);
return 0;
}
int32_t vmProcessGetVnodeLoadsReq(SMgmtWrapper *pWrapper, SNodeMsg *pReq) {
SMonVloadInfo vloads = {0};
vmGetVnodeLoads(pWrapper, &vloads);
int32_t rspLen = tSerializeSMonVloadInfo(NULL, 0, &vloads);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSMonVloadInfo(pRsp, rspLen, &vloads);
pReq->pRsp = pRsp;
pReq->rspLen = rspLen;
tFreeSMonVloadInfo(&vloads);
return 0;
}
static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->vgId = pCreate->vgId; pCfg->vgId = pCreate->vgId;
pCfg->wsize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize;
...@@ -261,6 +307,9 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { ...@@ -261,6 +307,9 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
} }
void vmInitMsgHandle(SMgmtWrapper *pWrapper) { void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MON_VM_INFO, vmProcessMonitorMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_MON_VM_LOAD, vmProcessMonitorMsg, DEFAULT_HANDLE);
// Requests handled by VNODE // Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, DEFAULT_HANDLE);
......
...@@ -29,6 +29,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { ...@@ -29,6 +29,12 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg);
switch (msgType) { switch (msgType) {
case TDMT_MON_VM_INFO:
code = vmProcessGetMonVmInfoReq(pMgmt->pWrapper, pMsg);
break;
case TDMT_MON_VM_LOAD:
code = vmProcessGetVnodeLoadsReq(pMgmt->pWrapper, pMsg);
break;
case TDMT_DND_CREATE_VNODE: case TDMT_DND_CREATE_VNODE:
code = vmProcessCreateVnodeReq(pMgmt, pMsg); code = vmProcessCreateVnodeReq(pMgmt, pMsg);
break; break;
...@@ -255,6 +261,15 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { ...@@ -255,6 +261,15 @@ int32_t vmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return 0; return 0;
} }
int32_t vmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker;
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0;
}
static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) { static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EQueueType qtype) {
SVnodesMgmt *pMgmt = pWrapper->pMgmt; SVnodesMgmt *pMgmt = pWrapper->pMgmt;
SMsgHead *pHead = pRpc->pCont; SMsgHead *pHead = pRpc->pCont;
...@@ -412,11 +427,21 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { ...@@ -412,11 +427,21 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) {
SSingleWorkerCfg sCfg = {
.min = 1, .max = 1, .name = "vnode-monitor", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt};
if (tSingleWorkerInit(&pMgmt->monitorWorker, &sCfg) != 0) {
dError("failed to start mnode vnode-monitor worker since %s", terrstr());
return -1;
}
}
dDebug("vnode workers are initialized"); dDebug("vnode workers are initialized");
return 0; return 0;
} }
void vmStopWorker(SVnodesMgmt *pMgmt) { void vmStopWorker(SVnodesMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->monitorWorker);
tSingleWorkerCleanup(&pMgmt->mgmtWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker);
tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->fetchPool);
tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->queryPool);
......
...@@ -187,6 +187,7 @@ static void monGenBasicJson(SMonInfo *pMonitor) { ...@@ -187,6 +187,7 @@ static void monGenBasicJson(SMonInfo *pMonitor) {
static void monGenClusterJson(SMonInfo *pMonitor) { static void monGenClusterJson(SMonInfo *pMonitor) {
SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster; SMonClusterInfo *pInfo = &pMonitor->mmInfo.cluster;
if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return;
SJson *pJson = tjsonCreateObject(); SJson *pJson = tjsonCreateObject();
if (pJson == NULL) return; if (pJson == NULL) return;
...@@ -239,6 +240,7 @@ static void monGenClusterJson(SMonInfo *pMonitor) { ...@@ -239,6 +240,7 @@ static void monGenClusterJson(SMonInfo *pMonitor) {
static void monGenVgroupJson(SMonInfo *pMonitor) { static void monGenVgroupJson(SMonInfo *pMonitor) {
SMonVgroupInfo *pInfo = &pMonitor->mmInfo.vgroup; SMonVgroupInfo *pInfo = &pMonitor->mmInfo.vgroup;
if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return;
SJson *pJson = tjsonAddArrayToObject(pMonitor->pJson, "vgroup_infos"); SJson *pJson = tjsonAddArrayToObject(pMonitor->pJson, "vgroup_infos");
if (pJson == NULL) return; if (pJson == NULL) return;
...@@ -277,6 +279,7 @@ static void monGenVgroupJson(SMonInfo *pMonitor) { ...@@ -277,6 +279,7 @@ static void monGenVgroupJson(SMonInfo *pMonitor) {
static void monGenGrantJson(SMonInfo *pMonitor) { static void monGenGrantJson(SMonInfo *pMonitor) {
SMonGrantInfo *pInfo = &pMonitor->mmInfo.grant; SMonGrantInfo *pInfo = &pMonitor->mmInfo.grant;
if (pMonitor->mmInfo.cluster.first_ep_dnode_id == 0) return;
SJson *pJson = tjsonCreateObject(); SJson *pJson = tjsonCreateObject();
if (pJson == NULL) return; if (pJson == NULL) return;
......
...@@ -348,7 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) { ...@@ -348,7 +348,7 @@ SConfigItem *cfgGetItem(SConfig *pCfg, const char *name) {
} }
} }
uError("name:%s, cfg not found", name); // uError("name:%s, cfg not found", name);
terrno = TSDB_CODE_CFG_NOT_FOUND; terrno = TSDB_CODE_CFG_NOT_FOUND;
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册