提交 7c54b699 编写于 作者: S slzhou

sigkill to kill taosd causes udfd to exit

上级 ad398bb6
......@@ -226,6 +226,7 @@ void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
if (atomic_load_8(&pData->stopping) != 0) {
dDebug("udfd process exit due to stopping");
} else {
uv_close((uv_handle_t*)&pData->ctrlPipe, NULL);
dmSpawnUdfd(pDnode);
}
}
......@@ -243,20 +244,21 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.file = path;
options.exit_cb = dmUdfdExit;
SUdfdData *pData = &pDnode->udfdData;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_INHERIT_FD;
child_stdio[1].data.fd = 1;
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio_count = 3;
options.stdio = child_stdio;
char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
SUdfdData *pData = &pDnode->udfdData;
float numCpuCores = 4;
taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
......
......@@ -152,6 +152,7 @@ typedef struct SUdfdData {
uv_process_t process;
int spawnErr;
int8_t stopping;
uv_pipe_t ctrlPipe;
} SUdfdData;
typedef struct SDnode {
......
......@@ -27,7 +27,10 @@
typedef struct SUdfdContext {
uv_loop_t *loop;
uv_pipe_t ctrlPipe;
uv_signal_t intrSignal;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_pipe_t listeningPipe;
void *clientRpc;
uv_mutex_t udfsMutex;
......@@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
}
}
void removeListeningPipe(int sig) {
void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
fnInfo("udfd signal received: %d\n", signum);
uv_fs_t req;
uv_fs_unlink(global.loop, &req, "udf.sock", NULL);
exit(0);
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_signal_stop(handle);
uv_stop(global.loop);
}
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
......@@ -492,37 +497,67 @@ static int32_t udfdInitLog() {
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
}
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = taosMemoryMalloc(suggested_size);
buf->len = suggested_size;
}
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
if (nread < 0) {
fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
uv_close((uv_handle_t*)q, NULL);
uv_stop(global.loop);
return;
}
fnError("udfd ctrl pipe read %zu bytes", nread);
taosMemoryFree(buf->base);
}
static int32_t removeListeningPipe() {
uv_fs_t req;
int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_fs_req_cleanup(&req);
return err;
}
static int32_t udfdUvInit() {
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop) {
uv_loop_init(loop);
}
global.loop = loop;
uv_pipe_init(global.loop, &global.ctrlPipe, 1);
uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
char dnodeId[8] = {0};
size_t dnodeIdSize;
uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
if (err != 0) {
dnodeId[0] = '1';
}
char listenPipeName[32] = {0};
snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
strcpy(global.listenPipeName, listenPipeName);
uv_fs_t req;
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
removeListeningPipe();
uv_pipe_t server;
uv_pipe_init(global.loop, &server, 0);
uv_pipe_init(global.loop, &global.listeningPipe, 0);
signal(SIGINT, removeListeningPipe);
uv_signal_init(global.loop, &global.intrSignal);
uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT);
int r;
fnInfo("bind to pipe %s", global.listenPipeName);
if ((r = uv_pipe_bind(&server, listenPipeName))) {
if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) {
fnError("Bind error %s", uv_err_name(r));
removeListeningPipe(0);
removeListeningPipe();
return -1;
}
if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) {
if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
fnError("Listen error %s", uv_err_name(r));
removeListeningPipe(0);
removeListeningPipe();
return -2;
}
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册