提交 adf993cd 编写于 作者: S slzhou

start/stop/restart from dnode

上级 d8ba1d32
......@@ -216,6 +216,95 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread(pWrapper->pDnode);
}
static int32_t dmSpawnUdfd(SDnodeData *pData);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
uv_close((uv_handle_t*)process, NULL);
SDnodeData *pData = process->data;
if (atomic_load_8(&pData->udfdStoping) != 0) {
dDebug("udfd process exit due to stopping");
} else {
dmSpawnUdfd(pData);
}
}
static int32_t dmSpawnUdfd(SDnodeData *pData) {
dInfo("dnode start spawning udfd");
uv_process_options_t options = {0};
char path[PATH_MAX] = {0};
size_t cwdSize;
uv_cwd(path, &cwdSize);
strcat(path, "/udfd");
char* argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd;
options.file = path;
options.exit_cb = dmUdfdExit;
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_INHERIT_FD;
child_stdio[1].data.fd = 1;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio = child_stdio;
char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
float numCpuCores = 4;
taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
options.env = envUdfd;
int err = uv_spawn(&pData->udfdLoop, &pData->udfdProcess, &options);
pData->udfdProcess.data = (void*)pData;
if (err != 0) {
dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
void dmWatchUdfd(void *args) {
SDnodeData *pData = args;
uv_loop_init(&pData->udfdLoop);
int err = dmSpawnUdfd(pData);
pData->udfdErrCode = err;
uv_barrier_wait(&pData->udfdBarrier);
if (pData->udfdErrCode == 0) {
uv_run(&pData->udfdLoop, UV_RUN_DEFAULT);
}
uv_loop_close(&pData->udfdLoop);
return;
}
int32_t dmStartUdfd(SDnode *pDnode) {
SDnodeData *pData = &pDnode->data;
uv_barrier_init(&pData->udfdBarrier, 2);
pData->udfdStoping = 0;
uv_thread_create(&pData->udfdThread, dmWatchUdfd, pData);
uv_barrier_wait(&pData->udfdBarrier);
return pData->udfdErrCode;
}
int32_t dmStopUdfd(SDnode *pDnode) {
SDnodeData *pData = &pDnode->data;
atomic_store_8(&pData->udfdStoping, 1);
uv_barrier_destroy(&pData->udfdBarrier);
uv_process_kill(&pData->udfdProcess, SIGINT);
uv_thread_join(&pData->udfdThread);
atomic_store_8(&pData->udfdStoping, 0);
return 0;
}
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to init");
SDnode *pDnode = pWrapper->pDnode;
......@@ -247,6 +336,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
if (dmStartUdfd(pDnode) != 0) {
dError("failed to start udfd");
}
dInfo("dnode-mgmt is initialized");
return 0;
}
......@@ -254,6 +347,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pWrapper->pDnode;
dmStopUdfd(pDnode);
dmStopWorker(pDnode);
taosWLockLatch(&pDnode->data.latch);
......
......@@ -16,6 +16,7 @@
#ifndef _TD_DM_DEF_H_
#define _TD_DM_DEF_H_
#include "uv.h"
#include "dmLog.h"
#include "cJSON.h"
......@@ -135,6 +136,13 @@ typedef struct {
int32_t numOfDisks;
int32_t supportVnodes;
uint16_t serverPort;
uv_loop_t udfdLoop;
uv_thread_t udfdThread;
uv_barrier_t udfdBarrier;
uv_process_t udfdProcess;
int udfdErrCode;
int8_t udfdStoping;
} SDnodeData;
typedef struct {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册