From 52cc00987b5aa31c2d8389205978a025a9d99b24 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 23 Apr 2022 22:16:26 +0800 Subject: [PATCH] udfd pipe name generating with dnode id through environment passing --- source/dnode/mgmt/implement/src/dmHandle.c | 3 +++ source/libs/function/inc/tudf.h | 3 ++- source/libs/function/inc/tudfInt.h | 1 + source/libs/function/src/tudf.c | 21 +++++++++++++++------ source/libs/function/src/udfd.c | 12 ++---------- source/libs/function/test/runUdf.c | 2 +- 6 files changed, 24 insertions(+), 18 deletions(-) diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 8ba12513db..a89f39f927 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -310,6 +310,9 @@ static void dmWatchUdfd(void *args) { } static int32_t dmStartUdfd(SDnode *pDnode) { + char dnodeId[8] = {0}; + snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId); + uv_os_setenv("DNODE_ID", dnodeId); SUdfdData *pData = &pDnode->udfdData; if (pData->startCalled) { dInfo("dnode-mgmt start udfd already called"); diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index 5f4f96c4cc..247258ff9a 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -29,6 +29,7 @@ extern "C" { #define UDF_LISTEN_PIPE_NAME_LEN 32 #define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock." +#define UDF_DNODE_ID_ENV_NAME "DNODE_ID" //====================================================================================== //begin API to taosd and qworker @@ -45,7 +46,7 @@ typedef void *UdfcFuncHandle; * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * @return error code */ -int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle); +int32_t udfcOpen(UdfcHandle* proxyHandle); /** * destroy udfd proxy diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 2c16afbd0d..2ea89de73c 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -112,6 +112,7 @@ void freeUdfDataDataBlock(SUdfDataBlock *block); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); +int32_t getUdfdPipeName(char* pipeName, int32_t size); #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 7c376fcd34..907a19078e 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -124,7 +124,7 @@ enum { int64_t gUdfTaskSeqNum = 0; typedef struct SUdfdProxy { - int32_t dnodeId; + char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN]; uv_barrier_t gUdfInitBarrier; uv_loop_t gUdfdLoop; @@ -212,6 +212,17 @@ enum { UDFC_STATUS_FINAL, // stopped }; +int32_t getUdfdPipeName(char* pipeName, int32_t size) { + char dnodeId[8] = {0}; + size_t dnodeIdSize; + int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); + if (err != 0) { + dnodeId[0] = '1'; + } + snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); + return 0; +} + int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t len = 0; len += taosEncodeBinary(buf, setup->udfName, TSDB_FUNC_NAME_LEN); @@ -874,9 +885,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); connReq->data = uvTask; - char listeningPipeName[32] = {0}; - sprintf(listeningPipeName, "%s%d", UDF_LISTEN_PIPE_NAME_PREFIX, uvTask->udfc->dnodeId); - uv_pipe_connect(connReq, pipe, listeningPipeName, onUdfClientConnect); + uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect); break; } case UV_TASK_REQ_RSP: { @@ -972,9 +981,9 @@ void constructUdfService(void *argsThread) { uv_loop_close(&udfc->gUdfdLoop); } -int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) { +int32_t udfcOpen(UdfcHandle *udfc) { SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy)); - proxy->dnodeId = dnodeId; + getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN); proxy->gUdfcState = UDFC_STATE_STARTNG; uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 5d0ed0daf0..0c78183565 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -531,15 +531,7 @@ static int32_t udfdUvInit() { uv_pipe_open(&global.ctrlPipe, 0); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); - char dnodeId[8] = {0}; - size_t dnodeIdSize; - int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); - if (err != 0) { - dnodeId[0] = '1'; - } - char listenPipeName[32] = {0}; - snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); - strcpy(global.listenPipeName, listenPipeName); + getUdfdPipeName(global.listenPipeName, UDF_LISTEN_PIPE_NAME_LEN); removeListeningPipe(); @@ -550,7 +542,7 @@ static int32_t udfdUvInit() { int r; fnInfo("bind to pipe %s", global.listenPipeName); - if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) { + if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) { fnError("Bind error %s", uv_err_name(r)); removeListeningPipe(); return -1; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 0727a4a1d2..5085f6250d 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -9,7 +9,7 @@ int main(int argc, char *argv[]) { UdfcHandle udfc; - udfcOpen(1, &udfc); + udfcOpen(&udfc); uv_sleep(1000); char path[256] = {0}; size_t cwdSize = 256; -- GitLab