From 9913d0c72ad20e61a0686ba4011a1c8fbe05fc4e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 11 Mar 2022 18:27:52 +0800 Subject: [PATCH] fix except --- source/libs/transport/src/transCli.c | 107 +++++++++++++------------- source/libs/transport/src/transComm.c | 4 + source/libs/transport/src/transSrv.c | 43 ++++++----- 3 files changed, 78 insertions(+), 76 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 13a5d57dfe..1323677071 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -26,7 +26,7 @@ typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; uv_stream_t* stream; - uv_write_t* writeReq; + uv_write_t writeReq; void* hostThrd; SConnBuffer readBuf; void* data; @@ -34,12 +34,12 @@ typedef struct SCliConn { uint64_t expireTime; int8_t ctnRdCnt; // continue read count int hThrdIdx; + bool broken; // link broken or not int persist; // // spi configure - char spi; - char secured; - int32_t ref; + char spi; + char secured; // debug and log info struct sockaddr_in addr; struct sockaddr_in locaddr; @@ -54,11 +54,10 @@ typedef struct SCliMsg { } SCliMsg; typedef struct SCliThrdObj { - pthread_t thread; - uv_loop_t* loop; - // uv_async_t* cliAsync; // + pthread_t thread; + uv_loop_t* loop; SAsyncPool* asyncPool; - uv_timer_t* timer; + uv_timer_t timer; void* pool; // conn pool // msg queue @@ -83,7 +82,7 @@ typedef struct SConnList { // conn pool // add expire timeout and capacity limit -static void* creatConnPool(int size); +static void* createConnPool(int size); static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, char* ip, uint32_t port, SCliConn* conn); @@ -99,8 +98,10 @@ static void clientWriteCb(uv_write_t* req, int status); // callback after conn to server static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); -static void clientDestroy(uv_handle_t* handle); -static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); + +static SCliConn* clientConnCreate(SCliThrdObj* thrd); +static void clientConnDestroy(SCliConn* pConn, bool clear /*clear tcp handle or not*/); +static void clientDestroy(uv_handle_t* handle); // process data read from server, add decompress etc later static void clientHandleResp(SCliConn* conn); @@ -176,14 +177,14 @@ static void clientHandleResp(SCliConn* conn) { conn->data = NULL; // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)pThrd->timer) && pTransInst->idleTime > 0) { - // uv_timer_start((uv_timer_t*)pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); + if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { + // uv_timer_start((uv_timer_t*)&pThrd->timer, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } } static void clientHandleExcept(SCliConn* pConn) { if (pConn->data == NULL) { // handle conn except in conn pool - clientConnDestroy(pConn, true); + transUnrefCliHandle(pConn); return; } SCliThrdObj* pThrd = pConn->hostThrd; @@ -209,7 +210,7 @@ static void clientHandleExcept(SCliConn* pConn) { pConn->data = NULL; tTrace("%s client conn %p start to destroy", CONN_GET_INST_LABEL(pConn), pConn); - clientConnDestroy(pConn, true); + transUnrefCliHandle(pConn); } static void clientTimeoutCb(uv_timer_t* handle) { @@ -225,9 +226,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { SCliConn* c = QUEUE_DATA(h, SCliConn, conn); if (c->expireTime < currentTime) { QUEUE_REMOVE(h); - // uv_stream_t stm = *(c->stream); - // uv_close((uv_handle_t*)&stm, clientDestroy); - clientConnDestroy(c, true); + transUnrefCliHandle(c); } else { break; } @@ -238,7 +237,7 @@ static void clientTimeoutCb(uv_timer_t* handle) { pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pRpc->idleTime); uv_timer_start(handle, clientTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); } -static void* creatConnPool(int size) { +static void* createConnPool(int size) { // thread local, no lock return taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); } @@ -253,7 +252,7 @@ static void* destroyConnPool(void* pool) { } connList = taosHashIterate((SHashObj*)pool, connList); } - taosHashClear(pool); + taosHashCleanup(pool); } static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { @@ -328,26 +327,38 @@ static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf } if (nread < 0) { tError("%s client conn %p read error: %s", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread)); + conn->broken = true; clientHandleExcept(conn); } } +static SCliConn* clientConnCreate(SCliThrdObj* pThrd) { + SCliConn* conn = calloc(1, sizeof(SCliConn)); + // read/write stream handle + conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + conn->stream->data = conn; + + conn->writeReq.data = conn; + conn->connReq.data = conn; + + QUEUE_INIT(&conn->conn); + conn->hostThrd = pThrd; + conn->broken = false; + transRefCliHandle(conn); + return conn; +} static void clientConnDestroy(SCliConn* conn, bool clear) { - // - conn->ref--; - if (conn->ref == 0) { - tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); - QUEUE_REMOVE(&conn->conn); - if (clear) { - uv_close((uv_handle_t*)conn->stream, clientDestroy); - } + tTrace("%s client conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); + QUEUE_REMOVE(&conn->conn); + if (clear) { + uv_close((uv_handle_t*)conn->stream, clientDestroy); } } static void clientDestroy(uv_handle_t* handle) { SCliConn* conn = handle->data; free(conn->stream); - free(conn->writeReq); tTrace("%s client conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); free(conn); } @@ -359,7 +370,6 @@ static void clientWriteCb(uv_write_t* req, int status) { tTrace("%s client conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn); SCliMsg* pMsg = pConn->data; if (pMsg == NULL) { - // handle return; } destroyUserdata(&pMsg->msg); @@ -410,7 +420,7 @@ static void clientWrite(SCliConn* pConn) { TMSG_INFO(pHead->msgType), inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); - uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); + uv_write(&pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static void clientConnCb(uv_connect_t* req, int status) { // impl later @@ -436,10 +446,10 @@ static void clientHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("client work thread %p start to quit", pThrd); destroyCmsg(pMsg); destroyConnPool(pThrd->pool); - // transDestroyAsyncPool(pThr) uv_close((uv_handle_t*)pThrd->cliAsync, NULL); - uv_timer_stop(pThrd->timer); + + uv_timer_stop(&pThrd->timer); + pThrd->quit = true; - // uv__async_stop(pThrd->cliAsync); uv_stop(pThrd->loop); } static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { @@ -463,7 +473,7 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { if (conn != NULL) { conn->data = pMsg; - conn->writeReq->data = conn; + conn->writeReq.data = conn; transDestroyBuffer(&conn->readBuf); if (pThrd->quit) { @@ -472,21 +482,8 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { } clientWrite(conn); } else { - conn = calloc(1, sizeof(SCliConn)); - conn->ref++; - // read/write stream handle - conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); - conn->stream->data = conn; - - conn->writeReq = malloc(sizeof(uv_write_t)); - conn->writeReq->data = conn; - - QUEUE_INIT(&conn->conn); - - conn->connReq.data = conn; + conn = clientConnCreate(pThrd); conn->data = pMsg; - conn->hostThrd = pThrd; int ret = transSetConnOption((uv_tcp_t*)conn->stream); if (ret) { @@ -585,11 +582,10 @@ static SCliThrdObj* createThrdObj() { pThrd->asyncPool = transCreateAsyncPool(pThrd->loop, 5, pThrd, clientAsyncCb); - pThrd->timer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pThrd->timer); - pThrd->timer->data = pThrd; + uv_timer_init(pThrd->loop, &pThrd->timer); + pThrd->timer.data = pThrd; - pThrd->pool = creatConnPool(4); + pThrd->pool = createConnPool(4); pThrd->quit = false; return pThrd; @@ -602,8 +598,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { pthread_join(pThrd->thread, NULL); pthread_mutex_destroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); - // free(pThrd->cliAsync); - free(pThrd->timer); + + uv_timer_stop(&pThrd->timer); free(pThrd->loop); free(pThrd); } @@ -649,6 +645,7 @@ void transUnrefCliHandle(void* handle) { } int ref = T_REF_DEC((SCliConn*)handle); if (ref == 0) { + clientConnDestroy((SCliConn*)handle, true); } // unref cli handle diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 92e42cb380..c83f76c2ec 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -226,9 +226,13 @@ int transAllocBuffer(SConnBuffer* connBuf, uv_buf_t* uvBuf) { uvBuf->base = p->buf; uvBuf->len = CAPACITY; + } else if (p->total == -1 && p->len < CAPACITY) { + uvBuf->base = p->buf + p->len; + uvBuf->len = CAPACITY - p->len; } else { p->cap = p->total; p->buf = realloc(p->buf, p->cap); + uvBuf->base = p->buf + p->len; uvBuf->len = p->cap - p->len; } diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 9e68b0bf7b..593d0acfc5 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -19,11 +19,10 @@ typedef struct SSrvConn { T_REF_DECLARE() - uv_tcp_t* pTcp; - uv_write_t* pWriter; - uv_timer_t* pTimer; + uv_tcp_t* pTcp; + uv_write_t pWriter; + uv_timer_t pTimer; - // uv_async_t* pWorkerAsync; queue queue; int ref; int persist; // persist connection or not @@ -65,7 +64,7 @@ typedef struct SWorkThrdObj { queue conn; pthread_mutex_t msgMtx; void* pTransInst; - bool stop; + bool quit; } SWorkThrdObj; typedef struct SServerObj { @@ -236,7 +235,7 @@ static void uvHandleReq(SSrvConn* pConn) { inet_ntoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), inet_ntoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), rpcMsg.contLen); (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - // uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); + // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); // auth // validate msg type } @@ -312,6 +311,7 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } else { tError("fail to dispatch conn to work thread"); } + free(req); } static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { @@ -349,8 +349,8 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { uvPrepareSendData(smsg, &wb); SSrvConn* pConn = smsg->pConn; - uv_timer_stop(pConn->pTimer); - uv_write(pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); + uv_timer_stop(&pConn->pTimer); + uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnWriteCb); } static void uvStartSendResp(SSrvMsg* smsg) { // impl @@ -417,8 +417,8 @@ void uvWorkerAsyncCb(uv_async_t* handle) { uv_stop(pThrd->loop); } else { destroyAllConn(pThrd); - uv_loop_close(pThrd->loop); - pThrd->stop = true; + // uv_loop_close(pThrd->loop); + pThrd->quit = true; } } else { uvStartSendResp(msg); @@ -493,9 +493,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ - pConn->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pConn->pTimer); - pConn->pTimer->data = pConn; + uv_timer_init(pThrd->loop, &pConn->pTimer); + pConn->pTimer.data = pConn; pConn->hostThrd = pThrd; @@ -504,8 +503,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_tcp_init(pThrd->loop, pConn->pTcp); pConn->pTcp->data = pConn; - pConn->pWriter = calloc(1, sizeof(uv_write_t)); - pConn->pWriter->data = pConn; + pConn->pWriter.data = pConn; transSetConnOption((uv_tcp_t*)pConn->pTcp); @@ -633,17 +631,20 @@ static void destroyConn(SSrvConn* conn, bool clear) { } } static void uvDestroyConn(uv_handle_t* handle) { - SSrvConn* conn = handle->data; + SSrvConn* conn = handle->data; + if (conn == NULL) { + return; + } SWorkThrdObj* thrd = conn->hostThrd; tDebug("server conn %p destroy", conn); - uv_timer_stop(conn->pTimer); + uv_timer_stop(&conn->pTimer); QUEUE_REMOVE(&conn->queue); free(conn->pTcp); - free(conn->pWriter); - free(conn); + // free(conn); - if (thrd->stop && QUEUE_IS_EMPTY(&thrd->conn)) { + if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { + uv_loop_close(thrd->loop); uv_stop(thrd->loop); } } @@ -680,7 +681,7 @@ void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, for (int i = 0; i < srv->numOfThreads; i++) { SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); - thrd->stop = false; + thrd->quit = false; srv->pThreadObj[i] = thrd; srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); -- GitLab