From 736fddc4cdc9c69d56ad1f0b1a4b208dbbb7a7d1 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Mon, 9 May 2022 09:36:50 +0800 Subject: [PATCH] feature(udf): pipe is placed under datadir and remove it after udfd exist and is hidden --- source/libs/function/src/tudf.c | 10 ++++++++-- source/libs/function/src/udfd.c | 5 +++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 118c840d5e..84860ca5a1 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -127,7 +127,7 @@ enum { int64_t gUdfTaskSeqNum = 0; typedef struct SUdfdProxy { - char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_barrier_t gUdfInitBarrier; uv_loop_t gUdfdLoop; @@ -224,9 +224,15 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) { size_t dnodeIdSize = sizeof(dnodeId); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); if (err != 0) { + fnError("get dnode id from env. error: %s.", uv_err_name(err)); dnodeId[0] = '1'; } +#ifdef _WIN32 snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); +#else + snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); +#endif + fnInfo("get dnode id from env. dnode id: %s. pipe path: %s", dnodeId, pipeName); return 0; } @@ -998,7 +1004,7 @@ int32_t udfcOpen() { return 0; } SUdfdProxy *proxy = &gUdfdProxy; - getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN); + getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); proxy->gUdfcState = UDFC_STATE_STARTNG; uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index f5e4a9c6e6..7695598fb8 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -30,7 +30,7 @@ typedef struct SUdfdContext { uv_loop_t *loop; uv_pipe_t ctrlPipe; uv_signal_t intrSignal; - char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_pipe_t listeningPipe; void *clientRpc; @@ -652,7 +652,7 @@ static int32_t udfdUvInit() { uv_pipe_open(&global.ctrlPipe, 0); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); - getUdfdPipeName(global.listenPipeName, UDF_LISTEN_PIPE_NAME_LEN); + getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName)); removeListeningPipe(); @@ -696,6 +696,7 @@ static int32_t udfdRun() { fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code); int codeClose = uv_loop_close(global.loop); fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); + removeListeningPipe(); udfdCloseClientRpc(); uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); -- GitLab