未验证 提交 928af5f0 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #12472 from taosdata/feature/udf

fix: taosd core by thread switching to new thread and free uvtask
...@@ -964,7 +964,7 @@ void udfcUvHandleError(SClientUvConn *conn) { ...@@ -964,7 +964,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose); uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose);
} }
void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread); fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return; if (nread == 0) return;
...@@ -987,30 +987,32 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { ...@@ -987,30 +987,32 @@ void onUdfcRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
} }
void onUdfClientWrite(uv_write_t *write, int status) { void onUdfcPipetWrite(uv_write_t *write, int status) {
SClientUvTaskNode *uvTask = write->data; SClientUvTaskNode *uvTask = write->data;
uv_pipe_t *pipe = uvTask->pipe; uv_pipe_t *pipe = uvTask->pipe;
fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
SClientUvConn *conn = pipe->data;
if (status == 0) { if (status == 0) {
SClientUvConn *conn = pipe->data;
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue); QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else { } else {
fnError("udfc client %p write error.", pipe); fnError("udfc client %p write error.", pipe);
udfcUvHandleError(conn);
} }
fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
taosMemoryFree(write); taosMemoryFree(write);
taosMemoryFree(uvTask->reqBuf.base); taosMemoryFree(uvTask->reqBuf.base);
} }
void onUdfClientConnect(uv_connect_t *connect, int status) { void onUdfcPipeConnect(uv_connect_t *connect, int status) {
SClientUvTaskNode *uvTask = connect->data; SClientUvTaskNode *uvTask = connect->data;
uvTask->errCode = status;
if (status != 0) { if (status != 0) {
//TODO: LOG error fnError("client connect error, task seq: %"PRId64", code: %s", uvTask->seqNum, uv_strerror(status));
} }
uv_read_start((uv_stream_t *) uvTask->pipe, udfcAllocateBuffer, onUdfcRead); uvTask->errCode = status;
uv_read_start((uv_stream_t *)uvTask->pipe, udfcAllocateBuffer, onUdfcPipeRead);
taosMemoryFree(connect); taosMemoryFree(connect);
uv_sem_post(&uvTask->taskSem);
QUEUE_REMOVE(&uvTask->procTaskQueue); QUEUE_REMOVE(&uvTask->procTaskQueue);
uv_sem_post(&uvTask->taskSem);
} }
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
...@@ -1088,14 +1090,17 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1088,14 +1090,17 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t)); uv_connect_t *connReq = taosMemoryMalloc(sizeof(uv_connect_t));
connReq->data = uvTask; connReq->data = uvTask;
uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfClientConnect); uv_pipe_connect(connReq, pipe, uvTask->udfc->udfdPipeName, onUdfcPipeConnect);
break; break;
} }
case UV_TASK_REQ_RSP: { case UV_TASK_REQ_RSP: {
uv_pipe_t *pipe = uvTask->pipe; uv_pipe_t *pipe = uvTask->pipe;
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
write->data = uvTask; write->data = uvTask;
uv_write(write, (uv_stream_t *) pipe, &uvTask->reqBuf, 1, onUdfClientWrite); int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
if (err != 0) {
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
}
break; break;
} }
case UV_TASK_DISCONNECT: { case UV_TASK_DISCONNECT: {
...@@ -1105,6 +1110,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1105,6 +1110,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
break; break;
} }
default: { default: {
fnError("udfc event loop unknown task type.")
break; break;
} }
} }
...@@ -1124,8 +1130,10 @@ void udfClientAsyncCb(uv_async_t *async) { ...@@ -1124,8 +1130,10 @@ void udfClientAsyncCb(uv_async_t *async) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
udfcStartUvTask(task); int32_t code = udfcStartUvTask(task);
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); if (code == 0) {
QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue);
}
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册