提交 164251c9 编写于 作者: S shenglian zhou

fix: uv_write callback is called even after the uvTask associated is deleted

上级 b10a95f4
...@@ -875,7 +875,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf); ...@@ -875,7 +875,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf);
void udfcUvHandleRsp(SClientUvConn *conn); void udfcUvHandleRsp(SClientUvConn *conn);
void udfcUvHandleError(SClientUvConn *conn); void udfcUvHandleError(SClientUvConn *conn);
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
void onUdfcPipetWrite(uv_write_t *write, int status); void onUdfcPipeWrite(uv_write_t *write, int status);
void onUdfcPipeConnect(uv_connect_t *connect, int status); void onUdfcPipeConnect(uv_connect_t *connect, int status);
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask); int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask);
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask); int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
...@@ -1226,7 +1226,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * ...@@ -1226,7 +1226,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
} }
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
SClientUvConn *conn = handle->data; SClientUvConn *conn = handle->data;
SClientConnBuf *connBuf = &conn->readBuf; SClientConnBuf *connBuf = &conn->readBuf;
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t); int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
...@@ -1244,6 +1244,9 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf ...@@ -1244,6 +1244,9 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
buf->base = NULL; buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
} else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
buf->base = connBuf->buf + connBuf->len;
buf->len = msgHeadSize - connBuf->len;
} else { } else {
connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap; connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap); void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
...@@ -1258,8 +1261,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf ...@@ -1258,8 +1261,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
} }
} }
fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total); fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
} }
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) { bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
...@@ -1267,7 +1269,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) { ...@@ -1267,7 +1269,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
connBuf->total = *(int32_t *) (connBuf->buf); connBuf->total = *(int32_t *) (connBuf->buf);
} }
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
fnTrace("udfc complete message is received, now handle it"); fnDebug("udfc complete message is received, now handle it");
return true; return true;
} }
return false; return false;
...@@ -1278,7 +1280,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -1278,7 +1280,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum
if (QUEUE_EMPTY(&conn->taskQueue)) { if (QUEUE_EMPTY(&conn->taskQueue)) {
fnError("udfc no task waiting for response on connection"); fnError("udfc no task waiting on connection. response seqnum:%"PRId64, seqNum);
return; return;
} }
bool found = false; bool found = false;
...@@ -1287,6 +1289,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -1287,6 +1289,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
while (h != &conn->taskQueue) { while (h != &conn->taskQueue) {
fnDebug("udfc handle response iterate through queue. uvTask:%d-%p", task->seqNum, task);
if (task->seqNum == seqNum) { if (task->seqNum == seqNum) {
if (found == false) { if (found == false) {
found = true; found = true;
...@@ -1315,6 +1318,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) { ...@@ -1315,6 +1318,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
} }
void udfcUvHandleError(SClientUvConn *conn) { void udfcUvHandleError(SClientUvConn *conn) {
fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);
while (!QUEUE_EMPTY(&conn->taskQueue)) { while (!QUEUE_EMPTY(&conn->taskQueue)) {
QUEUE* h = QUEUE_HEAD(&conn->taskQueue); QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
...@@ -1328,7 +1332,7 @@ void udfcUvHandleError(SClientUvConn *conn) { ...@@ -1328,7 +1332,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
} }
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread); fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
if (nread == 0) return; if (nread == 0) return;
SClientUvConn *conn = client->data; SClientUvConn *conn = client->data;
...@@ -1338,31 +1342,25 @@ void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { ...@@ -1338,31 +1342,25 @@ void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
if (isUdfcUvMsgComplete(connBuf)) { if (isUdfcUvMsgComplete(connBuf)) {
udfcUvHandleRsp(conn); udfcUvHandleRsp(conn);
} }
} }
if (nread < 0) { if (nread < 0) {
fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread)); fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
if (nread == UV_EOF) { if (nread == UV_EOF) {
fnError("\tudfc client pipe %p closed", client); fnError("\tudfc client pipe %p closed", client);
} }
udfcUvHandleError(conn); udfcUvHandleError(conn);
} }
} }
void onUdfcPipetWrite(uv_write_t *write, int status) { void onUdfcPipeWrite(uv_write_t *write, int status) {
SClientUvTaskNode *uvTask = write->data; SClientUvConn *conn = write->data;
uv_pipe_t *pipe = uvTask->pipe; if (status < 0) {
fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len); fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
SClientUvConn *conn = pipe->data;
if (status == 0) {
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
} else {
fnError("udfc client %p write error.", pipe);
udfcUvHandleError(conn); udfcUvHandleError(conn);
} else {
fnDebug("udfc client connection %p write succeed", conn);
} }
taosMemoryFree(write); taosMemoryFree(write);
taosMemoryFree(uvTask->reqBuf.base);
} }
void onUdfcPipeConnect(uv_connect_t *connect, int status) { void onUdfcPipeConnect(uv_connect_t *connect, int status) {
...@@ -1419,7 +1417,7 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -1419,7 +1417,7 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
} }
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask); fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
SUdfcProxy *udfc = uvTask->udfc; SUdfcProxy *udfc = uvTask->udfc;
uv_mutex_lock(&udfc->taskQueueMutex); uv_mutex_lock(&udfc->taskQueueMutex);
QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
...@@ -1427,14 +1425,14 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { ...@@ -1427,14 +1425,14 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
uv_async_send(&udfc->loopTaskAync); uv_async_send(&udfc->loopTaskAync);
uv_sem_wait(&uvTask->taskSem); uv_sem_wait(&uvTask->taskSem);
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask); fnInfo("udfc uvTask finished. uvTask:%"PRId64"-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
uv_sem_destroy(&uvTask->taskSem); uv_sem_destroy(&uvTask->taskSem);
return 0; return 0;
} }
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask); fnDebug("event loop start uv task. uvTask: %"PRId64"-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
int32_t code = 0; int32_t code = 0;
switch (uvTask->type) { switch (uvTask->type) {
...@@ -1465,10 +1463,12 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1465,10 +1463,12 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
code = TSDB_CODE_UDF_PIPE_NO_PIPE; code = TSDB_CODE_UDF_PIPE_NO_PIPE;
} else { } else {
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
write->data = uvTask; write->data = pipe->data;
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite); QUEUE* connTaskQueue = &((SClientUvConn*)pipe->data)->taskQueue;
QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
if (err != 0) { if (err != 0) {
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err)); fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
} }
code = err; code = err;
} }
...@@ -1618,6 +1618,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1618,6 +1618,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvTaskNode *uvTask = NULL; SClientUvTaskNode *uvTask = NULL;
udfcCreateUvTask(task, uvTaskType, &uvTask); udfcCreateUvTask(task, uvTaskType, &uvTask);
fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
udfcQueueUvTask(uvTask); udfcQueueUvTask(uvTask);
udfcGetUdfTaskResultFromUvTask(task, uvTask); udfcGetUdfTaskResultFromUvTask(task, uvTask);
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
...@@ -1625,6 +1626,8 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1625,6 +1626,8 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
SClientUvConn *conn = uvTask->pipe->data; SClientUvConn *conn = uvTask->pipe->data;
conn->session = task->session; conn->session = task->session;
} }
taosMemoryFree(uvTask->reqBuf.base);
uvTask->reqBuf.base = NULL;
taosMemoryFree(uvTask); taosMemoryFree(uvTask);
uvTask = NULL; uvTask = NULL;
return task->errCode; return task->errCode;
...@@ -1670,7 +1673,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { ...@@ -1670,7 +1673,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) { SSDataBlock* output, SUdfInterBuf *newState) {
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle); fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
SUdfcUvSession *session = (SUdfcUvSession *) handle; SUdfcUvSession *session = (SUdfcUvSession *) handle;
if (session->udfUvPipe == NULL) { if (session->udfUvPipe == NULL) {
fnError("No pipe to udfd"); fnError("No pipe to udfd");
......
...@@ -671,6 +671,9 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) { ...@@ -671,6 +671,9 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
fnError("udfd can not allocate enough memory") buf->base = NULL; fnError("udfd can not allocate enough memory") buf->base = NULL;
buf->len = 0; buf->len = 0;
} }
} else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
buf->base = ctx->inputBuf + ctx->inputLen;
buf->len = msgHeadSize - ctx->inputLen;
} else { } else {
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap; ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap); void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册