diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 342d78382ab4ac680eca259739c3061b15b94a22..089cb5bb94935a4a5b40cb19db4c86ec47c5c29b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -51,6 +51,7 @@ extern int32_t tsCompatibleModel; extern bool tsEnableSlaveQuery; extern bool tsPrintAuth; extern int64_t tsTickPerDay[3]; +extern bool tsMultiProcess; // monitor extern bool tsEnableMonitor; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 068e9a69170bfd1659b2961bef220b765d74c35e..4f6f6ab878e5fa334a78f5f93520cdbdbecb209e 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -33,8 +33,7 @@ typedef struct SDnode SDnode; int32_t dndInit(); /** - * @brief clear the environment - * + * @brief Clear the environment */ void dndCleanup(); @@ -51,6 +50,8 @@ typedef struct { int32_t numOfDisks; } SDnodeObjCfg; +typedef enum { DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent; + /** * @brief Initialize and start the dnode. * @@ -66,6 +67,21 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg); */ void dndClose(SDnode *pDnode); +/** + * @brief Run dnode until specific event is receive. + * + * @param pDnode The dnode object to run. + */ +void dndRun(SDnode *pDnode); + +/** + * @brief Handle event in the dnode. + * + * @param pDnode The dnode object to close. + * @param event The event to handle. + */ +void dndeHandleEvent(SDnode *pDnode, EDndEvent event); + #ifdef __cplusplus } #endif diff --git a/include/util/tprocess.h b/include/util/tprocess.h index 1ac81558271248f545743d5215d5dd69cd2fb545..4ce536fd968198ef5be19e888607e647e057853c 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -48,8 +48,9 @@ typedef struct { SProcObj *taosProcInit(const SProcCfg *pCfg); void taosProcCleanup(SProcObj *pProc); -int32_t taosProcStart(SProcObj *pProc); +int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); +bool taosProcIsChild(SProcObj *pProc); int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e7180f2560288c7c33ebbf0996be53c9b2d3a99b..f9a5538a0152be08e9c348b900496f72cbe460ff 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -45,6 +45,7 @@ float tsRatioOfQueryCores = 1.0f; int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = 1; bool tsPrintAuth = 0; +bool tsMultiProcess = 0; // monitor bool tsEnableMonitor = 1; @@ -339,6 +340,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; + if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; @@ -456,6 +458,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; + tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/daemon/inc/dmnInt.h b/source/dnode/mgmt/daemon/inc/dmnInt.h index 8a571352f021ab53f3405e11114257b7ecbd4fd8..6bb4b84b2a6f79f17ec9f119a5b10a46a4274a19 100644 --- a/source/dnode/mgmt/daemon/inc/dmnInt.h +++ b/source/dnode/mgmt/daemon/inc/dmnInt.h @@ -28,12 +28,19 @@ extern "C" { #endif -SDnodeObjCfg dmnGetObjCfg(); +#define dFatal(...) { if (dDebugFlag & DEBUG_FATAL) { taosPrintLog("DND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} +#define dError(...) { if (dDebugFlag & DEBUG_ERROR) { taosPrintLog("DND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} +#define dWarn(...) { if (dDebugFlag & DEBUG_WARN) { taosPrintLog("DND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} +#define dInfo(...) { if (dDebugFlag & DEBUG_INFO) { taosPrintLog("DND ", DEBUG_INFO, 255, __VA_ARGS__); }} +#define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} +#define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} void dmnDumpCfg(); void dmnPrintVersion(); void dmnGenerateGrant(); +SDnodeObjCfg dmnGetObjCfg(); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/daemon/src/dmnMain.c b/source/dnode/mgmt/daemon/src/dmnMain.c index 7ba272453ca247600024411c106af17a1dc6cd25..3c2460e607699298da31ace6fed73f36263abea9 100644 --- a/source/dnode/mgmt/daemon/src/dmnMain.c +++ b/source/dnode/mgmt/daemon/src/dmnMain.c @@ -17,18 +17,18 @@ #include "dmnInt.h" static struct { - bool stop; - bool dumpConfig; - bool generateGrant; - bool printAuth; - bool printVersion; - char envFile[PATH_MAX]; - char apolloUrl[PATH_MAX]; + bool dumpConfig; + bool generateGrant; + bool printAuth; + bool printVersion; + char envFile[PATH_MAX]; + char apolloUrl[PATH_MAX]; + SDnode *pDnode; } dmn = {0}; static void dmnSigintHandle(int signum, void *info, void *ctx) { - uInfo("singal:%d is received", signum); - dmn.stop = true; + dInfo("singal:%d is received", signum); + dndeHandleEvent(dmn.pDnode, DND_EVENT_STOP); } static void dmnSetSignalHandle() { @@ -39,13 +39,6 @@ static void dmnSetSignalHandle() { taosSetSignal(SIGBREAK, dmnSigintHandle); } -static void dmnWaitSignal() { - dmnSetSignalHandle(); - while (!dmn.stop) { - taosMsleep(100); - } -} - static int32_t dmnParseOption(int32_t argc, char const *argv[]) { for (int32_t i = 1; i < argc; ++i) { if (strcmp(argv[i], "-c") == 0) { @@ -74,20 +67,22 @@ static int32_t dmnParseOption(int32_t argc, char const *argv[]) { int32_t dmnRunDnode() { if (dndInit() != 0) { - uInfo("Failed to start TDengine, please check the log"); + dInfo("failed to initialize dnode environment since %s", terrstr()); return -1; } SDnodeObjCfg objCfg = dmnGetObjCfg(); SDnode *pDnode = dndCreate(&objCfg); if (pDnode == NULL) { - uInfo("Failed to start TDengine, please check the log"); + dError("failed to to create dnode object since %s", terrstr()); return -1; + } else { + dmn.pDnode = pDnode; } - uInfo("Started TDengine service successfully."); - dmnWaitSignal(); - uInfo("TDengine is shut down!"); + dInfo("start the TDengine service"); + dndRun(pDnode); + dInfo("start shutting down the TDengine service"); dndClose(pDnode); dndCleanup(); @@ -98,7 +93,7 @@ int32_t dmnRunDnode() { int main(int argc, char const *argv[]) { if (!taosCheckSystemIsSmallEnd()) { - uError("TDengine does not run on non-small-end machines."); + dError("failed to start TDengine since on non-small-end machines"); return -1; } @@ -117,18 +112,19 @@ int main(int argc, char const *argv[]) { } if (taosCreateLog("taosdlog", 1, configDir, dmn.envFile, dmn.apolloUrl, NULL, 0) != 0) { - uInfo("Failed to start TDengine since read config error"); + dError("failed to start TDengine since read log config error"); return -1; } if (taosInitCfg(configDir, dmn.envFile, dmn.apolloUrl, NULL, 0) != 0) { - uInfo("Failed to start TDengine since read config error"); + dError("failed to start TDengine since read config error"); return -1; } if (dmn.dumpConfig) { dmnDumpCfg(); taosCleanupCfg(); + taosCloseLog(); return 0; } diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 091c1f28e8b14842cf2206cc1850030d5e924cd9..a583c61699f20bcf01a5ac4d5d24a523241577b9 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -139,7 +139,7 @@ typedef struct { } STransMgmt; typedef struct SDnode { - EStat stat; + EDndStatus status; SDnodeObjCfg cfg; SDnodeDir dir; TdFilePtr pLockFile; @@ -152,6 +152,7 @@ typedef struct SDnode { STransMgmt tmgmt; STfs *pTfs; SStartupReq startup; + EDndEvent event; } SDnode; int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo); diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 985ce4942844e06e67993020bc57fa6bd5e36dc3..19d17c426f7d86a3b49c1f05db96a97a95199657 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -56,16 +56,16 @@ extern "C" { #define dDebug(...) { if (dDebugFlag & DEBUG_DEBUG) { taosPrintLog("DND ", DEBUG_DEBUG, dDebugFlag, __VA_ARGS__); }} #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", DEBUG_TRACE, dDebugFlag, __VA_ARGS__); }} -typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; +typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_ENV_INIT = 0, DND_ENV_READY = 1, DND_ENV_CLEANUP = 2 } EEnvStat; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMnodeMsg); -EStat dndGetStat(SDnode *pDnode); -void dndSetStat(SDnode *pDnode, EStat stat); -const char *dndStatStr(EStat stat); +EDndStatus dndGetStatus(SDnode *pDnode); +void dndSetStatus(SDnode *pDnode, EDndStatus stat); +const char *dndStatStr(EDndStatus stat); void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 6bd504014cff17af68644d25972b23da9e63254e..7ba40bf42bd730d048fa3cc586e827167be551ca 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -143,7 +143,7 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { return -1; } - return taosProcStart(pMgmt->pProcess); + return taosProcRun(pMgmt->pProcess); } return code; diff --git a/source/dnode/mgmt/impl/src/dndEnv.c b/source/dnode/mgmt/impl/src/dndEnv.c index bb33321e97c1e385db073737bb5b40246919e199..d0b38120c184d60fd903c74251cbaa2d3c1939ac 100644 --- a/source/dnode/mgmt/impl/src/dndEnv.c +++ b/source/dnode/mgmt/impl/src/dndEnv.c @@ -28,15 +28,17 @@ static int8_t once = DND_ENV_INIT; -EStat dndGetStat(SDnode *pDnode) { return pDnode->stat; } +EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } -void dndSetStat(SDnode *pDnode, EStat stat) { - dDebug("dnode status set from %s to %s", dndStatStr(pDnode->stat), dndStatStr(stat)); - pDnode->stat = stat; +void dndSetStatus(SDnode *pDnode, EDndStatus status) { + if (pDnode->status != status) { + dDebug("dnode status set from %s to %s", dndStatStr(pDnode->status), dndStatStr(status)); + pDnode->status = status; + } } -const char *dndStatStr(EStat stat) { - switch (stat) { +const char *dndStatStr(EDndStatus status) { + switch (status) { case DND_STAT_INIT: return "init"; case DND_STAT_RUNNING: @@ -57,7 +59,7 @@ void dndReportStartup(SDnode *pDnode, char *pName, char *pDesc) { void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { memcpy(pStartup, &pDnode->startup, sizeof(SStartupReq)); - pStartup->finished = (dndGetStat(pDnode) == DND_STAT_RUNNING); + pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); } static TdFilePtr dndCheckRunning(char *dataDir) { @@ -165,7 +167,7 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) { return NULL; } - dndSetStat(pDnode, DND_STAT_INIT); + dndSetStatus(pDnode, DND_STAT_INIT); if (dndInitDir(pDnode, pCfg) != 0) { dError("failed to init dnode dir since %s", terrstr()); @@ -233,7 +235,7 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) { return NULL; } - dndSetStat(pDnode, DND_STAT_RUNNING); + dndSetStatus(pDnode, DND_STAT_RUNNING); dndSendStatusReq(pDnode); dndReportStartup(pDnode, "TDengine", "initialized successfully"); dInfo("dnode object is created, data:%p", pDnode); @@ -244,13 +246,13 @@ SDnode *dndCreate(SDnodeObjCfg *pCfg) { void dndClose(SDnode *pDnode) { if (pDnode == NULL) return; - if (dndGetStat(pDnode) == DND_STAT_STOPPED) { + if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { dError("dnode is shutting down, data:%p", pDnode); return; } dInfo("start to close dnode, data:%p", pDnode); - dndSetStat(pDnode, DND_STAT_STOPPED); + dndSetStatus(pDnode, DND_STAT_STOPPED); dndCleanupTrans(pDnode); dndStopMgmt(pDnode); mmCleanup(pDnode); @@ -331,4 +333,12 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { pInfo->tempdir.size = tsTempSpace.size; return tfsGetMonitorInfo(pDnode->pTfs, pInfo); -} \ No newline at end of file +} + +void dndRun(SDnode *pDnode) { + while (pDnode->event != DND_EVENT_STOP) { + taosMsleep(100); + } +} + +void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { pDnode->event = event; } \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 19e269063b0a9bee6a7ca661d756fb5670e30d30..64fc1450f0fa16134be7207cdb2bd68427576276 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -555,7 +555,7 @@ static void *dnodeThreadRoutine(void *param) { while (true) { pthread_testcancel(); taosMsleep(200); - if (dndGetStat(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { + if (dndGetStatus(pDnode) != DND_STAT_RUNNING || pMgmt->dropped) { continue; } diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index 20f6928afdec04f9ba8c70f646a9288e2cba128c..831fecb855b7014378d6142b27c1d3b7866c84f0 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -161,7 +161,7 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { tmsg_t msgType = pRsp->msgType; - if (dndGetStat(pDnode) == DND_STAT_STOPPED) { + if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { if (pRsp == NULL || pRsp->pCont == NULL) return; dTrace("RPC %p, rsp:%s ignored since dnode exiting, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); rpcFreeCont(pRsp->pCont); @@ -229,13 +229,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { return; } - if (dndGetStat(pDnode) == DND_STAT_STOPPED) { + if (dndGetStatus(pDnode) == DND_STAT_STOPPED) { dError("RPC %p, req:%s ignored since dnode exiting, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_DND_OFFLINE, .ahandle = pReq->ahandle}; rpcSendResponse(&rspMsg); rpcFreeCont(pReq->pCont); return; - } else if (dndGetStat(pDnode) != DND_STAT_RUNNING) { + } else if (dndGetStatus(pDnode) != DND_STAT_RUNNING) { dError("RPC %p, req:%s ignored since dnode not running, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pReq->ahandle}; rpcSendResponse(&rspMsg); diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 6dedc3f7406d3653c1513cb345b68cd17c966e1e..47aaa23825ce13b881c1f5bd133fc2fd45c56905 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -141,6 +141,7 @@ void taosCloseLog() { if (taosCheckPthreadValid(tsLogObj.logHandle->asyncThread)) { pthread_join(tsLogObj.logHandle->asyncThread, NULL); } + tsLogInited = 0; // In case that other threads still use log resources causing invalid write in valgrind // we comment two lines below. // taosLogBuffDestroy(tsLogObj.logHandle); diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 5842604d2287eb1c9240b98a39814a1aded86bdb..54cd894692358d01dca6508f79ce16519c038315 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -374,8 +374,6 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { if (!pProc->testFlag) { pProc->pid = fork(); if (pProc->pid == 0) { - // tsLogInited = 0; - taosInitLog("mnodelog", 1); pProc->isChild = 1; uInfo("this is child process, pid:%d", pProc->pid); } else { @@ -410,7 +408,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) { } } -int32_t taosProcStart(SProcObj *pProc) { +int32_t taosProcRun(SProcObj *pProc) { pthread_attr_t thAttr = {0}; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); @@ -442,6 +440,8 @@ void taosProcStop(SProcObj *pProc) { // join } +bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; } + void taosProcCleanup(SProcObj *pProc) { if (pProc != NULL) { uDebug("proc:%s, clean up", pProc->name);