未验证 提交 22305b01 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #11147 from taosdata/feature/shm

shm
...@@ -82,6 +82,7 @@ extern "C" { ...@@ -82,6 +82,7 @@ extern "C" {
#include "osLz4.h" #include "osLz4.h"
#include "osMath.h" #include "osMath.h"
#include "osMemory.h" #include "osMemory.h"
#include "osProc.h"
#include "osRand.h" #include "osRand.h"
#include "osThread.h" #include "osThread.h"
#include "osSemaphore.h" #include "osSemaphore.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_OS_PROC_H_
#define _TD_OS_PROC_H_
#ifdef __cplusplus
extern "C" {
#endif
// start a copy of itself
int32_t taosNewProc(const char *args);
// the length of the new name must be less than the original name to take effect
void taosSetProcName(char **argv, const char *name);
#ifdef __cplusplus
}
#endif
#endif /*_TD_OS_PROC_H_*/
...@@ -49,6 +49,8 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp); ...@@ -49,6 +49,8 @@ void taosSetSignal(int32_t signum, FSignalHandler sigfp);
void taosIgnSignal(int32_t signum); void taosIgnSignal(int32_t signum);
void taosDflSignal(int32_t signum); void taosDflSignal(int32_t signum);
void taosKillChildOnSelfStopped();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -78,6 +78,7 @@ int32_t* taosGetErrno(); ...@@ -78,6 +78,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x010C) #define TSDB_CODE_CFG_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x010C)
#define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x010D) #define TSDB_CODE_INVALID_CFG TAOS_DEF_ERROR_CODE(0, 0x010D)
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x010E) #define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x010E)
#define TSDB_CODE_INVALID_SHM_ID TAOS_DEF_ERROR_CODE(0, 0x010F)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111)
#define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112)
......
...@@ -52,13 +52,11 @@ static void dndSetSignalHandle() { ...@@ -52,13 +52,11 @@ static void dndSetSignalHandle() {
if (!tsMultiProcess) { if (!tsMultiProcess) {
// Set the single process signal // Set the single process signal
} else if (global.ntype == DNODE) { } else if (global.ntype == DNODE) {
// Set the parent process signal
// When the child process exits, the parent process receives a signal // When the child process exits, the parent process receives a signal
taosSetSignal(SIGCHLD, dndHandleChild); taosSetSignal(SIGCHLD, dndHandleChild);
} else { } else {
// Set child process signal
// When the parent process exits, the child process will receive the SIGKILL signal // When the parent process exits, the child process will receive the SIGKILL signal
prctl(PR_SET_PDEATHSIG, SIGKILL); taosKillChildOnSelfStopped();
} }
} }
...@@ -145,8 +143,7 @@ static int32_t dndInitLog() { ...@@ -145,8 +143,7 @@ static int32_t dndInitLog() {
static void dndSetProcName(char **argv) { static void dndSetProcName(char **argv) {
if (global.ntype != DNODE) { if (global.ntype != DNODE) {
const char *name = dndNodeProcStr(global.ntype); const char *name = dndNodeProcStr(global.ntype);
prctl(PR_SET_NAME, name); taosSetProcName(argv, name);
strcpy(argv[0], name);
} }
} }
......
...@@ -95,13 +95,14 @@ typedef struct SMgmtWrapper { ...@@ -95,13 +95,14 @@ typedef struct SMgmtWrapper {
bool deployed; bool deployed;
bool required; bool required;
EProcType procType; EProcType procType;
int32_t procId;
SProcObj *pProc; SProcObj *pProc;
SShm shm; SShm shm;
void *pMgmt; void *pMgmt;
SDnode *pDnode; SDnode *pDnode;
NodeMsgFp msgFps[TDMT_MAX];
int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
SMgmtFp fp; SMgmtFp fp;
int8_t msgVgIds[TDMT_MAX]; // Handle the case where the same message type is distributed to qnode or vnode
NodeMsgFp msgFps[TDMT_MAX];
} SMgmtWrapper; } SMgmtWrapper;
typedef struct { typedef struct {
......
...@@ -192,8 +192,19 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -192,8 +192,19 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue; if (!pWrapper->required) continue;
dInfo("node:%s, will not start in parent process", pWrapper->name); if (pDnode->ntype == NODE_MAX) {
// exec new node dInfo("node:%s, should be started manually", pWrapper->name);
} else {
char args[PATH_MAX];
int32_t pid = taosNewProc(args);
if (pid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
return -1;
}
pWrapper->procId = pid;
dInfo("node:%s, run in new process, pid:%d", pWrapper->name, pid);
}
if (taosProcRun(pWrapper->pProc) != 0) { if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
...@@ -214,8 +225,12 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) { ...@@ -214,8 +225,12 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
static int32_t dndRunInChildProcess(SDnode *pDnode) { static int32_t dndRunInChildProcess(SDnode *pDnode) {
dInfo("dnode start to run in child process"); dInfo("dnode start to run in child process");
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
SMsgCb msgCb = dndCreateMsgcb(pWrapper);
tmsgSetDefaultMsgCb(&msgCb);
pWrapper->procType = PROC_CHILD;
if (dndOpenNode(pWrapper) != 0) { if (dndOpenNode(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr()); dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1; return -1;
...@@ -236,13 +251,19 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) { ...@@ -236,13 +251,19 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
.isChild = true, .isChild = true,
.name = pWrapper->name}; .name = pWrapper->name};
pWrapper->procType = PROC_CHILD;
pWrapper->pProc = taosProcInit(&cfg); pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) { if (pWrapper->pProc == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr()); dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1; return -1;
} }
if (pWrapper->fp.startFp != NULL) {
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
if (taosProcRun(pWrapper->pProc) != 0) { if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr()); dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1; return -1;
...@@ -258,7 +279,7 @@ int32_t dndRun(SDnode * pDnode) { ...@@ -258,7 +279,7 @@ int32_t dndRun(SDnode * pDnode) {
dError("failed to run dnode since %s", terrstr()); dError("failed to run dnode since %s", terrstr());
return -1; return -1;
} }
} else if (pDnode->ntype == DNODE) { } else if (pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
if (dndRunInParentProcess(pDnode) != 0) { if (dndRunInParentProcess(pDnode) != 0) {
dError("failed to run dnode in parent process since %s", terrstr()); dError("failed to run dnode in parent process since %s", terrstr());
return -1; return -1;
......
...@@ -179,7 +179,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { ...@@ -179,7 +179,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
} }
} }
if (!tsMultiProcess || pDnode->ntype == DNODE) { if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == DNODE) {
for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) { for (ENodeType ntype = DNODE; ntype < NODE_MAX; ++ntype) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
if (pWrapper->shm.id >= 0) { if (pWrapper->shm.id >= 0) {
...@@ -191,7 +191,7 @@ int32_t dndReadShmFile(SDnode *pDnode) { ...@@ -191,7 +191,7 @@ int32_t dndReadShmFile(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype]; SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
if (taosAttachShm(&pWrapper->shm) != 0) { if (taosAttachShm(&pWrapper->shm) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
dError("shmid:%d, failed to attach since %s", pWrapper->shm.id, terrstr()); dError("shmid:%d, failed to attach shm since %s", pWrapper->shm.id, terrstr());
goto _OVER; goto _OVER;
} }
dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size); dDebug("shmid:%d, is attached, size:%d", pWrapper->shm.id, pWrapper->shm.size);
......
...@@ -35,9 +35,11 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { ...@@ -35,9 +35,11 @@ static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) {
return -1; return -1;
} }
pDnode->lockfile = dndCheckRunning(pDnode->dataDir); if (!tsMultiProcess || pDnode->ntype == DNODE || pDnode->ntype == NODE_MAX) {
if (pDnode->lockfile == NULL) { pDnode->lockfile = dndCheckRunning(pDnode->dataDir);
return -1; if (pDnode->lockfile == NULL) {
return -1;
}
} }
return 0; return 0;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
int32_t taosNewProc(const char *args) {
return 0;
}
void taosSetProcName(char **argv, const char *name) {
prctl(PR_SET_NAME, name);
strcpy(argv[0], name);
}
\ No newline at end of file
...@@ -49,12 +49,11 @@ void taosDropShm(SShm* pShm) { ...@@ -49,12 +49,11 @@ void taosDropShm(SShm* pShm) {
} }
int32_t taosAttachShm(SShm* pShm) { int32_t taosAttachShm(SShm* pShm) {
if (pShm->id >= 0) { errno = 0;
pShm->ptr = shmat(pShm->id, NULL, 0);
if (pShm->ptr != NULL) {
return 0;
}
}
return -1; void* ptr = shmat(pShm->id, NULL, 0);
if (errno == 0) {
pShm->ptr = ptr;
}
return errno;
} }
...@@ -71,4 +71,6 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); } ...@@ -71,4 +71,6 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); }
void taosDflSignal(int32_t signum) { signal(signum, SIG_DFL); } void taosDflSignal(int32_t signum) { signal(signum, SIG_DFL); }
void taosKillChildOnSelfStopped() { prctl(PR_SET_PDEATHSIG, SIGKILL); }
#endif #endif
...@@ -85,7 +85,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization ...@@ -85,7 +85,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_REPEAT_INIT, "Repeat initialization
TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found") TAOS_DEFINE_ERROR(TSDB_CODE_CFG_NOT_FOUND, "Config not found")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG, "Invalid config option") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CFG, "Invalid config option")
TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Share memory") TAOS_DEFINE_ERROR(TSDB_CODE_OUT_OF_SHM_MEM, "Out of Share memory")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SHM_ID, "Invalid SHM ID")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs")
TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")
......
...@@ -26,108 +26,3 @@ class UtilTestQueue : public ::testing::Test { ...@@ -26,108 +26,3 @@ class UtilTestQueue : public ::testing::Test {
static void SetUpTestSuite() {} static void SetUpTestSuite() {}
static void TearDownTestSuite() {} static void TearDownTestSuite() {}
}; };
#if 0
TEST_F(UtilTestQueue, 01_fork) {
pid_t pid;
int shmid;
int* shmptr;
int* tmp;
int err;
pthread_mutexattr_t mattr;
if ((err = taosThreadMutexAttrInit(&mattr)) < 0) {
printf("mutex addr init error:%s\n", strerror(err));
exit(1);
}
if ((err = taosThreadMutexAttrSetPshared(&mattr, PTHREAD_PROCESS_SHARED)) < 0) {
printf("mutex addr get shared error:%s\n", strerror(err));
exit(1);
}
pthread_mutex_t* m;
int mid = shmget(IPC_PRIVATE, sizeof(pthread_mutex_t), 0600);
m = (pthread_mutex_t*)shmat(mid, NULL, 0);
if ((err = taosThreadMutexInit(m, &mattr)) < 0) {
printf("mutex mutex init error:%s\n", strerror(err));
exit(1);
}
if ((shmid = shmget(IPC_PRIVATE, 1000, IPC_CREAT | 0600)) < 0) {
perror("shmget error");
exit(1);
}
if ((shmptr = (int*)shmat(shmid, 0, 0)) == (void*)-1) {
perror("shmat error");
exit(1);
}
tmp = shmptr;
int shmid2;
int** shmptr2;
if ((shmid2 = shmget(IPC_PRIVATE, 20, IPC_CREAT | 0600)) < 0) {
perror("shmget2 error");
exit(1);
}
if ((shmptr2 = (int**)shmat(shmid2, 0, 0)) == (void*)-1) {
perror("shmat2 error");
exit(1);
}
*shmptr2 = shmptr;
if ((pid = fork()) < 0) {
perror("fork error");
exit(1);
} else if (pid == 0) {
if ((err = taosThreadMutexLock(m)) < 0) {
printf("lock error:%s\n", strerror(err));
exit(1);
}
for (int i = 0; i < 30; ++i) {
**shmptr2 = i;
(*shmptr2)++;
}
if ((err = taosThreadMutexUnlock(m)) < 0) {
printf("unlock error:%s\n", strerror(err));
exit(1);
}
exit(0);
} else {
if ((err = taosThreadMutexLock(m)) < 0) {
printf("lock error:%s\n", strerror(err));
exit(1);
}
for (int i = 10; i < 42; ++i) {
**shmptr2 = i;
(*shmptr2)++;
}
if ((err = taosThreadMutexUnlock(m)) < 0) {
printf("unlock error:%s\n", strerror(err));
exit(1);
}
}
wait(NULL);
for (int i = 0; i < 70; ++i) {
printf("%d ", tmp[i]);
}
printf("\n");
taosThreadAttrDestroy(&mattr);
//销毁mutex
taosThreadMutexDestroy(m);
exit(0);
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册