提交 a395b215 编写于 作者: S slzhou

fix start/stop/restart udfd

上级 d70c74d4
......@@ -220,13 +220,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
uv_close((uv_handle_t*)process, NULL);
SDnode *pDnode = process->data;
SUdfdData *pData = &pDnode->udfdData;
if (atomic_load_8(&pData->stopping) != 0) {
dDebug("udfd process exit due to stopping");
if (exitStatus == 0 && termSignal == 0) {
dInfo("udfd process exit due to SIGINT");
} else {
uv_close((uv_handle_t*)&pData->ctrlPipe, NULL);
dInfo("udfd process restart");
dmSpawnUdfd(pDnode);
}
}
......@@ -248,6 +246,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.file = path;
options.exit_cb = dmUdfdExit;
SUdfdData *pData = &pDnode->udfdData;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
......@@ -260,6 +259,8 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.stdio_count = 3;
options.stdio = child_stdio;
options.flags = UV_PROCESS_DETACHED;
char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
......@@ -284,24 +285,31 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
}
}
void dmWatchUdfd(void *args) {
static void dmUdfdStopAsyncCb(uv_async_t *async) {
SDnode *pDnode = async->data;
SUdfdData *pData = &pDnode->udfdData;
uv_stop(&pData->loop);
}
static void dmWatchUdfd(void *args) {
SDnode *pDnode = args;
SUdfdData *pData = &pDnode->udfdData;
uv_loop_init(&pData->loop);
uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb);
pData->stopAsync.data = pDnode;
int32_t err = dmSpawnUdfd(pDnode);
atomic_store_32(&pData->spawnErr, err);
uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT);
err = uv_loop_close(&pData->loop);
while (err == UV_EBUSY) {
uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT);
err = uv_loop_close(&pData->loop);
}
uv_loop_close(&pData->loop);
uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT);
uv_loop_close(&pData->loop);
return;
}
int32_t dmStartUdfd(SDnode *pDnode) {
static int32_t dmStartUdfd(SDnode *pDnode) {
SUdfdData *pData = &pDnode->udfdData;
if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called");
......@@ -309,30 +317,27 @@ int32_t dmStartUdfd(SDnode *pDnode) {
}
pData->startCalled = true;
uv_barrier_init(&pData->barrier, 2);
pData->stopping = 0;
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier);
pData->needCleanUp = true;
return pData->spawnErr;
}
int32_t dmStopUdfd(SDnode *pDnode) {
static int32_t dmStopUdfd(SDnode *pDnode) {
dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr);
SUdfdData *pData = &pDnode->udfdData;
if (!pData->needCleanUp) {
return 0;
}
atomic_store_8(&pData->stopping, 1);
uv_barrier_destroy(&pData->barrier);
if (pData->spawnErr == 0) {
uv_process_kill(&pData->process, SIGINT);
}
uv_stop(&pData->loop);
uv_async_send(&pData->stopAsync);
uv_thread_join(&pData->thread);
atomic_store_8(&pData->stopping, 0);
return 0;
}
......
......@@ -151,8 +151,8 @@ typedef struct SUdfdData {
uv_barrier_t barrier;
uv_process_t process;
int spawnErr;
int8_t stopping;
uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
} SUdfdData;
typedef struct SDnode {
......
......@@ -586,7 +586,7 @@ static int32_t udfdRun() {
udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash);
return code;
return 0;
}
int main(int argc, char *argv[]) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册