提交 9cf1cb78 编写于 作者: S slzhou

feature(udf):move start/stop udfd out of dnode management preparation

上级 98582265
...@@ -217,20 +217,20 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { ...@@ -217,20 +217,20 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread(pWrapper->pDnode); dmStopStatusThread(pWrapper->pDnode);
} }
static int32_t dmSpawnUdfd(SDnode *pDnode); static int32_t dmSpawnUdfd(SUdfdData *pData);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
SDnode *pDnode = process->data; SUdfdData *pData = process->data;
if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pDnode->udfdData.stopCalled)) { if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pData->stopCalled)) {
dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop"); dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
} else { } else {
dInfo("udfd process restart"); dInfo("udfd process restart");
dmSpawnUdfd(pDnode); dmSpawnUdfd(pData);
} }
} }
static int32_t dmSpawnUdfd(SDnode *pDnode) { static int32_t dmSpawnUdfd(SUdfdData* pData) {
dInfo("dnode start spawning udfd"); dInfo("dnode start spawning udfd");
uv_process_options_t options = {0}; uv_process_options_t options = {0};
...@@ -252,7 +252,6 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { ...@@ -252,7 +252,6 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.exit_cb = dmUdfdExit; options.exit_cb = dmUdfdExit;
SUdfdData *pData = &pDnode->udfdData;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
uv_stdio_container_t child_stdio[3]; uv_stdio_container_t child_stdio[3];
...@@ -268,7 +267,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { ...@@ -268,7 +267,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
char dnodeIdEnvItem[32] = {0}; char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId);
float numCpuCores = 4; float numCpuCores = 4;
taosGetCpuCores(&numCpuCores); taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
...@@ -276,7 +275,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { ...@@ -276,7 +275,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.env = envUdfd; options.env = envUdfd;
int err = uv_spawn(&pData->loop, &pData->process, &options); int err = uv_spawn(&pData->loop, &pData->process, &options);
pData->process.data = (void*)pDnode; pData->process.data = (void*)pData;
if (err != 0) { if (err != 0) {
dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err)); dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
...@@ -291,18 +290,16 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { ...@@ -291,18 +290,16 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
} }
static void dmUdfdStopAsyncCb(uv_async_t *async) { static void dmUdfdStopAsyncCb(uv_async_t *async) {
SDnode *pDnode = async->data; SUdfdData *pData = async->data;
SUdfdData *pData = &pDnode->udfdData;
uv_stop(&pData->loop); uv_stop(&pData->loop);
} }
static void dmWatchUdfd(void *args) { static void dmWatchUdfd(void *args) {
SDnode *pDnode = args; SUdfdData *pData = args;
SUdfdData *pData = &pDnode->udfdData;
uv_loop_init(&pData->loop); uv_loop_init(&pData->loop);
uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb); uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb);
pData->stopAsync.data = pDnode; pData->stopAsync.data = pData;
int32_t err = dmSpawnUdfd(pDnode); int32_t err = dmSpawnUdfd(pData);
atomic_store_32(&pData->spawnErr, err); atomic_store_32(&pData->spawnErr, err);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT); uv_run(&pData->loop, UV_RUN_DEFAULT);
...@@ -314,18 +311,19 @@ static void dmWatchUdfd(void *args) { ...@@ -314,18 +311,19 @@ static void dmWatchUdfd(void *args) {
return; return;
} }
static int32_t dmStartUdfd(SDnode *pDnode) { static int32_t dmStartUdfd(SDnode *pDnode, int32_t startDnodeId) {
char dnodeId[8] = {0}; char dnodeId[8] = {0};
snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId); snprintf(dnodeId, sizeof(dnodeId), "%d", pDnode->data.dnodeId);
uv_os_setenv("DNODE_ID", dnodeId); uv_os_setenv("DNODE_ID", dnodeId);
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
pData->dnodeId = startDnodeId;
if (pData->startCalled) { if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called"); dInfo("dnode-mgmt start udfd already called");
return 0; return 0;
} }
pData->startCalled = true; pData->startCalled = true;
uv_barrier_init(&pData->barrier, 2); uv_barrier_init(&pData->barrier, 2);
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_thread_create(&pData->thread, dmWatchUdfd, pData);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
int32_t err = atomic_load_32(&pData->spawnErr); int32_t err = atomic_load_32(&pData->spawnErr);
if (err != 0) { if (err != 0) {
...@@ -340,10 +338,10 @@ static int32_t dmStartUdfd(SDnode *pDnode) { ...@@ -340,10 +338,10 @@ static int32_t dmStartUdfd(SDnode *pDnode) {
return err; return err;
} }
static int32_t dmStopUdfd(SDnode *pDnode) { static int32_t dmStopUdfd(SUdfdData *udfdData) {
dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); udfdData->needCleanUp, udfdData->spawnErr);
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = udfdData;
if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
return 0; return 0;
} }
...@@ -387,7 +385,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { ...@@ -387,7 +385,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
} }
dmReportStartup(pDnode, "dnode-transport", "initialized"); dmReportStartup(pDnode, "dnode-transport", "initialized");
if (dmStartUdfd(pDnode) != 0) { if (dmStartUdfd(pDnode, pDnode->data.dnodeId) != 0) {
dError("failed to start udfd"); dError("failed to start udfd");
} }
...@@ -398,7 +396,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { ...@@ -398,7 +396,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to clean up"); dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
dmStopUdfd(pDnode); dmStopUdfd(&pDnode->udfdData);
dmStopWorker(pDnode); dmStopWorker(pDnode);
taosWLockLatch(&pDnode->data.latch); taosWLockLatch(&pDnode->data.latch);
......
...@@ -156,6 +156,8 @@ typedef struct SUdfdData { ...@@ -156,6 +156,8 @@ typedef struct SUdfdData {
uv_pipe_t ctrlPipe; uv_pipe_t ctrlPipe;
uv_async_t stopAsync; uv_async_t stopAsync;
int32_t stopCalled; int32_t stopCalled;
int32_t dnodeId;
} SUdfdData; } SUdfdData;
typedef struct SDnode { typedef struct SDnode {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册