diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 122c121ed8a073d4a51b060dc8abc0b115c9664c..c4eba53a3448d24f3eae7e7b35a1100ec6ec828f 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -220,13 +220,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode); void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); - uv_close((uv_handle_t*)process, NULL); SDnode *pDnode = process->data; - SUdfdData *pData = &pDnode->udfdData; - if (atomic_load_8(&pData->stopping) != 0) { - dDebug("udfd process exit due to stopping"); + if (exitStatus == 0 && termSignal == 0) { + dInfo("udfd process exit due to SIGINT"); } else { - uv_close((uv_handle_t*)&pData->ctrlPipe, NULL); + dInfo("udfd process restart"); dmSpawnUdfd(pDnode); } } @@ -248,6 +246,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.file = path; options.exit_cb = dmUdfdExit; + SUdfdData *pData = &pDnode->udfdData; uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); @@ -260,6 +259,8 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.stdio_count = 3; options.stdio = child_stdio; + options.flags = UV_PROCESS_DETACHED; + char dnodeIdEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0}; snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); @@ -284,24 +285,31 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { } } -void dmWatchUdfd(void *args) { +static void dmUdfdStopAsyncCb(uv_async_t *async) { + SDnode *pDnode = async->data; + SUdfdData *pData = &pDnode->udfdData; + uv_stop(&pData->loop); +} + +static void dmWatchUdfd(void *args) { SDnode *pDnode = args; SUdfdData *pData = &pDnode->udfdData; uv_loop_init(&pData->loop); + uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb); + pData->stopAsync.data = pDnode; int32_t err = dmSpawnUdfd(pDnode); atomic_store_32(&pData->spawnErr, err); uv_barrier_wait(&pData->barrier); uv_run(&pData->loop, UV_RUN_DEFAULT); - err = uv_loop_close(&pData->loop); - while (err == UV_EBUSY) { - uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL); - uv_run(&pData->loop, UV_RUN_DEFAULT); - err = uv_loop_close(&pData->loop); - } + uv_loop_close(&pData->loop); + + uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL); + uv_run(&pData->loop, UV_RUN_DEFAULT); + uv_loop_close(&pData->loop); return; } -int32_t dmStartUdfd(SDnode *pDnode) { +static int32_t dmStartUdfd(SDnode *pDnode) { SUdfdData *pData = &pDnode->udfdData; if (pData->startCalled) { dInfo("dnode-mgmt start udfd already called"); @@ -309,30 +317,27 @@ int32_t dmStartUdfd(SDnode *pDnode) { } pData->startCalled = true; uv_barrier_init(&pData->barrier, 2); - pData->stopping = 0; uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_barrier_wait(&pData->barrier); pData->needCleanUp = true; return pData->spawnErr; } -int32_t dmStopUdfd(SDnode *pDnode) { +static int32_t dmStopUdfd(SDnode *pDnode) { dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); SUdfdData *pData = &pDnode->udfdData; if (!pData->needCleanUp) { return 0; } - atomic_store_8(&pData->stopping, 1); uv_barrier_destroy(&pData->barrier); if (pData->spawnErr == 0) { uv_process_kill(&pData->process, SIGINT); } - uv_stop(&pData->loop); + uv_async_send(&pData->stopAsync); uv_thread_join(&pData->thread); - atomic_store_8(&pData->stopping, 0); return 0; } diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index 4f4a2ed3499f1c135626a702370bdb9536a2f9d8..dd31faf1b7d6290acd46661b26eb750a509ec75a 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -151,8 +151,8 @@ typedef struct SUdfdData { uv_barrier_t barrier; uv_process_t process; int spawnErr; - int8_t stopping; uv_pipe_t ctrlPipe; + uv_async_t stopAsync; } SUdfdData; typedef struct SDnode { diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 5f7532da87c1ceeb313a6eefe104219e0b3690d9..65408517580181d5f79eb5ba37f671e596414b1e 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -586,7 +586,7 @@ static int32_t udfdRun() { udfdCloseClientRpc(); uv_mutex_destroy(&global.udfsMutex); taosHashCleanup(global.udfsHash); - return code; + return 0; } int main(int argc, char *argv[]) {