diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 03b97b6fa1a948768c74316f66c1e7eaaa670986..9e53811fd381a26b7fd35997041f63069a505761 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) void transDestroyAsyncPool(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - uv_close((uv_handle_t*)async, NULL); + // uv_close((uv_handle_t*)async, NULL); SAsyncItem* item = async->data; taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 50ce0bddcd96ba82045c4775823944ef49a0e325..2d37dfa37cb7f20222b2f3908ad6f0418791595a 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -146,7 +146,8 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister}; +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, + uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -209,12 +210,13 @@ static void uvHandleReq(SSrvConn* pConn) { } if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), - transMsg.contLen, pHead->noResp); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, + TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); // no ref here } @@ -354,8 +356,9 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), - ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), + ntohs(pConn->locaddr.sin_port)); pHead->msgLen = htonl(len); wb->base = msg; @@ -685,9 +688,9 @@ static void uvDestroyConn(uv_handle_t* handle) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); - // uv_walk(thrd->loop, uvWalkCb, NULL); - uv_loop_close(thrd->loop); - uv_stop(thrd->loop); + uv_walk(thrd->loop, uvWalkCb, NULL); + // uv_loop_close(thrd->loop); + // uv_stop(thrd->loop); } } @@ -749,9 +752,9 @@ End: void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { - // uv_walk(thrd->loop, uvWalkCb, NULL); - uv_loop_close(thrd->loop); - uv_stop(thrd->loop); + uv_walk(thrd->loop, uvWalkCb, NULL); + // uv_loop_close(thrd->loop); + // uv_stop(thrd->loop); } else { destroyAllConn(thrd); } @@ -802,7 +805,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } taosThreadJoin(pThrd->thread, NULL); - // MAKE_VALGRIND_HAPPY(pThrd->loop); + MAKE_VALGRIND_HAPPY(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd);