From 5f51fb3a3267f7be77646f58c25c19fcf7ec8a22 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Mar 2022 11:18:05 +0800 Subject: [PATCH] modify transport --- source/libs/transport/src/transSrv.c | 44 ++++++++++++++++++---------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 432ec472fb..9e68b0bf7b 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -65,6 +65,7 @@ typedef struct SWorkThrdObj { queue conn; pthread_mutex_t msgMtx; void* pTransInst; + bool stop; } SWorkThrdObj; typedef struct SServerObj { @@ -386,7 +387,6 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); transUnrefSrvHandle(c); - // destroyConn(c, true); } } void uvWorkerAsyncCb(uv_async_t* handle) { @@ -411,10 +411,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } if (msg->pConn == NULL) { free(msg); - destroyAllConn(pThrd); - - uv_loop_close(pThrd->loop); - uv_stop(pThrd->loop); + bool noConn = QUEUE_IS_EMPTY(&pThrd->conn); + if (noConn == true) { + uv_loop_close(pThrd->loop); + uv_stop(pThrd->loop); + } else { + destroyAllConn(pThrd); + uv_loop_close(pThrd->loop); + pThrd->stop = true; + } } else { uvStartSendResp(msg); } @@ -422,12 +427,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { } static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; + tDebug("close server port %d", srv->port); uv_close((uv_handle_t*)&srv->server, NULL); uv_stop(srv->loop); } static void uvShutDownCb(uv_shutdown_t* req, int status) { - tDebug("conn failed to shut down: %s", uv_err_name(status)); + if (status != 0) { + tDebug("conn failed to shut down: %s", uv_err_name(status)); + } uv_close((uv_handle_t*)req->handle, uvDestroyConn); free(req); } @@ -509,14 +517,14 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { int addrlen = sizeof(pConn->addr); if (0 != uv_tcp_getpeername(pConn->pTcp, (struct sockaddr*)&pConn->addr, &addrlen)) { tError("server conn %p failed to get peer info", pConn); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); return; } addrlen = sizeof(pConn->locaddr); if (0 != uv_tcp_getsockname(pConn->pTcp, (struct sockaddr*)&pConn->locaddr, &addrlen)) { tError("server conn %p failed to get local info", pConn); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); return; } @@ -524,7 +532,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { } else { tDebug("failed to create new connection"); - destroyConn(pConn, true); + transUnrefSrvHandle(pConn); } } @@ -602,6 +610,7 @@ static SSrvConn* createConn(void* hThrd) { tTrace("conn %p created", pConn); pConn->broken = false; + transRefSrvHandle(pConn); return pConn; } @@ -617,25 +626,26 @@ static void destroyConn(SSrvConn* conn, bool clear) { destroySmsg(msg); } conn->srvMsgs = taosArrayDestroy(conn->srvMsgs); - QUEUE_REMOVE(&conn->queue); - if (clear) { tTrace("try to destroy conn %p", conn); - // uv_tcp_close_reset(conn->pTcp, uvDestroyConn); uv_shutdown_t* req = malloc(sizeof(uv_shutdown_t)); uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); - // uv_unref((uv_handle_t*)conn->pTcp); - // uv_close((uv_handle_t*)conn->pTcp, uvDestroyConn); } } static void uvDestroyConn(uv_handle_t* handle) { - SSrvConn* conn = handle->data; + SSrvConn* conn = handle->data; + SWorkThrdObj* thrd = conn->hostThrd; + tDebug("server conn %p destroy", conn); uv_timer_stop(conn->pTimer); - // free(conn->pTimer); + QUEUE_REMOVE(&conn->queue); free(conn->pTcp); free(conn->pWriter); free(conn); + + if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) { + uv_stop(thrd->loop); + } } static int transAddAuthPart(SSrvConn* pConn, char* msg, int msgLen) { STransMsgHead* pHead = (STransMsgHead*)msg; @@ -670,6 +680,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + thrd->stop = false; srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); @@ -767,6 +778,7 @@ void transUnrefSrvHandle(void* handle) { return; } int ref = T_REF_DEC((SSrvConn*)handle); + tDebug("handle %p ref count: %d", handle, ref); if (ref == 0) { destroyConn((SSrvConn*)handle, true); -- GitLab