diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 90aac6edcdec34c4efee9abacb995c7d3827f9f3..089cb5bb94935a4a5b40cb19db4c86ec47c5c29b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -51,7 +51,7 @@ extern int32_t tsCompatibleModel; extern bool tsEnableSlaveQuery; extern bool tsPrintAuth; extern int64_t tsTickPerDay[3]; -extern int32_t tsMultiProcess; +extern bool tsMultiProcess; // monitor extern bool tsEnableMonitor; diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 8d19ce23dfdfce22a3e0e9b0961f58aa9319bc74..e4f4bdf8f994c31df6f31ce6df1347e384df5de6 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -46,11 +46,12 @@ typedef struct { char localFqdn[TSDB_FQDN_LEN]; char firstEp[TSDB_EP_LEN]; char secondEp[TSDB_EP_LEN]; - SDiskCfg *pDisks; + SDiskCfg *disks; int32_t numOfDisks; + int8_t ntype; } SDnodeOpt; -typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent; +typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_CHILD } EDndEvent; /** * @brief Initialize and start the dnode. diff --git a/include/os/os.h b/include/os/os.h index fa9d61b99705bc81dda3aa9dc68c9738d88239d8..0d0c30813457ade8bf965d2840477024e0bed460 100644 --- a/include/os/os.h +++ b/include/os/os.h @@ -73,7 +73,6 @@ extern "C" { #include #include - #include "osAtomic.h" #include "osDef.h" #include "osDir.h" @@ -87,6 +86,7 @@ extern "C" { #include "osThread.h" #include "osSemaphore.h" #include "osSignal.h" +#include "osShm.h" #include "osSleep.h" #include "osSocket.h" #include "osString.h" diff --git a/include/os/osShm.h b/include/os/osShm.h new file mode 100644 index 0000000000000000000000000000000000000000..82ee2339f23bd534e31744bc9197002b0bb77bc3 --- /dev/null +++ b/include/os/osShm.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_OS_SHM_H_ +#define _TD_OS_SHM_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct { + int32_t id; + int32_t size; + void* ptr; +} SShm; + +int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ; +void taosDropShm(SShm *pShm); +int32_t taosAttachShm(SShm *pShm); +void taosDetachShm(SShm *pShm); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_OS_SHM_H_*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index efd790ade8c143dd40b5cb9b0bfe749e0f57b06f..c79e1531229330ab4611319b77f5305d81d80b95 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -45,7 +45,7 @@ float tsRatioOfQueryCores = 1.0f; int32_t tsMaxBinaryDisplayWidth = 30; bool tsEnableSlaveQuery = 1; bool tsPrintAuth = 0; -int32_t tsMultiProcess = 0; +bool tsMultiProcess = 0; // monitor bool tsEnableMonitor = 1; @@ -347,7 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; - if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; + if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; @@ -466,7 +466,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; - tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->i32; + tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; diff --git a/source/dnode/mgmt/main/exe/dndMain.c b/source/dnode/mgmt/main/exe/dndMain.c index 634e2caa28462049fc092492bf6ed778469ef846..525b26d967727e531ba2a92f93be90c0db986bff 100644 --- a/source/dnode/mgmt/main/exe/dndMain.c +++ b/source/dnode/mgmt/main/exe/dndMain.c @@ -29,7 +29,7 @@ static struct { ENodeType ntype; } global = {0}; -static void dndSigintHandle(int signum, void *info, void *ctx) { +static void dndStopDnode(int signum, void *info, void *ctx) { dInfo("signal:%d is received", signum); SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode); if (pDnode != NULL) { @@ -37,12 +37,29 @@ static void dndSigintHandle(int signum, void *info, void *ctx) { } } +static void dndHandleChild(int signum, void *info, void *ctx) { + dInfo("signal:%d is received", signum); + dndHandleEvent(global.pDnode, DND_EVENT_CHILD); +} + static void dndSetSignalHandle() { - taosSetSignal(SIGTERM, dndSigintHandle); - taosSetSignal(SIGHUP, dndSigintHandle); - taosSetSignal(SIGINT, dndSigintHandle); - taosSetSignal(SIGABRT, dndSigintHandle); - taosSetSignal(SIGBREAK, dndSigintHandle); + taosSetSignal(SIGTERM, dndStopDnode); + taosSetSignal(SIGHUP, dndStopDnode); + taosSetSignal(SIGINT, dndStopDnode); + taosSetSignal(SIGABRT, dndStopDnode); + taosSetSignal(SIGBREAK, dndStopDnode); + + if (!tsMultiProcess) { + // Set the single process signal + } else if (global.ntype == DNODE) { + // Set the parent process signal + // When the child process exits, the parent process receives a signal + taosSetSignal(SIGCHLD, dndHandleChild); + } else { + // Set child process signal + // When the parent process exits, the child process will receive the SIGKILL signal + prctl(PR_SET_PDEATHSIG, SIGKILL); + } } static int32_t dndParseArgs(int32_t argc, char const *argv[]) { @@ -109,8 +126,9 @@ static SDnodeOpt dndGetOpt() { option.serverPort = tsServerPort; tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn)); snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort); - option.pDisks = tsDiskCfg; + option.disks = tsDiskCfg; option.numOfDisks = tsDiskCfgNum; + option.ntype = global.ntype; return option; } @@ -121,7 +139,7 @@ static int32_t dndInitLog() { } static void dndSetProcName(char **argv) { - if (global.ntype != 0) { + if (global.ntype != DNODE) { const char *name = dndNodeProcStr(global.ntype); prctl(PR_SET_NAME, name); strcpy(argv[0], name); diff --git a/source/dnode/mgmt/main/inc/dnd.h b/source/dnode/mgmt/main/inc/dnd.h index 046200510dd408d5abaacf1634b051fddae2cd83..b416ee4f7aa84dc48fa1734a1ab0078345f99636 100644 --- a/source/dnode/mgmt/main/inc/dnd.h +++ b/source/dnode/mgmt/main/inc/dnd.h @@ -96,10 +96,11 @@ typedef struct SMgmtWrapper { bool required; EProcType procType; SProcObj *pProc; + SShm shm; void *pMgmt; SDnode *pDnode; NodeMsgFp msgFps[TDMT_MAX]; - int32_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode + int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode SMgmtFp fp; } SMgmtWrapper; @@ -119,14 +120,15 @@ typedef struct SDnode { char *firstEp; char *secondEp; char *dataDir; - SDiskCfg *pDisks; + SDiskCfg *disks; int32_t numOfDisks; uint16_t serverPort; bool dropped; + ENodeType ntype; EDndStatus status; EDndEvent event; SStartupReq startup; - TdFilePtr pLockFile; + TdFilePtr runtimeFile; STransMgmt trans; SMgmtWrapper wrappers[NODE_MAX]; } SDnode; @@ -135,7 +137,7 @@ const char *dndNodeLogStr(ENodeType ntype); const char *dndNodeProcStr(ENodeType ntype); EDndStatus dndGetStatus(SDnode *pDnode); void dndSetStatus(SDnode *pDnode, EDndStatus stat); -void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId); +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId); void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dndSendMonitorReport(SDnode *pDnode); diff --git a/source/dnode/mgmt/main/inc/dndInt.h b/source/dnode/mgmt/main/inc/dndInt.h index 0f0cc78a1d088474c5ae04043f38df64f31a5b08..56782f872ba0efe9dae17f76a30c5769ebc20a72 100644 --- a/source/dnode/mgmt/main/inc/dndInt.h +++ b/source/dnode/mgmt/main/inc/dndInt.h @@ -34,7 +34,6 @@ int32_t dndInit(); void dndCleanup(); const char *dndStatStr(EDndStatus stat); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); -TdFilePtr dndCheckRunning(const char *dataDir); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); // dndMsg.c @@ -58,6 +57,11 @@ void dndCleanupClient(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); +// dndFile.c +int32_t dndOpenRuntimeFile(SDnode *pDnode); +int32_t dndWriteRuntimeFile(SDnode *pDnode); +void dndCloseRuntimeFile(SDnode *pDnode); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/main/src/dndExec.c b/source/dnode/mgmt/main/src/dndExec.c index 5a9077a93794c36fe356b75c96469b4cc17bd4d4..c41d4f28e40dce726ef43015254c54c2720f9bf6 100644 --- a/source/dnode/mgmt/main/src/dndExec.c +++ b/source/dnode/mgmt/main/src/dndExec.c @@ -259,7 +259,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { } int32_t dndRun(SDnode *pDnode) { - if (tsMultiProcess == 0) { + if (!tsMultiProcess) { if (dndRunInSingleProcess(pDnode) != 0) { dError("failed to run dnode in single process mode since %s", terrstr()); return -1; diff --git a/source/dnode/mgmt/main/src/dndFile.c b/source/dnode/mgmt/main/src/dndFile.c index ab04040b57b710c19b33410dbeab87c221b5b243..51d4ff3902e207336d90592d973efa566ae78df8 100644 --- a/source/dnode/mgmt/main/src/dndFile.c +++ b/source/dnode/mgmt/main/src/dndFile.c @@ -19,13 +19,12 @@ #define MAXLEN 1024 int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { - int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR; - int32_t len = 0; - const int32_t maxLen = MAXLEN; - char content[MAXLEN + 1] = {0}; - cJSON *root = NULL; - char file[PATH_MAX]; - TdFilePtr pFile = NULL; + int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR; + int64_t len = 0; + char content[MAXLEN + 1] = {0}; + cJSON *root = NULL; + char file[PATH_MAX]; + TdFilePtr pFile = NULL; snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); pFile = taosOpenFile(file, TD_FILE_READ); @@ -35,13 +34,12 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { goto _OVER; } - len = (int32_t)taosReadFile(pFile, content, maxLen); + len = taosReadFile(pFile, content, MAXLEN); if (len <= 0) { dError("failed to read %s since content is null", file); goto _OVER; } - content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { dError("failed to read %s since invalid json format", file); @@ -55,8 +53,8 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { } *pDeployed = deployed->valueint != 0; - code = 0; dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); + code = 0; _OVER: if (root != NULL) cJSON_Delete(root); @@ -67,30 +65,40 @@ _OVER: } int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { - char file[PATH_MAX] = {0}; + int32_t code = -1; + int32_t len = 0; + char content[MAXLEN + 1] = {0}; + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; + TdFilePtr pFile = NULL; + snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); + snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); - return -1; + goto _OVER; } - int32_t len = 0; - const int32_t maxLen = MAXLEN; - char content[MAXLEN + 1] = {0}; + len += snprintf(content + len, MAXLEN - len, "{\n"); + len += snprintf(content + len, MAXLEN - len, " \"deployed\": %d\n", deployed); + len += snprintf(content + len, MAXLEN - len, "}\n"); - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); - len += snprintf(content + len, maxLen - len, "}\n"); + if (taosWriteFile(pFile, content, len) != len) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to write file:%s since %s", file, terrstr()); + goto _OVER; + } - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); + if (taosFsyncFile(pFile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to fsync file:%s since %s", file, terrstr()); + goto _OVER; + } - char realfile[PATH_MAX] = {0}; - snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); + taosCloseFile(&pFile); if (taosRenameFile(file, realfile) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); @@ -99,5 +107,158 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { } dInfo("successed to write %s, deployed:%d", realfile, deployed); - return 0; + code = 0; + +_OVER: + if (pFile != NULL) { + taosCloseFile(&pFile); + } + + return code; +} + +int32_t dndOpenRuntimeFile(SDnode *pDnode) { + int32_t code = -1; + char itemName[24] = {0}; + char content[MAXLEN + 1] = {0}; + char file[PATH_MAX] = {0}; + cJSON *root = NULL; + TdFilePtr pFile = NULL; + + snprintf(file, sizeof(file), "%s%s.running", pDnode->dataDir, TD_DIRSEP); + pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to open file:%s since %s", file, terrstr()); + goto _OVER; + } + + if (taosLockFile(pFile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to lock file:%s since %s", file, terrstr()); + goto _OVER; + } + + if (taosReadFile(pFile, content, MAXLEN) > 0) { + root = cJSON_Parse(content); + if (root == NULL) { + terrno = TSDB_CODE_NODE_PARSE_FILE_ERROR; + dError("failed to read %s since invalid json format", file); + goto _OVER; + } + + for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { + snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype)); + cJSON *shmid = cJSON_GetObjectItem(root, itemName); + if (shmid && shmid->type == cJSON_Number) { + pDnode->wrappers[ntype].shm.id = shmid->valueint; + } + + snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype)); + cJSON *shmsize = cJSON_GetObjectItem(root, itemName); + if (shmsize && shmsize->type == cJSON_Number) { + pDnode->wrappers[ntype].shm.size = shmsize->valueint; + } + } + } + + if (tsMultiProcess || pDnode->ntype == DNODE) { + for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + if (pWrapper->shm.id > 0) { + dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size); + taosDropShm(&pWrapper->shm); + } + } + } else { + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + if (taosAttachShm(&pWrapper->shm) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr()); + goto _OVER; + } + dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size); + } + + dDebug("successed to open %s", file); + code = 0; + +_OVER: + if (root != NULL) cJSON_Delete(root); + if (code != 0) { + if (pFile != NULL) taosCloseFile(&pFile); + } else { + pDnode->runtimeFile = pFile; + } + + return code; +} + +int32_t dndWriteRuntimeFile(SDnode *pDnode) { + int32_t code = -1; + int32_t len = 0; + char content[MAXLEN + 1] = {0}; + char file[PATH_MAX] = {0}; + char realfile[PATH_MAX] = {0}; + TdFilePtr pFile = NULL; + + snprintf(file, sizeof(file), "%s%s.running.bak", pDnode->dataDir, TD_DIRSEP); + snprintf(realfile, sizeof(realfile), "%s%s.running", pDnode->dataDir, TD_DIRSEP); + + pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (pFile == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to open file:%s since %s", file, terrstr()); + goto _OVER; + } + + len += snprintf(content + len, MAXLEN - len, "{\n"); + for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; + len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.id); + if (ntype == NODE_MAX - 1) { + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d\n", dndNodeProcStr(ntype), pWrapper->shm.size); + } else { + len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.size); + } + } + len += snprintf(content + len, MAXLEN - len, "}\n"); + + if (taosWriteFile(pFile, content, len) != len) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to write file:%s since %s", file, terrstr()); + goto _OVER; + } + + if (taosFsyncFile(pFile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to fsync file:%s since %s", file, terrstr()); + goto _OVER; + } + + taosCloseFile(&pFile); + + if (taosRenameFile(file, realfile) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + dError("failed to rename %s to %s since %s", file, realfile, terrstr()); + return -1; + } + + dDebug("successed to write %s", realfile); + code = 0; + +_OVER: + if (pFile != NULL) { + taosCloseFile(&pFile); + } + + return code; } + +void dndCloseRuntimeFile(SDnode *pDnode) { + if (pDnode->runtimeFile) { + taosUnLockFile(pDnode->runtimeFile); + taosCloseFile(&pDnode->runtimeFile); + pDnode->runtimeFile = NULL; + } +} \ No newline at end of file diff --git a/source/dnode/mgmt/main/src/dndInt.c b/source/dnode/mgmt/main/src/dndInt.c index a9b19821385dfe6d417bace157c236e9b0861d73..7dde3561fbb49ddf4a240793698d941f516dcccf 100644 --- a/source/dnode/mgmt/main/src/dndInt.c +++ b/source/dnode/mgmt/main/src/dndInt.c @@ -20,7 +20,7 @@ static int8_t once = DND_ENV_INIT; int32_t dndInit() { - dInfo("start to init dnode env"); + dDebug("start to init dnode env"); if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { terrno = TSDB_CODE_REPEAT_INIT; dError("failed to init dnode env since %s", terrstr()); @@ -31,12 +31,6 @@ int32_t dndInit() { taosBlockSIGPIPE(); taosResolveCRC(); - if (rpcInit() != 0) { - dError("failed to init rpc since %s", terrstr()); - dndCleanup(); - return -1; - } - SMonCfg monCfg = {0}; monCfg.maxLogs = tsMonitorMaxLogs; monCfg.port = tsMonitorPort; @@ -44,29 +38,27 @@ int32_t dndInit() { monCfg.comp = tsMonitorComp; if (monInit(&monCfg) != 0) { dError("failed to init monitor since %s", terrstr()); - dndCleanup(); return -1; } - dInfo("dnode env is initialized"); + dDebug("dnode env is initialized"); return 0; } void dndCleanup() { - dInfo("start to cleanup dnode env"); + dDebug("start to cleanup dnode env"); if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { dError("dnode env is already cleaned up"); return; } monCleanup(); - rpcCleanup(); walCleanUp(); taosStopCacheRefreshWorker(); - dInfo("dnode env is cleaned up"); + dDebug("dnode env is cleaned up"); } -void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId) { +void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) { pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; } @@ -92,29 +84,6 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); } -TdFilePtr dndCheckRunning(const char *dataDir) { - char filepath[PATH_MAX] = {0}; - snprintf(filepath, sizeof(filepath), "%s/.running", dataDir); - - TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to lock file:%s since %s", filepath, terrstr()); - return NULL; - } - - int32_t ret = taosLockFile(pFile); - if (ret != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - dError("failed to lock file:%s since %s", filepath, terrstr()); - taosCloseFile(&pFile); - return NULL; - } - - dDebug("file:%s is locked", filepath); - return pFile; -} - void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("startup req is received"); SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); diff --git a/source/dnode/mgmt/main/src/dndObj.c b/source/dnode/mgmt/main/src/dndObj.c index b9ea8df8080f6cd63ec738f1a65a9b9398d3dd0e..99dc782a9b84f7d77787c4fb1e17833ef768d136 100644 --- a/source/dnode/mgmt/main/src/dndObj.c +++ b/source/dnode/mgmt/main/src/dndObj.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "dndInt.h" -static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { +static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; pDnode->serverPort = pOption->serverPort; pDnode->dataDir = strdup(pOption->dataDir); @@ -24,8 +24,9 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->localFqdn = strdup(pOption->localFqdn); pDnode->firstEp = strdup(pOption->firstEp); pDnode->secondEp = strdup(pOption->secondEp); - pDnode->pDisks = pOption->pDisks; + pDnode->disks = pOption->disks; pDnode->numOfDisks = pOption->numOfDisks; + pDnode->ntype = pOption->ntype; pDnode->rebootTime = taosGetTimestampMs(); if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || @@ -36,16 +37,12 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { return 0; } -static void dndClearMemory(SDnode *pDnode) { +static void dndClearVars(SDnode *pDnode) { for (ENodeType n = 0; n < NODE_MAX; ++n) { SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; taosMemoryFreeClear(pMgmt->path); } - if (pDnode->pLockFile != NULL) { - taosUnLockFile(pDnode->pLockFile); - taosCloseFile(&pDnode->pLockFile); - pDnode->pLockFile = NULL; - } + dndCloseRuntimeFile(pDnode); taosMemoryFreeClear(pDnode->localEp); taosMemoryFreeClear(pDnode->localFqdn); taosMemoryFreeClear(pDnode->firstEp); @@ -67,13 +64,21 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } - if (dndInitMemory(pDnode, pOption) != 0) { + if (dndInitVars(pDnode, pOption) != 0) { + dError("failed to init variables since %s", terrstr()); goto _OVER; } dndSetStatus(pDnode, DND_STAT_INIT); - pDnode->pLockFile = dndCheckRunning(pDnode->dataDir); - if (pDnode->pLockFile == NULL) { + dmGetMgmtFp(&pDnode->wrappers[DNODE]); + mmGetMgmtFp(&pDnode->wrappers[MNODE]); + vmGetMgmtFp(&pDnode->wrappers[VNODES]); + qmGetMgmtFp(&pDnode->wrappers[QNODE]); + smGetMgmtFp(&pDnode->wrappers[SNODE]); + bmGetMgmtFp(&pDnode->wrappers[BNODE]); + + if (dndOpenRuntimeFile(pDnode) != 0) { + dError("failed to open runtime file since %s", terrstr()); goto _OVER; } @@ -87,13 +92,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { goto _OVER; } - dmGetMgmtFp(&pDnode->wrappers[DNODE]); - mmGetMgmtFp(&pDnode->wrappers[MNODE]); - vmGetMgmtFp(&pDnode->wrappers[VNODES]); - qmGetMgmtFp(&pDnode->wrappers[QNODE]); - smGetMgmtFp(&pDnode->wrappers[SNODE]); - bmGetMgmtFp(&pDnode->wrappers[BNODE]); - if (dndInitMsgHandle(pDnode) != 0) { goto _OVER; } @@ -116,7 +114,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { _OVER: if (code != 0 && pDnode) { - dndClearMemory(pDnode); + dndClearVars(pDnode); pDnode = NULL; dError("failed to create dnode object since %s", terrstr()); } else { @@ -145,7 +143,7 @@ void dndClose(SDnode *pDnode) { dndCloseNode(pWrapper); } - dndClearMemory(pDnode); + dndClearVars(pDnode); dInfo("dnode object is closed, data:%p", pDnode); } diff --git a/source/dnode/mgmt/vm/src/vmInt.c b/source/dnode/mgmt/vm/src/vmInt.c index e40c2658e4cd8a97fd1bbe14fea0c0524796ce89..b52c6253dc9e06026acb7a3656c691a46d1a89a3 100644 --- a/source/dnode/mgmt/vm/src/vmInt.c +++ b/source/dnode/mgmt/vm/src/vmInt.c @@ -285,7 +285,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { tstrncpy(dCfg.dir, pDnode->dataDir, TSDB_FILENAME_LEN); dCfg.level = 0; dCfg.primary = 1; - SDiskCfg *pDisks = pDnode->pDisks; + SDiskCfg *pDisks = pDnode->disks; int32_t numOfDisks = pDnode->numOfDisks; if (numOfDisks <= 0 || pDisks == NULL) { pDisks = &dCfg; diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c new file mode 100644 index 0000000000000000000000000000000000000000..e7a22c3da15357809b02d8beefec7b5c71bf5758 --- /dev/null +++ b/source/os/src/osShm.c @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define ALLOW_FORBID_FUNC +#define _DEFAULT_SOURCE +#include "os.h" + +int32_t taosCreateShm(SShm* pShm, int32_t shmsize) { + int32_t shmid = shmget(IPC_PRIVATE, shmsize, IPC_CREAT | 0600); + if (shmid < 0) { + return -1; + } + + void* shmptr = shmat(shmid, NULL, 0); + if (shmptr == NULL) { + return -1; + } + + pShm->id = shmid; + pShm->size = shmsize; + pShm->ptr = shmptr; + return 0; +} + +void taosDropShm(SShm* pShm) { + if (pShm->id > 0) { + if (pShm->ptr != NULL) { + shmdt(pShm->ptr); + } + shmctl(pShm->id, IPC_RMID, NULL); + } + pShm->id = 0; + pShm->size = 0; + pShm->ptr = NULL; +} + +int32_t taosAttachShm(SShm* pShm) { + if (pShm->id > 0 && pShm->size > 0) { + pShm->ptr = shmat(pShm->id, NULL, 0); + if (pShm->ptr != NULL) { + return 0; + } + } + + return -1; +} + +void taosDetachShm(SShm* pShm) { + if (pShm->id > 0) { + if (pShm->ptr != NULL) { + shmdt(pShm->ptr); + pShm->ptr = NULL; + } + } + + pShm->id = 0; + pShm->size = 0; + pShm->ptr = NULL; +}