diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index db62accb8571d3f755417237dede99080819915d..775e6133f7f3be9c2a9f8f5a58fc6f35503b1e29 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -32,6 +32,7 @@ extern "C" { enum { UDFC_CODE_STOPPING = -1, UDFC_CODE_RESTARTING = -2, + UDFC_CODE_PIPE_READ_ERR = -3, }; /** diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 1bc961d5275508827f48424b1e9c1877b3e3f51c..e3444ffa215805412cdb3119d5df29e79c8863b9 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -697,7 +697,18 @@ void udfcUvHandleRsp(SClientUvConn *conn) { } void udfcUvHandleError(SClientUvConn *conn) { - uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose); + while (!QUEUE_EMPTY(&conn->taskQueue)) { + QUEUE* h = QUEUE_HEAD(&conn->taskQueue); + SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); + task->errCode = UDFC_CODE_PIPE_READ_ERR; + uv_sem_post(&task->taskSem); + QUEUE_REMOVE(&task->procTaskQueue); + } + + uv_close((uv_handle_t *) conn->pipe, NULL); + taosMemoryFree(conn->pipe); + taosMemoryFree(conn->readBuf.buf); + taosMemoryFree(conn); } void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { @@ -903,8 +914,11 @@ void udfStopAsyncCb(uv_async_t *async) { uv_stop(&gUdfdLoop); } } + int32_t startUdfd(); + void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { + //TODO: pipe close will be first received debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal); uv_close((uv_handle_t *) req, NULL); //TODO: restart the udfd process @@ -919,7 +933,6 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) { cleanUpUvTasks(); startUdfd(); } - } int32_t startUdfd() {