提交 d812206e 编写于 作者: S Shengliang Guan

refact(cluster): node mgmt

上级 3b54d903
......@@ -25,7 +25,7 @@ static bool dmRequireNode(SMgmtWrapper *pWrapper) {
return required;
}
static int32_t dmInitNodeProc(SMgmtWrapper *pWrapper) {
static int32_t dmInitParentProc(SMgmtWrapper *pWrapper) {
int32_t shmsize = tsMnodeShmSize;
if (pWrapper->ntype == VNODE) {
shmsize = tsVnodeShmSize;
......@@ -82,20 +82,37 @@ static int32_t dmNewNodeProc(SMgmtWrapper *pWrapper, EDndNodeType n) {
return 0;
}
static int32_t dmRunNodeProc(SMgmtWrapper *pWrapper) {
static int32_t dmRunParentProc(SMgmtWrapper *pWrapper) {
if (pWrapper->pDnode->ntype == NODE_END) {
dInfo("node:%s, should be started manually", pWrapper->name);
dInfo("node:%s, should be started manually in child process", pWrapper->name);
} else {
if (dmNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
return -1;
}
}
if (taosProcRun(pWrapper->procObj) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
static int32_t dmInitChildProc(SMgmtWrapper *pWrapper) {
SProcCfg cfg = dmGenProcCfg(pWrapper);
cfg.isChild = true;
pWrapper->procObj = taosProcInit(&cfg);
if (pWrapper->procObj == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
static int32_t dmRunChildProc(SMgmtWrapper *pWrapper) {
if (taosProcRun(pWrapper->procObj) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
......@@ -111,23 +128,16 @@ int32_t dmOpenNode(SMgmtWrapper *pWrapper) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
}
if (pWrapper->procType == DND_PROC_CHILD) {
if (dmInitChildProc(pWrapper) != 0) return -1;
if (dmRunChildProc(pWrapper) != 0) return -1;
}
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
} else {
if (dmInitNodeProc(pWrapper) != 0) return -1;
if (dmInitParentProc(pWrapper) != 0) return -1;
if (dmWriteShmFile(pWrapper) != 0) return -1;
if (dmRunNodeProc(pWrapper) != 0) return -1;
}
if (pWrapper->procType == DND_PROC_CHILD) {
SProcCfg cfg = dmGenProcCfg(pWrapper);
cfg.isChild = true;
pWrapper->procObj = taosProcInit(&cfg);
if (pWrapper->procObj == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
if (dmRunParentProc(pWrapper) != 0) return -1;
}
return 0;
......@@ -138,10 +148,6 @@ int32_t dmStartNode(SMgmtWrapper *pWrapper) {
dInfo("node:%s, not start in parent process", pWrapper->name);
} else if (pWrapper->procType == DND_PROC_CHILD) {
dInfo("node:%s, start in child process", pWrapper->name);
if (taosProcRun(pWrapper->procObj) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
} else {
if (pWrapper->fp.startFp != NULL && (*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
......
......@@ -78,10 +78,11 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER;
if (pWrapper->procType == DND_PROC_SINGLE) {
dTrace("msg:%p, is created, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
dTrace("msg:%p, is created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, pMsg->user);
code = (*msgFp)(pWrapper, pMsg);
} else if (pWrapper->procType == DND_PROC_PARENT) {
dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user);
dTrace("msg:%p, is created and put into child queue, type:%s handle:%p user:%s", pMsg, TMSG_INFO(msgType),
pRpc->handle, pMsg->user);
code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle,
PROC_FUNC_REQ);
} else {
......@@ -97,7 +98,7 @@ _OVER:
rpcFreeCont(pRpc->pCont);
}
} else {
dError("msg:%p, failed to process since 0x%04x:%s", pMsg, code & 0XFFFF, terrstr());
dError("msg:%p, type:%s failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), code & 0XFFFF, terrstr());
if (msgType & 1U) {
if (terrno != 0) code = terrno;
if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) {
......
......@@ -44,8 +44,6 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed);
int32_t mmWriteFile(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq, bool deployed);
// mmInt.c
int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq);
int32_t mmDrop(SMgmtWrapper *pWrapper);
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq);
// mmHandle.c
......
......@@ -98,53 +98,6 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre
return 0;
}
static int32_t mmOpenImp(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq) {
SMnodeOpt option = {0};
if (pReq != NULL) {
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
return -1;
}
} else {
bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) {
dError("failed to read file since %s", terrstr());
return -1;
}
if (!deployed) {
dInfo("mnode start to deploy");
if (pMgmt->pWrapper->procType == DND_PROC_CHILD) {
pMgmt->pDnode->data.dnodeId = 1;
}
mmBuildOptionForDeploy(pMgmt, &option);
} else {
dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option);
}
}
pMgmt->pMnode = mndOpen(pMgmt->path, &option);
if (pMgmt->pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
return -1;
}
if (mmStartWorker(pMgmt) != 0) {
dError("failed to start mnode worker since %s", terrstr());
return -1;
}
return 0;
}
static void mmCloseImp(SMnodeMgmt *pMgmt) {
if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
}
}
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
SMnodeOpt option = {0};
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
......@@ -153,37 +106,23 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
return mndAlter(pMgmt->pMnode, &option);
}
int32_t mmDrop(SMgmtWrapper *pWrapper) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return 0;
dInfo("mnode-mgmt start to drop");
// bool deployed = false;
// if (mmWriteFile(pMgmt, deployed) != 0) {
// dError("failed to drop mnode since %s", terrstr());
// return -1;
// }
mmCloseImp(pMgmt);
taosRemoveDir(pMgmt->path);
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("mnode-mgmt is dropped");
return 0;
}
static void mmClose(SMgmtWrapper *pWrapper) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return;
dInfo("mnode-mgmt start to cleanup");
mmCloseImp(pMgmt);
if (pMgmt->pMnode != NULL) {
mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode);
pMgmt->pMnode = NULL;
}
pWrapper->pMgmt = NULL;
taosMemoryFree(pMgmt);
dInfo("mnode-mgmt is cleaned up");
}
int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) {
static int32_t mmOpen(SMgmtWrapper *pWrapper) {
dInfo("mnode-mgmt start to init");
if (walInit() != 0) {
dError("failed to init wal since %s", terrstr());
......@@ -201,18 +140,41 @@ int32_t mmOpenFromMsg(SMgmtWrapper *pWrapper, SDCreateMnodeReq *pReq) {
pMgmt->pWrapper = pWrapper;
pWrapper->pMgmt = pMgmt;
int32_t code = mmOpenImp(pMgmt, pReq);
if (code != 0) {
dError("failed to init mnode-mgmt since %s", terrstr());
bool deployed = false;
if (mmReadFile(pMgmt, &deployed) != 0) {
dError("failed to read file since %s", terrstr());
mmClose(pWrapper);
return -1;
}
SMnodeOpt option = {0};
if (!deployed) {
dInfo("mnode start to deploy");
if (pWrapper->procType == DND_PROC_CHILD) {
pWrapper->pDnode->data.dnodeId = 1;
}
mmBuildOptionForDeploy(pMgmt, &option);
} else {
dInfo("mnode-mgmt is initialized");
dInfo("mnode start to open");
mmBuildOptionForOpen(pMgmt, &option);
}
return code;
}
pMgmt->pMnode = mndOpen(pMgmt->path, &option);
if (pMgmt->pMnode == NULL) {
dError("failed to open mnode since %s", terrstr());
mmClose(pWrapper);
return -1;
}
static int32_t mmOpen(SMgmtWrapper *pWrapper) { return mmOpenFromMsg(pWrapper, NULL); }
if (mmStartWorker(pMgmt) != 0) {
dError("failed to start mnode worker since %s", terrstr());
mmClose(pWrapper);
return -1;
}
dInfo("mnode-mgmt is initialized");
return 0;
}
static int32_t mmStart(SMgmtWrapper *pWrapper) {
dDebug("mnode-mgmt start to run");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册