提交 6ece0d9a 编写于 作者: S Shengliang Guan

shm

上级 5bd37b75
......@@ -32,19 +32,17 @@ typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void
ProcFuncType ftype);
typedef struct {
int32_t childQueueSize;
ProcConsumeFp childConsumeFp;
ProcMallocFp childMallocHeadFp;
ProcFreeFp childFreeHeadFp;
ProcMallocFp childMallocBodyFp;
ProcFreeFp childFreeBodyFp;
int32_t parentQueueSize;
ProcConsumeFp parentConsumeFp;
ProcMallocFp parentdMallocHeadFp;
ProcFreeFp parentFreeHeadFp;
ProcMallocFp parentMallocBodyFp;
ProcFreeFp parentFreeBodyFp;
bool testFlag;
SShm shm;
void *pParent;
const char *name;
} SProcCfg;
......
......@@ -28,14 +28,18 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
}
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
int32_t code = (*pWrapper->fp.openFp)(pWrapper);
if (code != 0) {
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
return -1;
}
if ((*pWrapper->fp.openFp)(pWrapper) != 0) {
dError("node:%s, failed to open since %s", pWrapper->name, terrstr());
return -1;
} else {
dDebug("node:%s, has been opened", pWrapper->name);
}
dDebug("node:%s, has been opened", pWrapper->name);
pWrapper->deployed = true;
return 0;
}
......@@ -62,22 +66,13 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dDebug("dnode run in single process mode");
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
dInfo("dnode start to run in single process");
for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
return -1;
}
pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
......@@ -96,18 +91,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
}
}
dInfo("dnode running in single process");
return 0;
}
static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) {
// dndCleanupServer(pDnode);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
if (except == n) continue;
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = false;
}
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
......@@ -153,115 +140,126 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
taosMemoryFree(pMsg);
}
static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("dnode run in multi process mode");
static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo("dnode start to run in parent process");
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
if (dndOpenNode(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
return -1;
}
for (ENodeType n = 0; n < NODE_MAX; ++n) {
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to create dir:%s since %s", pWrapper->path, terrstr());
int64_t shmsize = 1024 * 1024 * 2; // size will be a configuration item
if (taosCreateShm(&pWrapper->shm, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%" PRId64 " since %s", pWrapper->name, shmsize, terrstr());
return -1;
}
if (n == DNODE) {
dInfo("node:%s, will start in parent process", pWrapper->name);
pWrapper->procType = PROC_SINGLE;
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
continue;
}
SProcCfg cfg = {.childQueueSize = 1024 * 1024 * 2, // size will be a configuration item
.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentQueueSize = 1024 * 1024 * 2, // size will be a configuration item
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
SProcCfg cfg = {.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm,
.pParent = pWrapper,
.name = pWrapper->name};
SProcObj *pProc = taosProcInit(&cfg);
if (pProc == NULL) {
dError("node:%s, failed to fork since %s", pWrapper->name, terrstr());
return -1;
}
pWrapper->pProc = pProc;
if (taosProcIsChild(pProc)) {
dInfo("node:%s, will start in child process", pWrapper->name);
pWrapper->procType = PROC_CHILD;
// dndResetLog(pWrapper);
dInfo("node:%s, clean up resources inherited from parent", pWrapper->name);
dndClearNodesExecpt(pDnode, n);
dInfo("node:%s, will be initialized in child process", pWrapper->name);
if (dndOpenNode(pWrapper) != 0) {
dInfo("node:%s, failed to init in child process since %s", pWrapper->name, terrstr());
pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
}
if (taosProcRun(pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
if (dndWriteRuntimeFile(pDnode) != 0) {
dError("failed to write runtime file since %s", terrstr());
return -1;
}
break;
} else {
dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
dInfo("node:%s, will not start in parent process", pWrapper->name);
// exec new node
pWrapper->procType = PROC_PARENT;
if (taosProcRun(pProc) != 0) {
if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
}
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
if (pWrapper->procType == PROC_PARENT && n != DNODE) continue;
if (pWrapper->procType == PROC_CHILD && n == DNODE) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
return -1;
}
dInfo("dnode running in parent process");
return 0;
}
static int32_t dndRunInChildProcess(SDnode *pDnode) {
dInfo("dnode start to run in child process");
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm,
.pParent = pWrapper,
.name = pWrapper->name};
pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
pWrapper->procType = PROC_CHILD;
if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
dInfo("dnode running in child process");
return 0;
}
int32_t dndRun(SDnode *pDnode) {
int32_t dndRun(SDnode * pDnode) {
if (!tsMultiProcess) {
if (dndRunInSingleProcess(pDnode) != 0) {
dError("failed to run dnode in single process mode since %s", terrstr());
dError("failed to run dnode since %s", terrstr());
return -1;
}
} else if (pDnode->ntype == DNODE) {
if (dndRunInParentProcess(pDnode) != 0) {
dError("failed to run dnode in parent process since %s", terrstr());
return -1;
}
} else {
if (dndRunInMultiProcess(pDnode) != 0) {
dError("failed to run dnode in multi process mode since %s", terrstr());
if (dndRunInChildProcess(pDnode) != 0) {
dError("failed to run dnode in child process since %s", terrstr());
return -1;
}
}
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
......
......@@ -101,6 +101,9 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
goto _OVER;
}
SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]);
tmsgSetDefaultMsgCb(&msgCb);
dInfo("dnode object is created, data:%p", pDnode);
code = 0;
......
......@@ -359,8 +359,8 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
}
pProc->name = pCfg->name;
pProc->pChildQueue = taosProcInitQueue(pCfg->childQueueSize);
pProc->pParentQueue = taosProcInitQueue(pCfg->parentQueueSize);
pProc->pChildQueue = taosProcInitQueue(pCfg->shm.size / 2);
pProc->pParentQueue = taosProcInitQueue(pCfg->shm.size / 2);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册