未验证 提交 e1df4b13 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11153 from taosdata/feature/shm

shm
...@@ -20,11 +20,10 @@ ...@@ -20,11 +20,10 @@
extern "C" { extern "C" {
#endif #endif
// start a copy of itself int32_t taosNewProc(char **args);
int32_t taosNewProc(const char *args); void taosSetProcName(int32_t argc, char **argv, const char *name);
void taosSetProcPath(int32_t argc, char **argv);
// the length of the new name must be less than the original name to take effect bool taosProcExists(int32_t pid);
void taosSetProcName(char **argv, const char *name);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -49,7 +49,7 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp); ...@@ -49,7 +49,7 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp);
void taosIgnSignal(int32_t signum); void taosIgnSignal(int32_t signum);
void taosDflSignal(int32_t signum); void taosDflSignal(int32_t signum);
void taosKillChildOnSelfStopped(); void taosKillChildOnParentStopped();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -50,13 +50,10 @@ static void dndSetSignalHandle() { ...@@ -50,13 +50,10 @@ static void dndSetSignalHandle() {
taosSetSignal(SIGBREAK, dndStopDnode); taosSetSignal(SIGBREAK, dndStopDnode);
if (!tsMultiProcess) { if (!tsMultiProcess) {
// Set the single process signal
} else if (global.ntype == DNODE) { } else if (global.ntype == DNODE) {
// When the child process exits, the parent process receives a signal
taosSetSignal(SIGCHLD, dndHandleChild); taosSetSignal(SIGCHLD, dndHandleChild);
} else { } else {
// When the parent process exits, the child process will receive the SIGKILL signal taosKillChildOnParentStopped();
taosKillChildOnSelfStopped();
} }
} }
...@@ -140,10 +137,11 @@ static int32_t dndInitLog() { ...@@ -140,10 +137,11 @@ static int32_t dndInitLog() {
return taosCreateLog(logName, 1, configDir, global.envFile, global.apolloUrl, global.pArgs, 0); return taosCreateLog(logName, 1, configDir, global.envFile, global.apolloUrl, global.pArgs, 0);
} }
static void dndSetProcName(char **argv) { static void dndSetProcInfo(int32_t argc, char **argv) {
taosSetProcPath(argc, argv);
if (global.ntype != DNODE) { if (global.ntype != DNODE) {
const char *name = dndNodeProcStr(global.ntype); const char *name = dndNodeProcStr(global.ntype);
taosSetProcName(argv, name); taosSetProcName(argc, argv, name);
} }
} }
...@@ -186,6 +184,7 @@ int main(int argc, char const *argv[]) { ...@@ -186,6 +184,7 @@ int main(int argc, char const *argv[]) {
return -1; return -1;
} }
dndSetProcInfo(argc, (char **)argv);
if (global.generateGrant) { if (global.generateGrant) {
dndGenerateGrant(); dndGenerateGrant();
return 0; return 0;
...@@ -213,6 +212,5 @@ int main(int argc, char const *argv[]) { ...@@ -213,6 +212,5 @@ int main(int argc, char const *argv[]) {
return 0; return 0;
} }
dndSetProcName((char **)argv);
return dndRunDnode(); return dndRunDnode();
} }
...@@ -17,8 +17,8 @@ ...@@ -17,8 +17,8 @@
#include "dndInt.h" #include "dndInt.h"
static bool dndRequireNode(SMgmtWrapper *pWrapper) { static bool dndRequireNode(SMgmtWrapper *pWrapper) {
bool required = false; bool required = false;
int32_t code =(*pWrapper->fp.requiredFp)(pWrapper, &required); int32_t code = (*pWrapper->fp.requiredFp)(pWrapper, &required);
if (!required) { if (!required) {
dDebug("node:%s, no need to start", pWrapper->name); dDebug("node:%s, no need to start", pWrapper->name);
} else { } else {
...@@ -65,36 +65,6 @@ void dndCloseNode(SMgmtWrapper *pWrapper) { ...@@ -65,36 +65,6 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, has been closed", pWrapper->name); dDebug("node:%s, has been closed", pWrapper->name);
} }
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode start to run in single process");
for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
dInfo("dnode running in single process");
return 0;
}
static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen,
ProcFuncType ftype) { ProcFuncType ftype) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
...@@ -140,6 +110,84 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t ...@@ -140,6 +110,84 @@ static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t
taosMemoryFree(pMsg); taosMemoryFree(pMsg);
} }
static int32_t dndNewProc(SMgmtWrapper *pWrapper, ENodeType n) {
char tstr[8] = {0};
char *args[6] = {0};
snprintf(tstr, sizeof(tstr), "%d", n);
args[1] = "-c";
args[2] = configDir;
args[3] = "-n";
args[4] = tstr;
args[5] = NULL;
int32_t pid = taosNewProc(args);
if (pid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
return -1;
}
pWrapper->procId = pid;
dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid);
return 0;
}
static SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) {
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue,
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem,
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm,
.pParent = pWrapper,
.name = pWrapper->name};
return cfg;
}
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode start to run in single process");
for (ENodeType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
dInfo("TDengine initialized successfully");
dndReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
break;
}
taosMsleep(100);
}
return 0;
}
static int32_t dndRunInParentProcess(SDnode *pDnode) { static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo("dnode start to run in parent process"); dInfo("dnode start to run in parent process");
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE]; SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
...@@ -160,21 +208,8 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -160,21 +208,8 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1; return -1;
} }
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, SProcCfg cfg = dndGenProcCfg(pWrapper);
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, cfg.isChild = false;
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm,
.pParent = pWrapper,
.isChild = false,
.name = pWrapper->name};
pWrapper->procType = PROC_PARENT; pWrapper->procType = PROC_PARENT;
pWrapper->pProc = taosProcInit(&cfg); pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) { if (pWrapper->pProc == NULL) {
...@@ -195,15 +230,9 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -195,15 +230,9 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (pDnode->ntype == NODE_MAX) { if (pDnode->ntype == NODE_MAX) {
dInfo("node:%s, should be started manually", pWrapper->name); dInfo("node:%s, should be started manually", pWrapper->name);
} else { } else {
char args[PATH_MAX]; if (dndNewProc(pWrapper, n) != 0) {
int32_t pid = taosNewProc(args);
if (pid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
pWrapper->procId = pid;
dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid);
} }
if (taosProcRun(pWrapper->pProc) != 0) { if (taosProcRun(pWrapper->pProc) != 0) {
...@@ -219,7 +248,29 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -219,7 +248,29 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
return -1; return -1;
} }
dInfo("dnode running in parent process"); dInfo("TDengine initialized successfully");
dndReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) {
if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop");
break;
}
for (ENodeType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) continue;
if (pWrapper->procId != 0 && !taosProcExists(pWrapper->procId)) {
dInfo("node:%s, process not exist, pid:%d", pWrapper->name, pWrapper->procId);
dndNewProc(pWrapper, n);
}
taosMsleep(100);
}
}
return 0; return 0;
} }
...@@ -236,21 +287,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { ...@@ -236,21 +287,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
return -1; return -1;
} }
SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, SProcCfg cfg = dndGenProcCfg(pWrapper);
.childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, cfg.isChild = true;
.childFreeHeadFp = (ProcFreeFp)taosFreeQitem,
.childMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.childFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue,
.parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc,
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.shm = pWrapper->shm,
.pParent = pWrapper,
.isChild = true,
.name = pWrapper->name};
pWrapper->pProc = taosProcInit(&cfg); pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) { if (pWrapper->pProc == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
...@@ -269,31 +307,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { ...@@ -269,31 +307,8 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
return -1; return -1;
} }
dInfo("dnode running in child process");
return 0;
}
int32_t dndRun(SDnode * pDnode) {
if (!tsMultiProcess) {
if (dndRunInSingleProcess(pDnode) != 0) {
dError("failed to run dnode since %s", terrstr());
return -1;
}
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
if (dndRunInParentProcess(pDnode) != 0) {
dError("failed to run dnode in parent process since %s", terrstr());
return -1;
}
} else {
if (dndRunInChildProcess(pDnode) != 0) {
dError("failed to run dnode in child process since %s", terrstr());
return -1;
}
}
dndReportStartup(pDnode, "TDengine", "initialized successfully");
dInfo("TDengine initialized successfully"); dInfo("TDengine initialized successfully");
dndReportStartup(pDnode, "TDengine", "initialized successfully");
while (1) { while (1) {
if (pDnode->event == DND_EVENT_STOP) { if (pDnode->event == DND_EVENT_STOP) {
dInfo("dnode is about to stop"); dInfo("dnode is about to stop");
...@@ -301,6 +316,16 @@ int32_t dndRun(SDnode * pDnode) { ...@@ -301,6 +316,16 @@ int32_t dndRun(SDnode * pDnode) {
} }
taosMsleep(100); taosMsleep(100);
} }
}
int32_t dndRun(SDnode *pDnode) {
if (!tsMultiProcess) {
return dndRunInSingleProcess(pDnode);
} else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
return dndRunInParentProcess(pDnode);
} else {
return dndRunInChildProcess(pDnode);
}
return 0; return 0;
} }
...@@ -152,7 +152,9 @@ void dndClose(SDnode *pDnode) { ...@@ -152,7 +152,9 @@ void dndClose(SDnode *pDnode) {
void dndHandleEvent(SDnode *pDnode, EDndEvent event) { void dndHandleEvent(SDnode *pDnode, EDndEvent event) {
dInfo("dnode object receive event %d, data:%p", event, pDnode); dInfo("dnode object receive event %d, data:%p", event, pDnode);
pDnode->event = event; if (event == DND_EVENT_STOP) {
pDnode->event = event;
}
} }
SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) { SMgmtWrapper *dndAcquireWrapper(SDnode *pDnode, ENodeType ntype) {
......
...@@ -17,11 +17,36 @@ ...@@ -17,11 +17,36 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
int32_t taosNewProc(const char *args) { static char *tsProcPath = NULL;
return 0;
int32_t taosNewProc(char **args) {
int32_t pid = fork();
if (pid == 0) {
args[0] = tsProcPath;
return execvp(tsProcPath, args);
} else {
return pid;
}
} }
void taosSetProcName(char **argv, const char *name) { // the length of the new name must be less than the original name to take effect
void taosSetProcName(int32_t argc, char **argv, const char *name) {
prctl(PR_SET_NAME, name); prctl(PR_SET_NAME, name);
strcpy(argv[0], name);
} for (int32_t i = 0; i < argc; ++i) {
\ No newline at end of file int32_t len = strlen(argv[i]);
for (int32_t j = 0; j < len; ++j) {
argv[i][j] = 0;
}
if (i == 0) {
tstrncpy(argv[0], name, len);
}
}
}
void taosSetProcPath(int32_t argc, char **argv) { tsProcPath = argv[0]; }
bool taosProcExists(int32_t pid) {
int32_t p = getpgid(pid);
return p == 0;
}
...@@ -71,6 +71,6 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); } ...@@ -71,6 +71,6 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); }
void taosDflSignal(int32_t signum) { signal(signum, SIG_DFL); } void taosDflSignal(int32_t signum) { signal(signum, SIG_DFL); }
void taosKillChildOnSelfStopped() { prctl(PR_SET_PDEATHSIG, SIGKILL); } void taosKillChildOnParentStopped() { prctl(PR_SET_PDEATHSIG, SIGKILL); }
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册