提交 623a71d3 编写于 作者: S slzhou

feature(udf):move start/stop udfd out of dnode management preparation

上级 a2e074be
......@@ -336,6 +336,17 @@ int32_t udfcOpen();
*/
int32_t udfcClose();
/**
* start udfd that serves udf function invocation under dnode startDnodeId
* @param startDnodeId
* @return
*/
int32_t udfStartUdfd(int32_t startDnodeId);
/**
* stop udfd
* @return
*/
int32_t udfStopUdfd();
#ifdef __cplusplus
}
#endif
......
......@@ -217,143 +217,6 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread(pWrapper->pDnode);
}
static int32_t dmSpawnUdfd(SUdfdData *pData);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
SUdfdData *pData = process->data;
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
} else {
dInfo("udfd process restart");
dmSpawnUdfd(pData);
}
}
static int32_t dmSpawnUdfd(SUdfdData* pData) {
dInfo("dnode start spawning udfd");
uv_process_options_t options = {0};
char path[PATH_MAX] = {0};
if (tsProcPath == NULL) {
path[0] = '.';
} else {
strncpy(path, tsProcPath, strlen(tsProcPath));
taosDirName(path);
}
#ifdef WINDOWS
strcat(path, "udfd.exe");
#else
strcat(path, "/udfd");
#endif
char* argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd;
options.file = path;
options.exit_cb = dmUdfdExit;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio_count = 3;
options.stdio = child_stdio;
options.flags = UV_PROCESS_DETACHED;
char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
float numCpuCores = 4;
taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
options.env = envUdfd;
int err = uv_spawn(&pData->loop, &pData->process, &options);
pData->process.data = (void*)pData;
if (err != 0) {
dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, NULL);
}
}
static void dmUdfdStopAsyncCb(uv_async_t *async) {
SUdfdData *pData = async->data;
uv_stop(&pData->loop);
}
static void dmWatchUdfd(void *args) {
SUdfdData *pData = args;
uv_loop_init(&pData->loop);
uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb);
pData->stopAsync.data = pData;
int32_t err = dmSpawnUdfd(pData);
atomic_store_32(&pData->spawnErr, err);
uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop);
uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop);
return;
}
static int32_t dmStartUdfd(SUdfdData *pUdfdData, int32_t startDnodeId) {
char dnodeId[8] = {0};
snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
uv_os_setenv("DNODE_ID", dnodeId);
SUdfdData *pData = pUdfdData;
pData->dnodeId = startDnodeId;
if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called");
return 0;
}
pData->startCalled = true;
uv_barrier_init(&pData->barrier, 2);
uv_thread_create(&pData->thread, dmWatchUdfd, pData);
uv_barrier_wait(&pData->barrier);
int32_t err = atomic_load_32(&pData->spawnErr);
if (err != 0) {
uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread);
pData->needCleanUp = false;
dInfo("dnode-mgmt udfd cleaned up after spawn err");
} else {
pData->needCleanUp = true;
}
return err;
}
static int32_t dmStopUdfd(SUdfdData *udfdData) {
dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
udfdData->needCleanUp, udfdData->spawnErr);
SUdfdData *pData = udfdData;
if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
return 0;
}
atomic_store_32(&pData->stopCalled, 1);
pData->needCleanUp = false;
uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread);
dInfo("dnode-mgmt udfd cleaned up");
return 0;
}
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to init");
SDnode *pDnode = pWrapper->pDnode;
......@@ -385,7 +248,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
}
dmReportStartup(pDnode, "dnode-transport", "initialized");
if (dmStartUdfd(&pDnode->udfdData, pDnode->data.dnodeId) != 0) {
if (udfStartUdfd(pDnode->data.dnodeId) != 0) {
dError("failed to start udfd");
}
......@@ -396,7 +259,9 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pWrapper->pDnode;
dmStopUdfd(&pDnode->udfdData);
udfStopUdfd();
dmStopWorker(pDnode);
taosWLockLatch(&pDnode->data.latch);
......
......@@ -23,10 +23,165 @@
#include "builtinsimpl.h"
#include "functionMgt.h"
//TODO: network error processing.
//TODO: add unit test
//TODO: include all global variable under context struct
typedef struct SUdfdData {
bool startCalled;
bool needCleanUp;
uv_loop_t loop;
uv_thread_t thread;
uv_barrier_t barrier;
uv_process_t process;
int spawnErr;
uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
int32_t stopCalled;
int32_t dnodeId;
} SUdfdData;
SUdfdData udfdGlobal = {0};
static int32_t udfSpawnUdfd(SUdfdData *pData);
void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
SUdfdData *pData = process->data;
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
fnInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
} else {
fnInfo("udfd process restart");
udfSpawnUdfd(pData);
}
}
static int32_t udfSpawnUdfd(SUdfdData* pData) {
fnInfo("dnode start spawning udfd");
uv_process_options_t options = {0};
char path[PATH_MAX] = {0};
if (tsProcPath == NULL) {
path[0] = '.';
} else {
strncpy(path, tsProcPath, strlen(tsProcPath));
taosDirName(path);
}
#ifdef WINDOWS
strcat(path, "udfd.exe");
#else
strcat(path, "/udfd");
#endif
char* argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd;
options.file = path;
options.exit_cb = udfUdfdExit;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio_count = 3;
options.stdio = child_stdio;
options.flags = UV_PROCESS_DETACHED;
char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
float numCpuCores = 4;
taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
options.env = envUdfd;
int err = uv_spawn(&pData->loop, &pData->process, &options);
pData->process.data = (void*)pData;
if (err != 0) {
fnError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, NULL);
}
}
static void udfUdfdStopAsyncCb(uv_async_t *async) {
SUdfdData *pData = async->data;
uv_stop(&pData->loop);
}
static void udfWatchUdfd(void *args) {
SUdfdData *pData = args;
uv_loop_init(&pData->loop);
uv_async_init(&pData->loop, &pData->stopAsync, udfUdfdStopAsyncCb);
pData->stopAsync.data = pData;
int32_t err = udfSpawnUdfd(pData);
atomic_store_32(&pData->spawnErr, err);
uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop);
uv_walk(&pData->loop, udfUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop);
return;
}
int32_t udfStartUdfd(int32_t startDnodeId) {
SUdfdData *pData = &udfdGlobal;
if (pData->startCalled) {
fnInfo("dnode-mgmt start udfd already called");
return 0;
}
pData->startCalled = true;
char dnodeId[8] = {0};
snprintf(dnodeId, sizeof(dnodeId), "%d", startDnodeId);
uv_os_setenv("DNODE_ID", dnodeId);
pData->dnodeId = startDnodeId;
uv_barrier_init(&pData->barrier, 2);
uv_thread_create(&pData->thread, udfWatchUdfd, pData);
uv_barrier_wait(&pData->barrier);
int32_t err = atomic_load_32(&pData->spawnErr);
if (err != 0) {
uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread);
pData->needCleanUp = false;
fnInfo("dnode-mgmt udfd cleaned up after spawn err");
} else {
pData->needCleanUp = true;
}
return err;
}
int32_t udfStopUdfd() {
SUdfdData *pData = &udfdGlobal;
fnInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
pData->needCleanUp, pData->spawnErr);
if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
return 0;
}
atomic_store_32(&pData->stopCalled, 1);
pData->needCleanUp = false;
uv_barrier_destroy(&pData->barrier);
uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread);
fnInfo("dnode-mgmt udfd cleaned up");
return 0;
}
//==============================================================================================
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
* The QUEUE is copied from queue.h under libuv
* */
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册