提交 447c9504 编写于 作者: S Shengliang Guan

refactor: node mgmt

上级 9f4d41fe
...@@ -134,18 +134,16 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) { ...@@ -134,18 +134,16 @@ int32_t bmStartWorker(SBnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) { SSingleWorkerCfg mCfg = {
SSingleWorkerCfg mCfg = { .min = 1,
.min = 1, .max = 1,
.max = 1, .name = "bnode-monitor",
.name = "bnode-monitor", .fp = (FItem)bmProcessMonitorQueue,
.fp = (FItem)bmProcessMonitorQueue, .param = pMgmt,
.param = pMgmt, };
}; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start bnode-monitor worker since %s", terrstr());
dError("failed to start bnode-monitor worker since %s", terrstr()); return -1;
return -1;
}
} }
dDebug("bnode workers are initialized"); dDebug("bnode workers are initialized");
......
...@@ -52,6 +52,9 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) ...@@ -52,6 +52,9 @@ static int32_t dmOpenMgmt(const SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput)
pMgmt->data.supportVnodes = pInput->supportVnodes; pMgmt->data.supportVnodes = pInput->supportVnodes;
pMgmt->data.serverPort = pInput->serverPort; pMgmt->data.serverPort = pInput->serverPort;
pMgmt->pDnode = pInput->pDnode; pMgmt->pDnode = pInput->pDnode;
pMgmt->msgCb = pInput->msgCb;
pMgmt->path = pInput->path;
pMgmt->name = pInput->name;
pMgmt->processCreateNodeFp = pInput->processCreateNodeFp; pMgmt->processCreateNodeFp = pInput->processCreateNodeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->isNodeDeployedFp = pInput->isNodeDeployedFp; pMgmt->isNodeDeployedFp = pInput->isNodeDeployedFp;
......
...@@ -176,18 +176,16 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -176,18 +176,16 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) { SSingleWorkerCfg mCfg = {
SSingleWorkerCfg mCfg = { .min = 1,
.min = 1, .max = 1,
.max = 1, .name = "mnode-monitor",
.name = "mnode-monitor", .fp = (FItem)mmProcessQueue,
.fp = (FItem)mmProcessQueue, .param = pMgmt,
.param = pMgmt, };
}; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start mnode mnode-monitor worker since %s", terrstr());
dError("failed to start mnode mnode-monitor worker since %s", terrstr()); return -1;
return -1;
}
} }
dDebug("mnode workers are initialized"); dDebug("mnode workers are initialized");
......
...@@ -165,18 +165,16 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) { ...@@ -165,18 +165,16 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) { SSingleWorkerCfg mCfg = {
SSingleWorkerCfg mCfg = { .min = 1,
.min = 1, .max = 1,
.max = 1, .name = "qnode-monitor",
.name = "qnode-monitor", .fp = (FItem)qmProcessMonitorQueue,
.fp = (FItem)qmProcessMonitorQueue, .param = pMgmt,
.param = pMgmt, };
}; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start qnode-monitor worker since %s", terrstr());
dError("failed to start qnode-monitor worker since %s", terrstr()); return -1;
return -1;
}
} }
dDebug("qnode workers are initialized"); dDebug("qnode workers are initialized");
......
...@@ -121,18 +121,16 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) { ...@@ -121,18 +121,16 @@ int32_t smStartWorker(SSnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) { SSingleWorkerCfg mCfg = {
SSingleWorkerCfg mCfg = { .min = 1,
.min = 1, .max = 1,
.max = 1, .name = "snode-monitor",
.name = "snode-monitor", .fp = (FItem)smProcessMonitorQueue,
.fp = (FItem)smProcessMonitorQueue, .param = pMgmt,
.param = pMgmt, };
}; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start snode-monitor worker since %s", terrstr());
dError("failed to start snode-monitor worker since %s", terrstr()); return -1;
return -1;
}
} }
dDebug("snode workers are initialized"); dDebug("snode workers are initialized");
......
...@@ -496,18 +496,16 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { ...@@ -496,18 +496,16 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
return -1; return -1;
} }
if (tsMultiProcess) { SSingleWorkerCfg mCfg = {
SSingleWorkerCfg mCfg = { .min = 1,
.min = 1, .max = 1,
.max = 1, .name = "vnode-monitor",
.name = "vnode-monitor", .fp = (FItem)vmProcessMgmtMonitorQueue,
.fp = (FItem)vmProcessMgmtMonitorQueue, .param = pMgmt,
.param = pMgmt, };
}; if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) {
if (tSingleWorkerInit(&pMgmt->monitorWorker, &mCfg) != 0) { dError("failed to start mnode vnode-monitor worker since %s", terrstr());
dError("failed to start mnode vnode-monitor worker since %s", terrstr()); return -1;
return -1;
}
} }
dDebug("vnode workers are initialized"); dDebug("vnode workers are initialized");
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmImp.h" #include "dmImp.h"
static bool dmIsNodeDeployedFp(SDnode *pDnode, EDndNodeType ntype) { return pDnode->wrappers[ntype].required; }
static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->input.dnodeId = 0; pDnode->input.dnodeId = 0;
pDnode->input.clusterId = 0; pDnode->input.clusterId = 0;
...@@ -29,6 +31,9 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -29,6 +31,9 @@ static int32_t dmInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->input.disks = pOption->disks; pDnode->input.disks = pOption->disks;
pDnode->input.dataDir = strdup(pOption->dataDir); pDnode->input.dataDir = strdup(pOption->dataDir);
pDnode->input.pDnode = pDnode; pDnode->input.pDnode = pDnode;
pDnode->input.processCreateNodeFp = dmProcessCreateNodeReq;
pDnode->input.processDropNodeFp = dmProcessDropNodeReq;
pDnode->input.isNodeDeployedFp = dmIsNodeDeployedFp;
if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL || if (pDnode->input.dataDir == NULL || pDnode->input.localEp == NULL || pDnode->input.localFqdn == NULL ||
pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) { pDnode->input.firstEp == NULL || pDnode->input.secondEp == NULL) {
......
...@@ -84,7 +84,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe ...@@ -84,7 +84,7 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
if (pWrapper->procType != DND_PROC_PARENT) { if (pWrapper->procType != DND_PROC_PARENT) {
dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user); dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg); code = (*msgFp)(pWrapper->pMgmt, pMsg);
} else { } else {
dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg, dTrace("msg:%p, created and put into child queue, type:%s handle:%p code:0x%04x user:%s contLen:%d", pMsg,
TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen); TMSG_INFO(msgType), pRpc->handle, pMsg->rpcMsg.code & 0XFFFF, pMsg->user, pRpc->contLen);
...@@ -335,7 +335,7 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t ...@@ -335,7 +335,7 @@ static void dmConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t
dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle);
NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)];
int32_t code = (*msgFp)(pWrapper, pMsg); int32_t code = (*msgFp)(pWrapper->pMgmt, pMsg);
if (code != 0) { if (code != 0) {
dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
......
...@@ -83,7 +83,7 @@ typedef enum { ...@@ -83,7 +83,7 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef int32_t (*ProcessCreateNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(struct SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg);
typedef int8_t (*IsNodeDeployedFp)(struct SDnode *pDnode, EDndNodeType ntype); typedef bool (*IsNodeDeployedFp)(struct SDnode *pDnode, EDndNodeType ntype);
typedef struct { typedef struct {
const char *path; const char *path;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册