diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index c59ff1521a6c4261cab74834f9d8cf321b3f095d..7b7ab7fa9ad2986b6e32e234d13b6f33ac9b0f25 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -226,6 +226,7 @@ void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { if (atomic_load_8(&pData->stopping) != 0) { dDebug("udfd process exit due to stopping"); } else { + uv_close((uv_handle_t*)&pData->ctrlPipe, NULL); dmSpawnUdfd(pDnode); } } @@ -243,20 +244,21 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.file = path; options.exit_cb = dmUdfdExit; + SUdfdData *pData = &pDnode->udfdData; + uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); - options.stdio_count = 3; uv_stdio_container_t child_stdio[3]; - child_stdio[0].flags = UV_IGNORE; - child_stdio[1].flags = UV_INHERIT_FD; - child_stdio[1].data.fd = 1; + child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; + child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe; + child_stdio[1].flags = UV_IGNORE; child_stdio[2].flags = UV_INHERIT_FD; child_stdio[2].data.fd = 2; + options.stdio_count = 3; options.stdio = child_stdio; char dnodeIdEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0}; snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); - SUdfdData *pData = &pDnode->udfdData; float numCpuCores = 4; taosGetCpuCores(&numCpuCores); snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index e76ac73b8562b258f1109e6a9c55b38c07c74295..4f4a2ed3499f1c135626a702370bdb9536a2f9d8 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -152,6 +152,7 @@ typedef struct SUdfdData { uv_process_t process; int spawnErr; int8_t stopping; + uv_pipe_t ctrlPipe; } SUdfdData; typedef struct SDnode { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 1dd0871ae9f8d5fe08d07a9023c71732c2ea167f..d6e7a436660d4f4fc5d8dc33fd5de6d7c7eaa9c6 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -27,7 +27,10 @@ typedef struct SUdfdContext { uv_loop_t *loop; + uv_pipe_t ctrlPipe; + uv_signal_t intrSignal; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + uv_pipe_t listeningPipe; void *clientRpc; uv_mutex_t udfsMutex; @@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { } } -void removeListeningPipe(int sig) { +void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { + fnInfo("udfd signal received: %d\n", signum); uv_fs_t req; - uv_fs_unlink(global.loop, &req, "udf.sock", NULL); - exit(0); + uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_signal_stop(handle); + uv_stop(global.loop); } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } @@ -492,37 +497,67 @@ static int32_t udfdInitLog() { return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0); } +void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { + buf->base = taosMemoryMalloc(suggested_size); + buf->len = suggested_size; +} + +void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { + if (nread < 0) { + fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); + uv_close((uv_handle_t*)q, NULL); + uv_stop(global.loop); + return; + } + fnError("udfd ctrl pipe read %zu bytes", nread); + taosMemoryFree(buf->base); +} + +static int32_t removeListeningPipe() { + uv_fs_t req; + int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + uv_fs_req_cleanup(&req); + return err; +} + static int32_t udfdUvInit() { uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop) { uv_loop_init(loop); } global.loop = loop; + + uv_pipe_init(global.loop, &global.ctrlPipe, 1); + uv_pipe_open(&global.ctrlPipe, 0); + uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + char dnodeId[8] = {0}; size_t dnodeIdSize; - uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); + int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); + if (err != 0) { + dnodeId[0] = '1'; + } char listenPipeName[32] = {0}; snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); strcpy(global.listenPipeName, listenPipeName); - uv_fs_t req; - uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + removeListeningPipe(); - uv_pipe_t server; - uv_pipe_init(global.loop, &server, 0); + uv_pipe_init(global.loop, &global.listeningPipe, 0); - signal(SIGINT, removeListeningPipe); + uv_signal_init(global.loop, &global.intrSignal); + uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT); int r; fnInfo("bind to pipe %s", global.listenPipeName); - if ((r = uv_pipe_bind(&server, listenPipeName))) { + if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) { fnError("Bind error %s", uv_err_name(r)); - removeListeningPipe(0); + removeListeningPipe(); return -1; } - if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) { + if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) { fnError("Listen error %s", uv_err_name(r)); - removeListeningPipe(0); + removeListeningPipe(); return -2; } return 0;