From eae11bf8f083fc227479b30cbf18ef9b6a1866b7 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Tue, 19 Apr 2022 09:03:04 +0800 Subject: [PATCH] handle uv read error --- source/libs/function/inc/tudf.h | 1 + source/libs/function/src/tudf.c | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index db62accb85..775e6133f7 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -32,6 +32,7 @@ extern "C" { enum { UDFC_CODE_STOPPING = -1, UDFC_CODE_RESTARTING = -2, + UDFC_CODE_PIPE_READ_ERR = -3, }; /** diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 1bc961d527..e3444ffa21 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -697,7 +697,18 @@ void udfcUvHandleRsp(SClientUvConn *conn) { } void udfcUvHandleError(SClientUvConn *conn) { - uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose); + while (!QUEUE_EMPTY(&conn->taskQueue)) { + QUEUE* h = QUEUE_HEAD(&conn->taskQueue); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); + task->errCode = UDFC_CODE_PIPE_READ_ERR; + uv_sem_post(&task->taskSem); + QUEUE_REMOVE(&task->procTaskQueue); + } + + uv_close((uv_handle_t *) conn->pipe, NULL); + taosMemoryFree(conn->pipe); + taosMemoryFree(conn->readBuf.buf); + taosMemoryFree(conn); } void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { @@ -903,8 +914,11 @@ void udfStopAsyncCb(uv_async_t *async) { uv_stop(&gUdfdLoop); } } + int32_t startUdfd(); + void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { + //TODO: pipe close will be first received debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); uv_close((uv_handle_t *) req, NULL); //TODO: restart the udfd process @@ -919,7 +933,6 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { cleanUpUvTasks(); startUdfd(); } - } int32_t startUdfd() { -- GitLab