diff --git a/src/dnode/inc/dnodeModule.h b/src/dnode/inc/dnodeModule.h index b6b57be3d1760d79007658255e94870660d73446..8618de324446ca3f2504d15a6422bcca3a4b51b0 100644 --- a/src/dnode/inc/dnodeModule.h +++ b/src/dnode/inc/dnodeModule.h @@ -22,7 +22,7 @@ extern "C" { int32_t dnodeInitModules(); void dnodeStartModules(); -void dnodeCleanUpModules(); +void dnodeCleanupModules(); void dnodeProcessModuleStatus(uint32_t moduleStatus); #ifdef __cplusplus diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 940b884927719986009d6934802e2972cab49399..6a9299902c2307da859cbe69acc9721a8b319bc5 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -41,6 +41,25 @@ static void dnodeSetRunStatus(SDnodeRunStatus status); static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context); static void dnodeCheckDataDirOpenned(char *dir); static SDnodeRunStatus tsDnodeRunStatus = TSDB_DNODE_RUN_STATUS_STOPPED; +static int32_t dnodeInitSteps(); +static void dnodeCleanupSteps(int32_t stepId); + +typedef struct { + const char *const name; + int (*init)(); + void (*cleanup)(); +} DnodeStep; + +static const DnodeStep DnodeSteps[] = { + {"storage", dnodeInitStorage, dnodeCleanupStorage}, + {"read", dnodeInitRead, dnodeCleanupRead}, + {"write", dnodeInitWrite, dnodeCleanupWrite}, + {"mclient", dnodeInitMClient, dnodeCleanupMClient}, + {"modules", dnodeInitModules, dnodeCleanupModules}, + {"mnode", dnodeInitMnode, dnodeCleanupMnode}, + {"mgmt", dnodeInitMgmt, dnodeCleanupMgmt}, + {"shell", dnodeInitShell, dnodeCleanupShell}, +}; int32_t main(int32_t argc, char *argv[]) { // Set global configuration file @@ -53,11 +72,11 @@ int32_t main(int32_t argc, char *argv[]) { exit(EXIT_FAILURE); } } else if (strcmp(argv[i], "-V") == 0) { -#ifdef _SYNC +#ifdef _SYNC char *versionStr = "enterprise"; -#else +#else char *versionStr = "community"; -#endif +#endif printf("%s version: %s compatible_version: %s\n", versionStr, version, compatible_version); printf("gitinfo: %s\n", gitinfo); printf("gitinfoI: %s\n", gitinfoOfInternal); @@ -102,8 +121,6 @@ int32_t main(int32_t argc, char *argv[]) { if (dnodeInitSystem() < 0) { syslog(LOG_ERR, "Error initialize TDengine system"); closelog(); - - dnodeCleanUpSystem(); exit(EXIT_FAILURE); } @@ -135,6 +152,24 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) { exit(EXIT_SUCCESS); } +static void dnodeCleanupSteps(int32_t stepId) { + for (int32_t i = stepId; i >= 0; i--) { + DnodeSteps[i].cleanup(); + } +} + +static int32_t dnodeInitSteps() { + int32_t code = 0; + for (int32_t i = 0; i < sizeof(DnodeSteps) / sizeof(DnodeSteps[0]); i++) { + if (DnodeSteps[i].init() != 0) { + dnodeCleanupSteps(i); + code = -1; + break; + } + } + return code; +} + static int32_t dnodeInitSystem() { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_INITIALIZE); tscEmbedded = 1; @@ -164,14 +199,9 @@ static int32_t dnodeInitSystem() { dPrint("start to initialize TDengine on %s", tsLocalEp); - if (dnodeInitStorage() != 0) return -1; - if (dnodeInitRead() != 0) return -1; - if (dnodeInitWrite() != 0) return -1; - if (dnodeInitMClient() != 0) return -1; - if (dnodeInitModules() != 0) return -1; - if (dnodeInitMnode() != 0) return -1; - if (dnodeInitMgmt() != 0) return -1; - if (dnodeInitShell() != 0) return -1; + if (dnodeInitSteps() != 0) { + return -1; + } dnodeStartModules(); dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_RUNING); @@ -184,16 +214,8 @@ static int32_t dnodeInitSystem() { static void dnodeCleanUpSystem() { if (dnodeGetRunStatus() != TSDB_DNODE_RUN_STATUS_STOPPED) { dnodeSetRunStatus(TSDB_DNODE_RUN_STATUS_STOPPED); - dnodeCleanupShell(); - dnodeCleanupMnode(); - dnodeCleanupMgmt(); - dnodeCleanupMClient(); - dnodeCleanupWrite(); - dnodeCleanupRead(); - dnodeCleanUpModules(); + dnodeCleanupSteps(sizeof(DnodeSteps) / sizeof(DnodeSteps[0]) - 1); taos_cleanup(); - dnodeCleanupStorage(); - taosCloseLog(); } } diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index e1aa48d47786898cb511b55a7913e8ee48d9158c..e63781283c2ec4d0c8e95d50609ddd7583a51868 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -72,7 +72,7 @@ static void dnodeAllocModules() { } } -void dnodeCleanUpModules() { +void dnodeCleanupModules() { for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) { if (tsModule[module].enable && tsModule[module].stopFp) { (*tsModule[module].stopFp)(); diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index e52b59d20a38ad9242eb7103885b4e6a5c7691b5..c2beb28e6411771e778bcd20d031925f1673cbec 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -34,7 +34,7 @@ typedef struct { } SReadMsg; typedef struct { - pthread_t thread; // thread + pthread_t thread; // thread int32_t workerId; // worker ID } SReadWorker; @@ -74,10 +74,10 @@ void dnodeCleanupRead() { for (int i=0; i < readPool.max; ++i) { SReadWorker *pWorker = readPool.readWorker + i; - if (pWorker->thread) + if (pWorker->thread) pthread_join(pWorker->thread, NULL); } - + free(readPool.readWorker); taosCloseQset(readQset); dPrint("dnode read is closed"); } @@ -86,7 +86,7 @@ void dnodeRead(SRpcMsg *pMsg) { int32_t queuedMsgNum = 0; int32_t leftLen = pMsg->contLen; char *pCont = (char *) pMsg->pCont; - void *pVnode; + void *pVnode; dTrace("dnode %s msg incoming, thandle:%p", taosMsg[pMsg->msgType], pMsg->handle); @@ -159,7 +159,7 @@ void *dnodeAllocateRqueue(void *pVnode) { } while (readPool.num < readPool.min); } - dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue); + dTrace("pVnode:%p, read queue:%p is allocated", pVnode, queue); return queue; } @@ -170,13 +170,13 @@ void dnodeFreeRqueue(void *rqueue) { // dynamically adjust the number of threads } -static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { +static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); pRead->rpcMsg = pMsg->rpcMsg; pRead->pCont = qhandle; pRead->contLen = 0; pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - + taos_queue queue = vnodeGetRqueue(pVnode); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); }