From 8d1ddbd87788dbf0bfdd0fdd9b0b8dafb8566a8e Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 14 May 2022 15:42:21 +0800 Subject: [PATCH] fix: remove from task queue before signalling caller --- source/libs/function/src/tudf.c | 40 +++++++++++++++++++++------------ 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 9fa20d3556..dd5bee208b 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -964,7 +964,7 @@ void udfcUvHandleError(SClientUvConn *conn) { uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose); } -void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { +void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread); if (nread == 0) return; @@ -987,30 +987,32 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { } -void onUdfClientWrite(uv_write_t *write, int status) { +void onUdfcPipetWrite(uv_write_t *write, int status) { SClientUvTaskNode *uvTask = write->data; uv_pipe_t *pipe = uvTask->pipe; + fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len); + SClientUvConn *conn = pipe->data; if (status == 0) { - SClientUvConn *conn = pipe->data; QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); } else { fnError("udfc client %p write error.", pipe); + udfcUvHandleError(conn); } - fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len); taosMemoryFree(write); taosMemoryFree(uvTask->reqBuf.base); } -void onUdfClientConnect(uv_connect_t *connect, int status) { +void onUdfcPipeConnect(uv_connect_t *connect, int status) { SClientUvTaskNode *uvTask = connect->data; - uvTask->errCode = status; if (status != 0) { - //TODO: LOG error + fnError("client connect error, task seq: %"PRId64", code: %s", uvTask->seqNum, uv_strerror(status)); } - uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead); + uvTask->errCode = status; + + uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead); taosMemoryFree(connect); - uv_sem_post(&uvTask->taskSem); QUEUE_REMOVE(&uvTask->procTaskQueue); + uv_sem_post(&uvTask->taskSem); } int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { @@ -1070,6 +1072,7 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask); + int32_t code = 0; switch (uvTask->type) { case UV_TASK_CONNECT: { uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); @@ -1088,28 +1091,35 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); connReq->data = uvTask; - uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect); + uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect); + code = 0; break; } case UV_TASK_REQ_RSP: { uv_pipe_t *pipe = uvTask->pipe; uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); write->data = uvTask; - uv_write(write, (uv_stream_t *) pipe, &uvTask->reqBuf, 1, onUdfClientWrite); + int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite); + if (err != 0) { + fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err)); + } + code = err; break; } case UV_TASK_DISCONNECT: { SClientUvConn *conn = uvTask->pipe->data; QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose); + code = 0; break; } default: { + fnError("udfc event loop unknown task type.") break; } } - return 0; + return code; } void udfClientAsyncCb(uv_async_t *async) { @@ -1124,8 +1134,10 @@ void udfClientAsyncCb(uv_async_t *async) { QUEUE* h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); - udfcStartUvTask(task); - QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); + int32_t code = udfcStartUvTask(task); + if (code == 0) { + QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); + } } } -- GitLab