未验证 提交 ea282e0d 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12246 from taosdata/feature/udf

feature(udf): pipe is placed under datadir and remove it after udfd exist and is hidden
...@@ -127,7 +127,7 @@ enum { ...@@ -127,7 +127,7 @@ enum {
int64_t gUdfTaskSeqNum = 0; int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy { typedef struct SUdfdProxy {
char udfdPipeName[UDF_LISTEN_PIPE_NAME_LEN]; char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_barrier_t gUdfInitBarrier; uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop; uv_loop_t gUdfdLoop;
...@@ -224,9 +224,15 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) { ...@@ -224,9 +224,15 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) {
size_t dnodeIdSize = sizeof(dnodeId); size_t dnodeIdSize = sizeof(dnodeId);
int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize);
if (err != 0) { if (err != 0) {
fnError("get dnode id from env. error: %s.", uv_err_name(err));
dnodeId[0] = '1'; dnodeId[0] = '1';
} }
#ifdef _WIN32
snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); snprintf(pipeName, size, "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#else
snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
#endif
fnInfo("get dnode id from env. dnode id: %s. pipe path: %s", dnodeId, pipeName);
return 0; return 0;
} }
...@@ -998,7 +1004,7 @@ int32_t udfcOpen() { ...@@ -998,7 +1004,7 @@ int32_t udfcOpen() {
return 0; return 0;
} }
SUdfdProxy *proxy = &gUdfdProxy; SUdfdProxy *proxy = &gUdfdProxy;
getUdfdPipeName(proxy->udfdPipeName, UDF_LISTEN_PIPE_NAME_LEN); getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
proxy->gUdfcState = UDFC_STATE_STARTNG; proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->gUdfInitBarrier, 2); uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy); uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
......
...@@ -30,7 +30,7 @@ typedef struct SUdfdContext { ...@@ -30,7 +30,7 @@ typedef struct SUdfdContext {
uv_loop_t *loop; uv_loop_t *loop;
uv_pipe_t ctrlPipe; uv_pipe_t ctrlPipe;
uv_signal_t intrSignal; uv_signal_t intrSignal;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; char listenPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2];
uv_pipe_t listeningPipe; uv_pipe_t listeningPipe;
void *clientRpc; void *clientRpc;
...@@ -652,7 +652,7 @@ static int32_t udfdUvInit() { ...@@ -652,7 +652,7 @@ static int32_t udfdUvInit() {
uv_pipe_open(&global.ctrlPipe, 0); uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
getUdfdPipeName(global.listenPipeName, UDF_LISTEN_PIPE_NAME_LEN); getUdfdPipeName(global.listenPipeName, sizeof(global.listenPipeName));
removeListeningPipe(); removeListeningPipe();
...@@ -696,6 +696,7 @@ static int32_t udfdRun() { ...@@ -696,6 +696,7 @@ static int32_t udfdRun() {
fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code); fnInfo("udfd stopped. result: %s, code: %d", uv_err_name(code), code);
int codeClose = uv_loop_close(global.loop); int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
removeListeningPipe();
udfdCloseClientRpc(); udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex); uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash); taosHashCleanup(global.udfsHash);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册