未验证 提交 94d25227 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11112 from taosdata/feature/shm

adjust signal
...@@ -51,7 +51,7 @@ extern int32_t tsCompatibleModel; ...@@ -51,7 +51,7 @@ extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery; extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth; extern bool tsPrintAuth;
extern int64_t tsTickPerDay[3]; extern int64_t tsTickPerDay[3];
extern int32_t tsMultiProcess; extern bool tsMultiProcess;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;
......
...@@ -46,11 +46,12 @@ typedef struct { ...@@ -46,11 +46,12 @@ typedef struct {
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
char firstEp[TSDB_EP_LEN]; char firstEp[TSDB_EP_LEN];
char secondEp[TSDB_EP_LEN]; char secondEp[TSDB_EP_LEN];
SDiskCfg *pDisks; SDiskCfg *disks;
int32_t numOfDisks; int32_t numOfDisks;
int8_t ntype;
} SDnodeOpt; } SDnodeOpt;
typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_RELOAD } EDndEvent; typedef enum { DND_EVENT_START, DND_EVENT_STOP = 1, DND_EVENT_CHILD } EDndEvent;
/** /**
* @brief Initialize and start the dnode. * @brief Initialize and start the dnode.
......
...@@ -73,7 +73,6 @@ extern "C" { ...@@ -73,7 +73,6 @@ extern "C" {
#include <wchar.h> #include <wchar.h>
#include <wctype.h> #include <wctype.h>
#include "osAtomic.h" #include "osAtomic.h"
#include "osDef.h" #include "osDef.h"
#include "osDir.h" #include "osDir.h"
...@@ -87,6 +86,7 @@ extern "C" { ...@@ -87,6 +86,7 @@ extern "C" {
#include "osThread.h" #include "osThread.h"
#include "osSemaphore.h" #include "osSemaphore.h"
#include "osSignal.h" #include "osSignal.h"
#include "osShm.h"
#include "osSleep.h" #include "osSleep.h"
#include "osSocket.h" #include "osSocket.h"
#include "osString.h" #include "osString.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_OS_SHM_H_
#define _TD_OS_SHM_H_
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
int32_t id;
int32_t size;
void* ptr;
} SShm;
int32_t taosCreateShm(SShm *pShm, int32_t shmsize) ;
void taosDropShm(SShm *pShm);
int32_t taosAttachShm(SShm *pShm);
void taosDetachShm(SShm *pShm);
#ifdef __cplusplus
}
#endif
#endif /*_TD_OS_SHM_H_*/
...@@ -45,7 +45,7 @@ float tsRatioOfQueryCores = 1.0f; ...@@ -45,7 +45,7 @@ float tsRatioOfQueryCores = 1.0f;
int32_t tsMaxBinaryDisplayWidth = 30; int32_t tsMaxBinaryDisplayWidth = 30;
bool tsEnableSlaveQuery = 1; bool tsEnableSlaveQuery = 1;
bool tsPrintAuth = 0; bool tsPrintAuth = 0;
int32_t tsMultiProcess = 0; bool tsMultiProcess = 0;
// monitor // monitor
bool tsEnableMonitor = 1; bool tsEnableMonitor = 1;
...@@ -347,7 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -347,7 +347,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1; if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddBool(pCfg, "multiProcess", tsMultiProcess, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 360000, 0) != 0) return -1;
...@@ -466,7 +466,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -466,7 +466,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval; tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval; tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval; tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->bval;
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->i32; tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval; tsEnableMonitor = cfgGetItem(pCfg, "monitor")->bval;
tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32; tsMonitorInterval = cfgGetItem(pCfg, "monitorInterval")->i32;
......
...@@ -29,7 +29,7 @@ static struct { ...@@ -29,7 +29,7 @@ static struct {
ENodeType ntype; ENodeType ntype;
} global = {0}; } global = {0};
static void dndSigintHandle(int signum, void *info, void *ctx) { static void dndStopDnode(int signum, void *info, void *ctx) {
dInfo("signal:%d is received", signum); dInfo("signal:%d is received", signum);
SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode); SDnode *pDnode = atomic_val_compare_exchange_ptr(&global.pDnode, 0, global.pDnode);
if (pDnode != NULL) { if (pDnode != NULL) {
...@@ -37,12 +37,29 @@ static void dndSigintHandle(int signum, void *info, void *ctx) { ...@@ -37,12 +37,29 @@ static void dndSigintHandle(int signum, void *info, void *ctx) {
} }
} }
static void dndHandleChild(int signum, void *info, void *ctx) {
dInfo("signal:%d is received", signum);
dndHandleEvent(global.pDnode, DND_EVENT_CHILD);
}
static void dndSetSignalHandle() { static void dndSetSignalHandle() {
taosSetSignal(SIGTERM, dndSigintHandle); taosSetSignal(SIGTERM, dndStopDnode);
taosSetSignal(SIGHUP, dndSigintHandle); taosSetSignal(SIGHUP, dndStopDnode);
taosSetSignal(SIGINT, dndSigintHandle); taosSetSignal(SIGINT, dndStopDnode);
taosSetSignal(SIGABRT, dndSigintHandle); taosSetSignal(SIGABRT, dndStopDnode);
taosSetSignal(SIGBREAK, dndSigintHandle); taosSetSignal(SIGBREAK, dndStopDnode);
if (!tsMultiProcess) {
// Set the single process signal
} else if (global.ntype == DNODE) {
// Set the parent process signal
// When the child process exits, the parent process receives a signal
taosSetSignal(SIGCHLD, dndHandleChild);
} else {
// Set child process signal
// When the parent process exits, the child process will receive the SIGKILL signal
prctl(PR_SET_PDEATHSIG, SIGKILL);
}
} }
static int32_t dndParseArgs(int32_t argc, char const *argv[]) { static int32_t dndParseArgs(int32_t argc, char const *argv[]) {
...@@ -109,8 +126,9 @@ static SDnodeOpt dndGetOpt() { ...@@ -109,8 +126,9 @@ static SDnodeOpt dndGetOpt() {
option.serverPort = tsServerPort; option.serverPort = tsServerPort;
tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn)); tstrncpy(option.localFqdn, tsLocalFqdn, sizeof(option.localFqdn));
snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort); snprintf(option.localEp, sizeof(option.localEp), "%s:%u", option.localFqdn, option.serverPort);
option.pDisks = tsDiskCfg; option.disks = tsDiskCfg;
option.numOfDisks = tsDiskCfgNum; option.numOfDisks = tsDiskCfgNum;
option.ntype = global.ntype;
return option; return option;
} }
...@@ -121,7 +139,7 @@ static int32_t dndInitLog() { ...@@ -121,7 +139,7 @@ static int32_t dndInitLog() {
} }
static void dndSetProcName(char **argv) { static void dndSetProcName(char **argv) {
if (global.ntype != 0) { if (global.ntype != DNODE) {
const char *name = dndNodeProcStr(global.ntype); const char *name = dndNodeProcStr(global.ntype);
prctl(PR_SET_NAME, name); prctl(PR_SET_NAME, name);
strcpy(argv[0], name); strcpy(argv[0], name);
......
...@@ -96,10 +96,11 @@ typedef struct SMgmtWrapper { ...@@ -96,10 +96,11 @@ typedef struct SMgmtWrapper {
bool required; bool required;
EProcType procType; EProcType procType;
SProcObj *pProc; SProcObj *pProc;
SShm shm;
void *pMgmt; void *pMgmt;
SDnode *pDnode; SDnode *pDnode;
NodeMsgFp msgFps[TDMT_MAX]; NodeMsgFp msgFps[TDMT_MAX];
int32_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
SMgmtFp fp; SMgmtFp fp;
} SMgmtWrapper; } SMgmtWrapper;
...@@ -119,14 +120,15 @@ typedef struct SDnode { ...@@ -119,14 +120,15 @@ typedef struct SDnode {
char *firstEp; char *firstEp;
char *secondEp; char *secondEp;
char *dataDir; char *dataDir;
SDiskCfg *pDisks; SDiskCfg *disks;
int32_t numOfDisks; int32_t numOfDisks;
uint16_t serverPort; uint16_t serverPort;
bool dropped; bool dropped;
ENodeType ntype;
EDndStatus status; EDndStatus status;
EDndEvent event; EDndEvent event;
SStartupReq startup; SStartupReq startup;
TdFilePtr pLockFile; TdFilePtr runtimeFile;
STransMgmt trans; STransMgmt trans;
SMgmtWrapper wrappers[NODE_MAX]; SMgmtWrapper wrappers[NODE_MAX];
} SDnode; } SDnode;
...@@ -135,7 +137,7 @@ const char *dndNodeLogStr(ENodeType ntype); ...@@ -135,7 +137,7 @@ const char *dndNodeLogStr(ENodeType ntype);
const char *dndNodeProcStr(ENodeType ntype); const char *dndNodeProcStr(ENodeType ntype);
EDndStatus dndGetStatus(SDnode *pDnode); EDndStatus dndGetStatus(SDnode *pDnode);
void dndSetStatus(SDnode *pDnode, EDndStatus stat); void dndSetStatus(SDnode *pDnode, EDndStatus stat);
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId); void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId);
void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc); void dndReportStartup(SDnode *pDnode, const char *pName, const char *pDesc);
void dndSendMonitorReport(SDnode *pDnode); void dndSendMonitorReport(SDnode *pDnode);
......
...@@ -34,7 +34,6 @@ int32_t dndInit(); ...@@ -34,7 +34,6 @@ int32_t dndInit();
void dndCleanup(); void dndCleanup();
const char *dndStatStr(EDndStatus stat); const char *dndStatStr(EDndStatus stat);
void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup); void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup);
TdFilePtr dndCheckRunning(const char *dataDir);
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg);
// dndMsg.c // dndMsg.c
...@@ -58,6 +57,11 @@ void dndCleanupClient(SDnode *pDnode); ...@@ -58,6 +57,11 @@ void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode); int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp);
// dndFile.c
int32_t dndOpenRuntimeFile(SDnode *pDnode);
int32_t dndWriteRuntimeFile(SDnode *pDnode);
void dndCloseRuntimeFile(SDnode *pDnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -259,7 +259,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) { ...@@ -259,7 +259,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
} }
int32_t dndRun(SDnode *pDnode) { int32_t dndRun(SDnode *pDnode) {
if (tsMultiProcess == 0) { if (!tsMultiProcess) {
if (dndRunInSingleProcess(pDnode) != 0) { if (dndRunInSingleProcess(pDnode) != 0) {
dError("failed to run dnode in single process mode since %s", terrstr()); dError("failed to run dnode in single process mode since %s", terrstr());
return -1; return -1;
......
...@@ -19,13 +19,12 @@ ...@@ -19,13 +19,12 @@
#define MAXLEN 1024 #define MAXLEN 1024
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR; int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0; int64_t len = 0;
const int32_t maxLen = MAXLEN; char content[MAXLEN + 1] = {0};
char content[MAXLEN + 1] = {0}; cJSON *root = NULL;
cJSON *root = NULL; char file[PATH_MAX];
char file[PATH_MAX]; TdFilePtr pFile = NULL;
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
pFile = taosOpenFile(file, TD_FILE_READ); pFile = taosOpenFile(file, TD_FILE_READ);
...@@ -35,13 +34,12 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { ...@@ -35,13 +34,12 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
goto _OVER; goto _OVER;
} }
len = (int32_t)taosReadFile(pFile, content, maxLen); len = taosReadFile(pFile, content, MAXLEN);
if (len <= 0) { if (len <= 0) {
dError("failed to read %s since content is null", file); dError("failed to read %s since content is null", file);
goto _OVER; goto _OVER;
} }
content[len] = 0;
root = cJSON_Parse(content); root = cJSON_Parse(content);
if (root == NULL) { if (root == NULL) {
dError("failed to read %s since invalid json format", file); dError("failed to read %s since invalid json format", file);
...@@ -55,8 +53,8 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) { ...@@ -55,8 +53,8 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
} }
*pDeployed = deployed->valueint != 0; *pDeployed = deployed->valueint != 0;
code = 0;
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed); dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
code = 0;
_OVER: _OVER:
if (root != NULL) cJSON_Delete(root); if (root != NULL) cJSON_Delete(root);
...@@ -67,30 +65,40 @@ _OVER: ...@@ -67,30 +65,40 @@ _OVER:
} }
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
char file[PATH_MAX] = {0}; int32_t code = -1;
int32_t len = 0;
char content[MAXLEN + 1] = {0};
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) { if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write %s since %s", file, terrstr()); dError("failed to write %s since %s", file, terrstr());
return -1; goto _OVER;
} }
int32_t len = 0; len += snprintf(content + len, MAXLEN - len, "{\n");
const int32_t maxLen = MAXLEN; len += snprintf(content + len, MAXLEN - len, " \"deployed\": %d\n", deployed);
char content[MAXLEN + 1] = {0}; len += snprintf(content + len, MAXLEN - len, "}\n");
len += snprintf(content + len, maxLen - len, "{\n"); if (taosWriteFile(pFile, content, len) != len) {
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed); terrno = TAOS_SYSTEM_ERROR(errno);
len += snprintf(content + len, maxLen - len, "}\n"); dError("failed to write file:%s since %s", file, terrstr());
goto _OVER;
}
taosWriteFile(pFile, content, len); if (taosFsyncFile(pFile) != 0) {
taosFsyncFile(pFile); terrno = TAOS_SYSTEM_ERROR(errno);
taosCloseFile(&pFile); dError("failed to fsync file:%s since %s", file, terrstr());
goto _OVER;
}
char realfile[PATH_MAX] = {0}; taosCloseFile(&pFile);
snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
if (taosRenameFile(file, realfile) != 0) { if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
...@@ -99,5 +107,158 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { ...@@ -99,5 +107,158 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
} }
dInfo("successed to write %s, deployed:%d", realfile, deployed); dInfo("successed to write %s, deployed:%d", realfile, deployed);
return 0; code = 0;
_OVER:
if (pFile != NULL) {
taosCloseFile(&pFile);
}
return code;
}
int32_t dndOpenRuntimeFile(SDnode *pDnode) {
int32_t code = -1;
char itemName[24] = {0};
char content[MAXLEN + 1] = {0};
char file[PATH_MAX] = {0};
cJSON *root = NULL;
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s.running", pDnode->dataDir, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to open file:%s since %s", file, terrstr());
goto _OVER;
}
if (taosLockFile(pFile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s", file, terrstr());
goto _OVER;
}
if (taosReadFile(pFile, content, MAXLEN) > 0) {
root = cJSON_Parse(content);
if (root == NULL) {
terrno = TSDB_CODE_NODE_PARSE_FILE_ERROR;
dError("failed to read %s since invalid json format", file);
goto _OVER;
}
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
snprintf(itemName, sizeof(itemName), "%s_shmid", dndNodeProcStr(ntype));
cJSON *shmid = cJSON_GetObjectItem(root, itemName);
if (shmid && shmid->type == cJSON_Number) {
pDnode->wrappers[ntype].shm.id = shmid->valueint;
}
snprintf(itemName, sizeof(itemName), "%s_shmsize", dndNodeProcStr(ntype));
cJSON *shmsize = cJSON_GetObjectItem(root, itemName);
if (shmsize && shmsize->type == cJSON_Number) {
pDnode->wrappers[ntype].shm.size = shmsize->valueint;
}
}
}
if (tsMultiProcess || pDnode->ntype == DNODE) {
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
if (pWrapper->shm.id > 0) {
dDebug("shmid:%d, is closed, size:%d", pWrapper->shm.id, pWrapper->shm.size);
taosDropShm(&pWrapper->shm);
}
}
} else {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
if (taosAttachShm(&pWrapper->shm) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr());
goto _OVER;
}
dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size);
}
dDebug("successed to open %s", file);
code = 0;
_OVER:
if (root != NULL) cJSON_Delete(root);
if (code != 0) {
if (pFile != NULL) taosCloseFile(&pFile);
} else {
pDnode->runtimeFile = pFile;
}
return code;
}
int32_t dndWriteRuntimeFile(SDnode *pDnode) {
int32_t code = -1;
int32_t len = 0;
char content[MAXLEN + 1] = {0};
char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0};
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s.running.bak", pDnode->dataDir, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%s.running", pDnode->dataDir, TD_DIRSEP);
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to open file:%s since %s", file, terrstr());
goto _OVER;
}
len += snprintf(content + len, MAXLEN - len, "{\n");
for (ENodeType ntype = DNODE + 1; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
len += snprintf(content + len, MAXLEN - len, " \"%s_shmid\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.id);
if (ntype == NODE_MAX - 1) {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d\n", dndNodeProcStr(ntype), pWrapper->shm.size);
} else {
len += snprintf(content + len, MAXLEN - len, " \"%s_shmsize\": %d,\n", dndNodeProcStr(ntype), pWrapper->shm.size);
}
}
len += snprintf(content + len, MAXLEN - len, "}\n");
if (taosWriteFile(pFile, content, len) != len) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to write file:%s since %s", file, terrstr());
goto _OVER;
}
if (taosFsyncFile(pFile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to fsync file:%s since %s", file, terrstr());
goto _OVER;
}
taosCloseFile(&pFile);
if (taosRenameFile(file, realfile) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to rename %s to %s since %s", file, realfile, terrstr());
return -1;
}
dDebug("successed to write %s", realfile);
code = 0;
_OVER:
if (pFile != NULL) {
taosCloseFile(&pFile);
}
return code;
} }
void dndCloseRuntimeFile(SDnode *pDnode) {
if (pDnode->runtimeFile) {
taosUnLockFile(pDnode->runtimeFile);
taosCloseFile(&pDnode->runtimeFile);
pDnode->runtimeFile = NULL;
}
}
\ No newline at end of file
...@@ -20,7 +20,7 @@ ...@@ -20,7 +20,7 @@
static int8_t once = DND_ENV_INIT; static int8_t once = DND_ENV_INIT;
int32_t dndInit() { int32_t dndInit() {
dInfo("start to init dnode env"); dDebug("start to init dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) { if (atomic_val_compare_exchange_8(&once, DND_ENV_INIT, DND_ENV_READY) != DND_ENV_INIT) {
terrno = TSDB_CODE_REPEAT_INIT; terrno = TSDB_CODE_REPEAT_INIT;
dError("failed to init dnode env since %s", terrstr()); dError("failed to init dnode env since %s", terrstr());
...@@ -31,12 +31,6 @@ int32_t dndInit() { ...@@ -31,12 +31,6 @@ int32_t dndInit() {
taosBlockSIGPIPE(); taosBlockSIGPIPE();
taosResolveCRC(); taosResolveCRC();
if (rpcInit() != 0) {
dError("failed to init rpc since %s", terrstr());
dndCleanup();
return -1;
}
SMonCfg monCfg = {0}; SMonCfg monCfg = {0};
monCfg.maxLogs = tsMonitorMaxLogs; monCfg.maxLogs = tsMonitorMaxLogs;
monCfg.port = tsMonitorPort; monCfg.port = tsMonitorPort;
...@@ -44,29 +38,27 @@ int32_t dndInit() { ...@@ -44,29 +38,27 @@ int32_t dndInit() {
monCfg.comp = tsMonitorComp; monCfg.comp = tsMonitorComp;
if (monInit(&monCfg) != 0) { if (monInit(&monCfg) != 0) {
dError("failed to init monitor since %s", terrstr()); dError("failed to init monitor since %s", terrstr());
dndCleanup();
return -1; return -1;
} }
dInfo("dnode env is initialized"); dDebug("dnode env is initialized");
return 0; return 0;
} }
void dndCleanup() { void dndCleanup() {
dInfo("start to cleanup dnode env"); dDebug("start to cleanup dnode env");
if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) { if (atomic_val_compare_exchange_8(&once, DND_ENV_READY, DND_ENV_CLEANUP) != DND_ENV_READY) {
dError("dnode env is already cleaned up"); dError("dnode env is already cleaned up");
return; return;
} }
monCleanup(); monCleanup();
rpcCleanup();
walCleanUp(); walCleanUp();
taosStopCacheRefreshWorker(); taosStopCacheRefreshWorker();
dInfo("dnode env is cleaned up"); dDebug("dnode env is cleaned up");
} }
void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp, int32_t vgId) { void dndSetMsgHandle(SMgmtWrapper *pWrapper, tmsg_t msgType, NodeMsgFp nodeMsgFp, int8_t vgId) {
pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp;
pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId; pWrapper->msgVgIds[TMSG_INDEX(msgType)] = vgId;
} }
...@@ -92,29 +84,6 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) { ...@@ -92,29 +84,6 @@ void dndGetStartup(SDnode *pDnode, SStartupReq *pStartup) {
pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING); pStartup->finished = (dndGetStatus(pDnode) == DND_STAT_RUNNING);
} }
TdFilePtr dndCheckRunning(const char *dataDir) {
char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s/.running", dataDir);
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s", filepath, terrstr());
return NULL;
}
int32_t ret = taosLockFile(pFile);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("failed to lock file:%s since %s", filepath, terrstr());
taosCloseFile(&pFile);
return NULL;
}
dDebug("file:%s is locked", filepath);
return pFile;
}
void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) { void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug("startup req is received"); dDebug("startup req is received");
SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq)); SStartupReq *pStartup = rpcMallocCont(sizeof(SStartupReq));
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dndInt.h" #include "dndInt.h"
static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes; pDnode->numOfSupportVnodes = pOption->numOfSupportVnodes;
pDnode->serverPort = pOption->serverPort; pDnode->serverPort = pOption->serverPort;
pDnode->dataDir = strdup(pOption->dataDir); pDnode->dataDir = strdup(pOption->dataDir);
...@@ -24,8 +24,9 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -24,8 +24,9 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
pDnode->localFqdn = strdup(pOption->localFqdn); pDnode->localFqdn = strdup(pOption->localFqdn);
pDnode->firstEp = strdup(pOption->firstEp); pDnode->firstEp = strdup(pOption->firstEp);
pDnode->secondEp = strdup(pOption->secondEp); pDnode->secondEp = strdup(pOption->secondEp);
pDnode->pDisks = pOption->pDisks; pDnode->disks = pOption->disks;
pDnode->numOfDisks = pOption->numOfDisks; pDnode->numOfDisks = pOption->numOfDisks;
pDnode->ntype = pOption->ntype;
pDnode->rebootTime = taosGetTimestampMs(); pDnode->rebootTime = taosGetTimestampMs();
if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL || if (pDnode->dataDir == NULL || pDnode->localEp == NULL || pDnode->localFqdn == NULL || pDnode->firstEp == NULL ||
...@@ -36,16 +37,12 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -36,16 +37,12 @@ static int32_t dndInitMemory(SDnode *pDnode, const SDnodeOpt *pOption) {
return 0; return 0;
} }
static void dndClearMemory(SDnode *pDnode) { static void dndClearVars(SDnode *pDnode) {
for (ENodeType n = 0; n < NODE_MAX; ++n) { for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pMgmt = &pDnode->wrappers[n]; SMgmtWrapper *pMgmt = &pDnode->wrappers[n];
taosMemoryFreeClear(pMgmt->path); taosMemoryFreeClear(pMgmt->path);
} }
if (pDnode->pLockFile != NULL) { dndCloseRuntimeFile(pDnode);
taosUnLockFile(pDnode->pLockFile);
taosCloseFile(&pDnode->pLockFile);
pDnode->pLockFile = NULL;
}
taosMemoryFreeClear(pDnode->localEp); taosMemoryFreeClear(pDnode->localEp);
taosMemoryFreeClear(pDnode->localFqdn); taosMemoryFreeClear(pDnode->localFqdn);
taosMemoryFreeClear(pDnode->firstEp); taosMemoryFreeClear(pDnode->firstEp);
...@@ -67,13 +64,21 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { ...@@ -67,13 +64,21 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
goto _OVER; goto _OVER;
} }
if (dndInitMemory(pDnode, pOption) != 0) { if (dndInitVars(pDnode, pOption) != 0) {
dError("failed to init variables since %s", terrstr());
goto _OVER; goto _OVER;
} }
dndSetStatus(pDnode, DND_STAT_INIT); dndSetStatus(pDnode, DND_STAT_INIT);
pDnode->pLockFile = dndCheckRunning(pDnode->dataDir); dmGetMgmtFp(&pDnode->wrappers[DNODE]);
if (pDnode->pLockFile == NULL) { mmGetMgmtFp(&pDnode->wrappers[MNODE]);
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
if (dndOpenRuntimeFile(pDnode) != 0) {
dError("failed to open runtime file since %s", terrstr());
goto _OVER; goto _OVER;
} }
...@@ -87,13 +92,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { ...@@ -87,13 +92,6 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
goto _OVER; goto _OVER;
} }
dmGetMgmtFp(&pDnode->wrappers[DNODE]);
mmGetMgmtFp(&pDnode->wrappers[MNODE]);
vmGetMgmtFp(&pDnode->wrappers[VNODES]);
qmGetMgmtFp(&pDnode->wrappers[QNODE]);
smGetMgmtFp(&pDnode->wrappers[SNODE]);
bmGetMgmtFp(&pDnode->wrappers[BNODE]);
if (dndInitMsgHandle(pDnode) != 0) { if (dndInitMsgHandle(pDnode) != 0) {
goto _OVER; goto _OVER;
} }
...@@ -116,7 +114,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) { ...@@ -116,7 +114,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
_OVER: _OVER:
if (code != 0 && pDnode) { if (code != 0 && pDnode) {
dndClearMemory(pDnode); dndClearVars(pDnode);
pDnode = NULL; pDnode = NULL;
dError("failed to create dnode object since %s", terrstr()); dError("failed to create dnode object since %s", terrstr());
} else { } else {
...@@ -145,7 +143,7 @@ void dndClose(SDnode *pDnode) { ...@@ -145,7 +143,7 @@ void dndClose(SDnode *pDnode) {
dndCloseNode(pWrapper); dndCloseNode(pWrapper);
} }
dndClearMemory(pDnode); dndClearVars(pDnode);
dInfo("dnode object is closed, data:%p", pDnode); dInfo("dnode object is closed, data:%p", pDnode);
} }
......
...@@ -285,7 +285,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { ...@@ -285,7 +285,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
tstrncpy(dCfg.dir, pDnode->dataDir, TSDB_FILENAME_LEN); tstrncpy(dCfg.dir, pDnode->dataDir, TSDB_FILENAME_LEN);
dCfg.level = 0; dCfg.level = 0;
dCfg.primary = 1; dCfg.primary = 1;
SDiskCfg *pDisks = pDnode->pDisks; SDiskCfg *pDisks = pDnode->disks;
int32_t numOfDisks = pDnode->numOfDisks; int32_t numOfDisks = pDnode->numOfDisks;
if (numOfDisks <= 0 || pDisks == NULL) { if (numOfDisks <= 0 || pDisks == NULL) {
pDisks = &dCfg; pDisks = &dCfg;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
int32_t taosCreateShm(SShm* pShm, int32_t shmsize) {
int32_t shmid = shmget(IPC_PRIVATE, shmsize, IPC_CREAT | 0600);
if (shmid < 0) {
return -1;
}
void* shmptr = shmat(shmid, NULL, 0);
if (shmptr == NULL) {
return -1;
}
pShm->id = shmid;
pShm->size = shmsize;
pShm->ptr = shmptr;
return 0;
}
void taosDropShm(SShm* pShm) {
if (pShm->id > 0) {
if (pShm->ptr != NULL) {
shmdt(pShm->ptr);
}
shmctl(pShm->id, IPC_RMID, NULL);
}
pShm->id = 0;
pShm->size = 0;
pShm->ptr = NULL;
}
int32_t taosAttachShm(SShm* pShm) {
if (pShm->id > 0 && pShm->size > 0) {
pShm->ptr = shmat(pShm->id, NULL, 0);
if (pShm->ptr != NULL) {
return 0;
}
}
return -1;
}
void taosDetachShm(SShm* pShm) {
if (pShm->id > 0) {
if (pShm->ptr != NULL) {
shmdt(pShm->ptr);
pShm->ptr = NULL;
}
}
pShm->id = 0;
pShm->size = 0;
pShm->ptr = NULL;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册