From 6ece0d9ac1cd7b0060d95a1609e63398bd00da2f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 30 Mar 2022 16:03:21 +0800 Subject: [PATCH] shm --- include/util/tprocess.h | 4 +- source/dnode/mgmt/main/src/dndExec.c | 192 +++++++++++++-------------- source/dnode/mgmt/main/src/dndObj.c | 3 + source/util/src/tprocess.c | 4 +- 4 files changed, 101 insertions(+), 102 deletions(-) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 51ce0243b7..090762b340 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -32,19 +32,17 @@ typedef void *(*ProcConsumeFp)(void *pParent, void *pHead, int16_t headLen, void ProcFuncType ftype); typedef struct { - int32_t childQueueSize; ProcConsumeFp childConsumeFp; ProcMallocFp childMallocHeadFp; ProcFreeFp childFreeHeadFp; ProcMallocFp childMallocBodyFp; ProcFreeFp childFreeBodyFp; - int32_t parentQueueSize; ProcConsumeFp parentConsumeFp; ProcMallocFp parentdMallocHeadFp; ProcFreeFp parentFreeHeadFp; ProcMallocFp parentMallocBodyFp; ProcFreeFp parentFreeBodyFp; - bool testFlag; + SShm shm; void *pParent; const char *name; } SProcCfg; diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index b289fe91c6..bdce489f76 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -28,14 +28,18 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) { } int32_t dndOpenNode(SMgmtWrapper *pWrapper) { - int32_t code = (*pWrapper->fp.openFp)(pWrapper); - if (code != 0) { + if (taosMkDir(pWrapper->path) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); + return -1; + } + + if ((*pWrapper->fp.openFp)(pWrapper) != 0) { dError("node:%s, failed to open since %s", pWrapper->name, terrstr()); return -1; - } else { - dDebug("node:%s, has been opened", pWrapper->name); } + dDebug("node:%s, has been opened", pWrapper->name); pWrapper->deployed = true; return 0; } @@ -62,22 +66,13 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { } static int32_t dndRunInSingleProcess(SDnode *pDnode) { - dDebug("dnode run in single process mode"); - SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); - tmsgSetDefaultMsgCb(&msgCb); + dInfo("dnode start to run in single process"); for (ENodeType n = DNODE; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; - if (taosMkDir(pWrapper->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr()); - return -1; - } - - pWrapper->procType = PROC_SINGLE; if (dndOpenNode(pWrapper) != 0) { dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); return -1; @@ -96,18 +91,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) { } } + dInfo("dnode running in single process"); return 0; } -static void dndClearNodesExecpt(SDnode *pDnode, ENodeType except) { - // dndCleanupServer(pDnode); - for (ENodeType n = 0; n < NODE_MAX; ++n) { - if (except == n) continue; - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - pWrapper->required = false; - } -} - static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, ProcFuncType ftype) { SRpcMsg *pRpc = &pMsg->rpcMsg; @@ -153,115 +140,126 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t taosMemoryFree(pMsg); } -static int32_t dndRunInMultiProcess(SDnode *pDnode) { - dInfo("dnode run in multi process mode"); +static int32_t dndRunInParentProcess(SDnode *pDnode) { + dInfo("dnode start to run in parent process"); + SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE]; + if (dndOpenNode(pDWrapper) != 0) { + dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); + return -1; + } - for (ENodeType n = 0; n < NODE_MAX; ++n) { + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; pWrapper->required = dndRequireNode(pWrapper); if (!pWrapper->required) continue; - SMsgCb msgCb = dndCreateMsgcb(pWrapper); - tmsgSetDefaultMsgCb(&msgCb); - - if (taosMkDir(pWrapper->path) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to create dir:%s since %s", pWrapper->path, terrstr()); + int64_t shmsize = 1024 * 1024 * 2; // size will be a configuration item + if (taosCreateShm(&pWrapper->shm, shmsize) != 0) { + terrno = TAOS_SYSTEM_ERROR(terrno); + dError("node:%s, failed to create shm size:%" PRId64 " since %s", pWrapper->name, shmsize, terrstr()); return -1; } - if (n == DNODE) { - dInfo("node:%s, will start in parent process", pWrapper->name); - pWrapper->procType = PROC_SINGLE; - if (dndOpenNode(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } - continue; + SProcCfg cfg = {.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->shm, + .pParent = pWrapper, + .name = pWrapper->name}; + + pWrapper->pProc = taosProcInit(&cfg); + if (pWrapper->pProc == NULL) { + dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); + return -1; } + } + + if (dndWriteRuntimeFile(pDnode) != 0) { + dError("failed to write runtime file since %s", terrstr()); + return -1; + } + + for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + if (!pWrapper->required) continue; - SProcCfg cfg = {.childQueueSize = 1024 * 1024 * 2, // size will be a configuration item - .childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentQueueSize = 1024 * 1024 * 2, // size will be a configuration item - .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .pParent = pWrapper, - .name = pWrapper->name}; - SProcObj *pProc = taosProcInit(&cfg); - if (pProc == NULL) { - dError("node:%s, failed to fork since %s", pWrapper->name, terrstr()); + dInfo("node:%s, will not start in parent process", pWrapper->name); + // exec new node + + pWrapper->procType = PROC_PARENT; + if (taosProcRun(pWrapper->pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); return -1; } + } - pWrapper->pProc = pProc; + dndSetStatus(pDnode, DND_STAT_RUNNING); - if (taosProcIsChild(pProc)) { - dInfo("node:%s, will start in child process", pWrapper->name); - pWrapper->procType = PROC_CHILD; - // dndResetLog(pWrapper); + if ((*pDWrapper->fp.startFp)(pDWrapper) != 0) { + dError("node:%s, failed to start since %s", pDWrapper->name, terrstr()); + return -1; + } - dInfo("node:%s, clean up resources inherited from parent", pWrapper->name); - dndClearNodesExecpt(pDnode, n); + dInfo("dnode running in parent process"); + return 0; +} - dInfo("node:%s, will be initialized in child process", pWrapper->name); - if (dndOpenNode(pWrapper) != 0) { - dInfo("node:%s, failed to init in child process since %s", pWrapper->name, terrstr()); - return -1; - } +static int32_t dndRunInChildProcess(SDnode *pDnode) { + dInfo("dnode start to run in child process"); - if (taosProcRun(pProc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; - } - break; - } else { - dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc)); - pWrapper->procType = PROC_PARENT; - if (taosProcRun(pProc) != 0) { - dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); - return -1; - } - } + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + if (dndOpenNode(pWrapper) != 0) { + dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); + return -1; } - dndSetStatus(pDnode, DND_STAT_RUNNING); + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->shm, + .pParent = pWrapper, + .name = pWrapper->name}; + + pWrapper->pProc = taosProcInit(&cfg); + if (pWrapper->pProc == NULL) { + dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); + return -1; + } - for (ENodeType n = 0; n < NODE_MAX; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - if (!pWrapper->required) continue; - if (pWrapper->fp.startFp == NULL) continue; - if (pWrapper->procType == PROC_PARENT && n != DNODE) continue; - if (pWrapper->procType == PROC_CHILD && n == DNODE) continue; - if ((*pWrapper->fp.startFp)(pWrapper) != 0) { - dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); - return -1; - } + pWrapper->procType = PROC_CHILD; + if (taosProcRun(pWrapper->pProc) != 0) { + dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); + return -1; } + dInfo("dnode running in child process"); return 0; } -int32_t dndRun(SDnode *pDnode) { +int32_t dndRun(SDnode * pDnode) { if (!tsMultiProcess) { if (dndRunInSingleProcess(pDnode) != 0) { - dError("failed to run dnode in single process mode since %s", terrstr()); + dError("failed to run dnode since %s", terrstr()); + return -1; + } + } else if (pDnode->ntype == DNODE) { + if (dndRunInParentProcess(pDnode) != 0) { + dError("failed to run dnode in parent process since %s", terrstr()); return -1; } } else { - if (dndRunInMultiProcess(pDnode) != 0) { - dError("failed to run dnode in multi process mode since %s", terrstr()); + if (dndRunInChildProcess(pDnode) != 0) { + dError("failed to run dnode in child process since %s", terrstr()); return -1; } } dndReportStartup(pDnode, "TDengine", "initialized successfully"); + dInfo("TDengine initialized successfully"); while (1) { if (pDnode->event == DND_EVENT_STOP) { diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index 6afe0c5b5f..b7e91f3039 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -101,6 +101,9 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } + SMsgCb msgCb = dndCreateMsgcb(&pDnode->wrappers[0]); + tmsgSetDefaultMsgCb(&msgCb); + dInfo("dnode object is created, data:%p", pDnode); code = 0; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 86cec2d271..7afbe56587 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -359,8 +359,8 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { } pProc->name = pCfg->name; - pProc->pChildQueue = taosProcInitQueue(pCfg->childQueueSize); - pProc->pParentQueue = taosProcInitQueue(pCfg->parentQueueSize); + pProc->pChildQueue = taosProcInitQueue(pCfg->shm.size / 2); + pProc->pParentQueue = taosProcInitQueue(pCfg->shm.size / 2); if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) { taosProcCleanupQueue(pProc->pChildQueue); taosMemoryFree(pProc); -- GitLab