提交 ceb9d2fc 编写于 作者: P plum-lihui

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

...@@ -220,13 +220,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode); ...@@ -220,13 +220,11 @@ static int32_t dmSpawnUdfd(SDnode *pDnode);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
uv_close((uv_handle_t*)process, NULL);
SDnode *pDnode = process->data; SDnode *pDnode = process->data;
SUdfdData *pData = &pDnode->udfdData; if (exitStatus == 0 && termSignal == 0 || atomic_load_32(&pDnode->udfdData.stopCalled)) {
if (atomic_load_8(&pData->stopping) != 0) { dInfo("udfd process exit due to SIGINT or dnode-mgmt called stop");
dDebug("udfd process exit due to stopping");
} else { } else {
uv_close((uv_handle_t*)&pData->ctrlPipe, NULL); dInfo("udfd process restart");
dmSpawnUdfd(pDnode); dmSpawnUdfd(pDnode);
} }
} }
...@@ -248,6 +246,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { ...@@ -248,6 +246,7 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.file = path; options.file = path;
options.exit_cb = dmUdfdExit; options.exit_cb = dmUdfdExit;
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1); uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
...@@ -260,6 +259,8 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) { ...@@ -260,6 +259,8 @@ static int32_t dmSpawnUdfd(SDnode *pDnode) {
options.stdio_count = 3; options.stdio_count = 3;
options.stdio = child_stdio; options.stdio = child_stdio;
options.flags = UV_PROCESS_DETACHED;
char dnodeIdEnvItem[32] = {0}; char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0}; char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId); snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
...@@ -284,24 +285,31 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { ...@@ -284,24 +285,31 @@ static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
} }
} }
void dmWatchUdfd(void *args) { static void dmUdfdStopAsyncCb(uv_async_t *async) {
SDnode *pDnode = async->data;
SUdfdData *pData = &pDnode->udfdData;
uv_stop(&pData->loop);
}
static void dmWatchUdfd(void *args) {
SDnode *pDnode = args; SDnode *pDnode = args;
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
uv_loop_init(&pData->loop); uv_loop_init(&pData->loop);
uv_async_init(&pData->loop, &pData->stopAsync, dmUdfdStopAsyncCb);
pData->stopAsync.data = pDnode;
int32_t err = dmSpawnUdfd(pDnode); int32_t err = dmSpawnUdfd(pDnode);
atomic_store_32(&pData->spawnErr, err); atomic_store_32(&pData->spawnErr, err);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT); uv_run(&pData->loop, UV_RUN_DEFAULT);
err = uv_loop_close(&pData->loop); uv_loop_close(&pData->loop);
while (err == UV_EBUSY) {
uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL); uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT); uv_run(&pData->loop, UV_RUN_DEFAULT);
err = uv_loop_close(&pData->loop); uv_loop_close(&pData->loop);
}
return; return;
} }
int32_t dmStartUdfd(SDnode *pDnode) { static int32_t dmStartUdfd(SDnode *pDnode) {
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
if (pData->startCalled) { if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called"); dInfo("dnode-mgmt start udfd already called");
...@@ -309,30 +317,25 @@ int32_t dmStartUdfd(SDnode *pDnode) { ...@@ -309,30 +317,25 @@ int32_t dmStartUdfd(SDnode *pDnode) {
} }
pData->startCalled = true; pData->startCalled = true;
uv_barrier_init(&pData->barrier, 2); uv_barrier_init(&pData->barrier, 2);
pData->stopping = 0;
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
pData->needCleanUp = true; pData->needCleanUp = true;
return pData->spawnErr; return pData->spawnErr;
} }
int32_t dmStopUdfd(SDnode *pDnode) { static int32_t dmStopUdfd(SDnode *pDnode) {
dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d", dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr); pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr);
SUdfdData *pData = &pDnode->udfdData; SUdfdData *pData = &pDnode->udfdData;
if (!pData->needCleanUp) { if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) {
return 0; return 0;
} }
atomic_store_8(&pData->stopping, 1); atomic_store_32(&pData->stopCalled, 1);
pData->needCleanUp = false;
uv_barrier_destroy(&pData->barrier); uv_barrier_destroy(&pData->barrier);
if (pData->spawnErr == 0) { uv_async_send(&pData->stopAsync);
uv_process_kill(&pData->process, SIGINT);
}
uv_stop(&pData->loop);
uv_thread_join(&pData->thread); uv_thread_join(&pData->thread);
atomic_store_8(&pData->stopping, 0);
return 0; return 0;
} }
......
...@@ -151,8 +151,9 @@ typedef struct SUdfdData { ...@@ -151,8 +151,9 @@ typedef struct SUdfdData {
uv_barrier_t barrier; uv_barrier_t barrier;
uv_process_t process; uv_process_t process;
int spawnErr; int spawnErr;
int8_t stopping;
uv_pipe_t ctrlPipe; uv_pipe_t ctrlPipe;
uv_async_t stopAsync;
int32_t stopCalled;
} SUdfdData; } SUdfdData;
typedef struct SDnode { typedef struct SDnode {
......
...@@ -586,7 +586,7 @@ static int32_t udfdRun() { ...@@ -586,7 +586,7 @@ static int32_t udfdRun() {
udfdCloseClientRpc(); udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex); uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash); taosHashCleanup(global.udfsHash);
return code; return 0;
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册