提交 8d1ddbd8 编写于 作者: S slzhou

fix: remove from task queue before signalling caller

上级 1e32228d
......@@ -964,7 +964,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
}
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return;
......@@ -987,30 +987,32 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
}
void onUdfClientWrite(uv_write_t *write, int status) {
void onUdfcPipetWrite(uv_write_t *write, int status) {
SClientUvTaskNode *uvTask = write->data;
uv_pipe_t *pipe = uvTask->pipe;
fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
SClientUvConn *conn = pipe->data;
if (status == 0) {
SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else {
fnError("udfc client %p write error.", pipe);
udfcUvHandleError(conn);
}
fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
taosMemoryFree(write);
taosMemoryFree(uvTask->reqBuf.base);
}
void onUdfClientConnect(uv_connect_t *connect, int status) {
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
SClientUvTaskNode *uvTask = connect->data;
uvTask->errCode = status;
if (status != 0) {
//TODO: LOG error
fnError("client connect error, task seq: %"PRId64", code: %s", uvTask->seqNum, uv_strerror(status));
}
uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead);
uvTask->errCode = status;
uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
taosMemoryFree(connect);
uv_sem_post(&uvTask->taskSem);
QUEUE_REMOVE(&uvTask->procTaskQueue);
uv_sem_post(&uvTask->taskSem);
}
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
......@@ -1070,6 +1072,7 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
int32_t code = 0;
switch (uvTask->type) {
case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
......@@ -1088,28 +1091,35 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
connReq->data = uvTask;
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect);
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
code = 0;
break;
}
case UV_TASK_REQ_RSP: {
uv_pipe_t *pipe = uvTask->pipe;
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
write->data = uvTask;
uv_write(write, (uv_stream_t *) pipe, &uvTask->reqBuf, 1, onUdfClientWrite);
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
if (err != 0) {
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
}
code = err;
break;
}
case UV_TASK_DISCONNECT: {
SClientUvConn *conn = uvTask->pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
uv_close((uv_handle_t *) uvTask->pipe, onUdfcPipeClose);
code = 0;
break;
}
default: {
fnError("udfc event loop unknown task type.")
break;
}
}
return 0;
return code;
}
void udfClientAsyncCb(uv_async_t *async) {
......@@ -1124,8 +1134,10 @@ void udfClientAsyncCb(uv_async_t *async) {
QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
udfcStartUvTask(task);
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
int32_t code = udfcStartUvTask(task);
if (code == 0) {
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
}
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册