diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 32205b337c978dfa20f3afa0a8455c22977d3ce2..959730756734b9b71fee63193ee82e21c8c3b141 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -217,20 +217,20 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { dmStopStatusThread(pWrapper->pDnode); } -static int32_t dmSpawnUdfd(SDnode *pDnode); +static int32_t dmSpawnUdfd(SUdfdData *pData); void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); - SDnode *pDnode = process->data; - if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pDnode->udfdData.stopCalled)) { + SUdfdData *pData = process->data; + if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) { dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); } else { dInfo("udfd process restart"); - dmSpawnUdfd(pDnode); + dmSpawnUdfd(pData); } } -static int32_t dmSpawnUdfd(SDnode *pDnode) { +static int32_t dmSpawnUdfd(SUdfdData* pData) { dInfo("dnode start spawning udfd"); uv_process_options_t options = {0}; @@ -252,7 +252,6 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.exit_cb = dmUdfdExit; - SUdfdData *pData = &pDnode->udfdData; uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); uv_stdio_container_t child_stdio[3]; @@ -268,7 +267,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { char dnodeIdEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0}; - snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); + snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); float numCpuCores = 4; taosGetCpuCores(&numCpuCores); snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); @@ -276,7 +275,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { options.env = envUdfd; int err = uv_spawn(&pData->loop, &pData->process, &options); - pData->process.data = (void*)pDnode; + pData->process.data = (void*)pData; if (err != 0) { dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); @@ -291,18 +290,16 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { } static void dmUdfdStopAsyncCb(uv_async_t *async) { - SDnode *pDnode = async->data; - SUdfdData *pData = &pDnode->udfdData; + SUdfdData *pData = async->data; uv_stop(&pData->loop); } static void dmWatchUdfd(void *args) { - SDnode *pDnode = args; - SUdfdData *pData = &pDnode->udfdData; + SUdfdData *pData = args; uv_loop_init(&pData->loop); uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb); - pData->stopAsync.data = pDnode; - int32_t err = dmSpawnUdfd(pDnode); + pData->stopAsync.data = pData; + int32_t err = dmSpawnUdfd(pData); atomic_store_32(&pData->spawnErr, err); uv_barrier_wait(&pData->barrier); uv_run(&pData->loop, UV_RUN_DEFAULT); @@ -314,18 +311,19 @@ static void dmWatchUdfd(void *args) { return; } -static int32_t dmStartUdfd(SDnode *pDnode) { +static int32_t dmStartUdfd(SDnode *pDnode, int32_t startDnodeId) { char dnodeId[8] = {0}; snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId); uv_os_setenv("DNODE_ID", dnodeId); SUdfdData *pData = &pDnode->udfdData; + pData->dnodeId = startDnodeId; if (pData->startCalled) { dInfo("dnode-mgmt start udfd already called"); return 0; } pData->startCalled = true; uv_barrier_init(&pData->barrier, 2); - uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); + uv_thread_create(&pData->thread, dmWatchUdfd, pData); uv_barrier_wait(&pData->barrier); int32_t err = atomic_load_32(&pData->spawnErr); if (err != 0) { @@ -340,10 +338,10 @@ static int32_t dmStartUdfd(SDnode *pDnode) { return err; } -static int32_t dmStopUdfd(SDnode *pDnode) { +static int32_t dmStopUdfd(SUdfdData *udfdData) { dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", - pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); - SUdfdData *pData = &pDnode->udfdData; + udfdData->needCleanUp, udfdData->spawnErr); + SUdfdData *pData = udfdData; if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { return 0; } @@ -387,7 +385,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { } dmReportStartup(pDnode, "dnode-transport", "initialized"); - if (dmStartUdfd(pDnode) != 0) { + if (dmStartUdfd(pDnode, pDnode->data.dnodeId) != 0) { dError("failed to start udfd"); } @@ -398,7 +396,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { dInfo("dnode-mgmt start to clean up"); SDnode *pDnode = pWrapper->pDnode; - dmStopUdfd(pDnode); + dmStopUdfd(&pDnode->udfdData); dmStopWorker(pDnode); taosWLockLatch(&pDnode->data.latch); diff --git a/source/dnode/mgmt/interface/inc/dmDef.h b/source/dnode/mgmt/interface/inc/dmDef.h index 2e8ad982d87cf1d94aca8ad59b8fe26fdf5862ae..445e1d42f5d6bd96d2fe93aac84d6e24683313f5 100644 --- a/source/dnode/mgmt/interface/inc/dmDef.h +++ b/source/dnode/mgmt/interface/inc/dmDef.h @@ -156,6 +156,8 @@ typedef struct SUdfdData { uv_pipe_t ctrlPipe; uv_async_t stopAsync; int32_t stopCalled; + + int32_t dnodeId; } SUdfdData; typedef struct SDnode {