diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index b6b57be3d1760d79007658255e94870660d73446..8618de324446ca3f2504d15a6422bcca3a4b51b0 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -22,7 +22,7 @@ extern "C" { int32_t dnodeInitModules(); void dnodeStartModules(); -void dnodeCleanUpModules(); +void dnodeCleanupModules(); void dnodeProcessModuleStatus(uint32_t moduleStatus); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 76f9446e0e9deeb7a3c2d2085b1020241e38cf05..69400816295a155de26f6f0175180931096471aa 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -36,6 +36,46 @@ static void dnodeCleanupStorage(); static void dnodeSetRunStatus(SDnodeRunStatus status); static void dnodeCheckDataDirOpenned(char *dir); static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; +static int32_t dnodeInitComponents(); +static void dnodeCleanupComponents(int32_t stepId); + +typedef struct { + const char *const name; + int (*init)(); + void (*cleanup)(); +} SDnodeComponent; + +static const SDnodeComponent SDnodeComponents[] = { + {"storage", dnodeInitStorage, dnodeCleanupStorage}, + {"vread", dnodeInitVnodeRead, dnodeCleanupVnodeRead}, + {"vwrite", dnodeInitVnodeWrite, dnodeCleanupVnodeWrite}, + {"mread", dnodeInitMnodeRead, dnodeCleanupMnodeRead}, + {"mwrite", dnodeInitMnodeWrite, dnodeCleanupMnodeWrite}, + {"mpeer", dnodeInitMnodePeer, dnodeCleanupMnodePeer}, + {"client", dnodeInitClient, dnodeCleanupClient}, + {"server", dnodeInitServer, dnodeCleanupServer}, + {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, + {"modules", dnodeInitModules, dnodeCleanupModules}, + {"shell", dnodeInitShell, dnodeCleanupShell} +}; + +static void dnodeCleanupComponents(int32_t stepId) { + for (int32_t i = stepId; i >= 0; i--) { + SDnodeComponents[i].cleanup(); + } +} + +static int32_t dnodeInitComponents() { + int32_t code = 0; + for (int32_t i = 0; i < sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]); i++) { + if (SDnodeComponents[i].init() != 0) { + dnodeCleanupComponents(i); + code = -1; + break; + } + } + return code; +} int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); @@ -67,17 +107,9 @@ int32_t dnodeInitSystem() { dPrint("start to initialize TDengine on %s", tsLocalEp); - if (dnodeInitStorage() != 0) return -1; - if (dnodeInitVnodeRead() != 0) return -1; - if (dnodeInitVnodeWrite() != 0) return -1; - if (dnodeInitMnodeRead() != 0) return -1; - if (dnodeInitMnodeWrite() != 0) return -1; - if (dnodeInitMnodePeer() != 0) return -1; - if (dnodeInitClient() != 0) return -1; - if (dnodeInitServer() != 0) return -1; - if (dnodeInitMgmt() != 0) return -1; - if (dnodeInitModules() != 0) return -1; - if (dnodeInitShell() != 0) return -1; + if (dnodeInitComponents() != 0) { + return -1; + } dnodeStartModules(); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); @@ -90,17 +122,7 @@ int32_t dnodeInitSystem() { void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); - dnodeCleanupShell(); - dnodeCleanUpModules(); - dnodeCleanupMgmt(); - dnodeCleanupServer(); - dnodeCleanupClient(); - dnodeCleanupMnodePeer(); - dnodeCleanupMnodeWrite(); - dnodeCleanupMnodeRead(); - dnodeCleanupVnodeWrite(); - dnodeCleanupVnodeRead(); - dnodeCleanupStorage(); + dnodeCleanupComponents(sizeof(SDnodeComponents) / sizeof(SDnodeComponents[0]) - 1); taos_cleanup(); taosCloseLog(); } diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index 2afc55eada6d62de53641dceb3ce1aa368fc5469..dc1515282d86abec8d1e177d963b4ba6b473464b 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -83,7 +83,7 @@ static void dnodeAllocModules() { } } -void dnodeCleanUpModules() { +void dnodeCleanupModules() { for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) { if (tsModule[module].enable && tsModule[module].stopFp) { (*tsModule[module].stopFp)(); diff --git a/src/dnode/src/dnodeSystem.c b/src/dnode/src/dnodeSystem.c index 0f8dabd75a7b0e10263a721b77e765971d5272c0..97cf6f7eeb4170d4ff7771db8498e35ec5652458 100644 --- a/src/dnode/src/dnodeSystem.c +++ b/src/dnode/src/dnodeSystem.c @@ -39,11 +39,11 @@ int32_t main(int32_t argc, char *argv[]) { exit(EXIT_FAILURE); } } else if (strcmp(argv[i], "-V") == 0) { -#ifdef _SYNC +#ifdef _SYNC char *versionStr = "enterprise"; -#else +#else char *versionStr = "community"; -#endif +#endif printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); printf("gitinfo: %s\n", gitinfo); printf("gitinfoI: %s\n", gitinfoOfInternal); @@ -93,8 +93,6 @@ int32_t main(int32_t argc, char *argv[]) { if (dnodeInitSystem() < 0) { syslog(LOG_ERR, "Error initialize TDengine system"); closelog(); - - dnodeCleanUpSystem(); exit(EXIT_FAILURE); } diff --git a/src/dnode/src/dnodeVRead.c b/src/dnode/src/dnodeVRead.c index 1ff868d0ff5e7ae341579265d52f1deec020a5a7..72882b679bbbf06aab7a6d26b6f86b270dbde38e 100644 --- a/src/dnode/src/dnodeVRead.c +++ b/src/dnode/src/dnodeVRead.c @@ -34,7 +34,7 @@ typedef struct { } SReadMsg; typedef struct { - pthread_t thread; // thread + pthread_t thread; // thread int32_t workerId; // worker ID } SReadWorker; @@ -85,8 +85,8 @@ void dnodeCleanupVnodeRead() { } } - taosCloseQset(readQset); free(readPool.readWorker); + taosCloseQset(readQset); dPrint("dnode read is closed"); } @@ -95,7 +95,7 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - void *pVnode; + void *pVnode; while (leftLen > 0) { SMsgHead *pHead = (SMsgHead *) pCont; @@ -166,7 +166,7 @@ void *dnodeAllocateVnodeRqueue(void *pVnode) { } while (readPool.num < readPool.min); } - dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue); + dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue); return queue; } @@ -177,13 +177,13 @@ void dnodeFreeVnodeRqueue(void *rqueue) { // dynamically adjust the number of threads } -static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { +static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = pMsg->rpcMsg; pRead->pCont = qhandle; pRead->contLen = 0; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - + taos_queue queue = vnodeGetRqueue(pVnode); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); }