提交 882575fd 编写于 作者: S Shengliang Guan

shm

上级 eb561f31
...@@ -56,7 +56,7 @@ extern "C" { ...@@ -56,7 +56,7 @@ extern "C" {
#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }}
#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }}
typedef enum { VNODES, QNODE, SNODE, MNODE, BNODE, DNODE, NODE_MAX } ENodeType; typedef enum { DNODE, VNODES, QNODE, SNODE, MNODE, BNODE, NODE_MAX } ENodeType;
typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType; typedef enum { PROC_SINGLE, PROC_CHILD, PROC_PARENT } EProcType;
typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus;
typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
......
...@@ -121,14 +121,6 @@ SDnode *dndCreate(SDndCfg *pCfg) { ...@@ -121,14 +121,6 @@ SDnode *dndCreate(SDndCfg *pCfg) {
} }
pWrapper->procType = PROC_SINGLE; pWrapper->procType = PROC_SINGLE;
pWrapper->required = dndRequireNode(pWrapper);
if (pWrapper->required) {
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
goto _OVER;
}
}
} }
code = 0; code = 0;
...@@ -172,8 +164,15 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { ...@@ -172,8 +164,15 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1;
}
dInfo("node:%s, will start in single process", pWrapper->name); dInfo("node:%s, will start in single process", pWrapper->name);
pWrapper->procType = PROC_SINGLE; pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pWrapper) != 0) {
...@@ -182,6 +181,12 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { ...@@ -182,6 +181,12 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
} }
} }
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
if (dmStartWorker(pWrapper->pMgmt) != 0) {
dError("failed to start dnode worker since %s", terrstr());
return -1;
}
return 0; return 0;
} }
...@@ -250,8 +255,15 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -250,8 +255,15 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
return -1;
}
if (n == DNODE) { if (n == DNODE) {
dInfo("node:%s, will start in parent process", pWrapper->name); dInfo("node:%s, will start in parent process", pWrapper->name);
pWrapper->procType = PROC_SINGLE; pWrapper->procType = PROC_SINGLE;
...@@ -306,6 +318,12 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -306,6 +318,12 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
} }
} }
SMgmtWrapper *pWrapper = dndGetWrapper(pDnode, DNODE);
if (pWrapper->procType == PROC_PARENT && dmStartWorker(pWrapper->pMgmt) != 0) {
dError("failed to start dnode worker since %s", terrstr());
return -1;
}
return 0; return 0;
} }
...@@ -322,6 +340,9 @@ int32_t dndRun(SDnode *pDnode) { ...@@ -322,6 +340,9 @@ int32_t dndRun(SDnode *pDnode) {
} }
} }
dndSetStatus(pDnode, DND_STAT_RUNNING);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) { while (1) {
if (pDnode->event == DND_EVENT_STOP) { if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop"); dInfo("dnode is about to stop");
......
...@@ -65,6 +65,8 @@ int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, cons ...@@ -65,6 +65,8 @@ int32_t dndInitWorker(void *param, SDnodeWorker *pWorker, EWorkerType type, cons
} }
void dndCleanupWorker(SDnodeWorker *pWorker) { void dndCleanupWorker(SDnodeWorker *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) { while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10); taosMsleep(10);
} }
......
...@@ -51,6 +51,9 @@ void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); ...@@ -51,6 +51,9 @@ void dmUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet);
void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dmGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort);
void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg); void dmSendRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg);
// dmWorker.h
int32_t dmStartWorker(SDnodeMgmt *pMgmt);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -136,17 +136,8 @@ int32_t dmInit(SMgmtWrapper *pWrapper) { ...@@ -136,17 +136,8 @@ int32_t dmInit(SMgmtWrapper *pWrapper) {
return -1; return -1;
} }
if (dmStartWorker(pMgmt) != 0) {
dError("failed to start dnode worker since %s", terrstr());
return -1;
}
pWrapper->pMgmt = pMgmt; pWrapper->pMgmt = pMgmt;
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt is initialized");
dndSetStatus(pDnode, DND_STAT_RUNNING);
dmSendStatusReq(pMgmt);
dndReportStartup(pDnode, "TDengine", "initialized successfully");
return 0; return 0;
} }
......
...@@ -200,7 +200,7 @@ static void mmCleanup(SMgmtWrapper *pWrapper) { ...@@ -200,7 +200,7 @@ static void mmCleanup(SMgmtWrapper *pWrapper) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
if (pMgmt == NULL) return; if (pMgmt == NULL) return;
dInfo("mnode-mgmt start to clean up"); dInfo("mnode-mgmt start to cleanup");
if (pMgmt->pMnode) { if (pMgmt->pMnode) {
mmStopWorker(pMgmt); mmStopWorker(pMgmt);
mndClose(pMgmt->pMnode); mndClose(pMgmt->pMnode);
...@@ -217,7 +217,7 @@ static int32_t mmInit(SMgmtWrapper *pWrapper) { ...@@ -217,7 +217,7 @@ static int32_t mmInit(SMgmtWrapper *pWrapper) {
int32_t code = -1; int32_t code = -1;
SMnodeOpt option = {0}; SMnodeOpt option = {0};
dInfo("mnode-mgmt is initialized"); dInfo("mnode-mgmt start to init");
pMgmt->path = pWrapper->path; pMgmt->path = pWrapper->path;
pMgmt->pDnode = pWrapper->pDnode; pMgmt->pDnode = pWrapper->pDnode;
pMgmt->pWrapper = pWrapper; pMgmt->pWrapper = pWrapper;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册