diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9dc12a3dece380e15f6ee410047793de77c4984b..8eb1a3ee7d2fe5cf844c66be3452283568946abe 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -129,6 +129,15 @@ static void transDestroyConnCtx(STransConnCtx* ctx); static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); +static void cliWalkCb(uv_handle_t* handle, void* arg); + +#define CLI_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, cliWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + // snprintf may cause performance problem #define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ do { \ @@ -212,8 +221,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_NO_PERSIST_BY_APP(conn) \ + (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_RELEASE_BY_SERVER(conn) \ + (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) @@ -288,8 +299,9 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), - taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, + TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; @@ -355,10 +367,12 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, + TMSG_INFO(transMsg.msgType)); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); + tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, + transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -631,8 +645,9 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -671,9 +686,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { destroyCmsg(pMsg); destroyConnPool(pThrd->pool); uv_timer_stop(&pThrd->timer); + uv_walk(pThrd->loop, cliWalkCb, NULL); + pThrd->quit = true; - uv_stop(pThrd->loop); + // uv_stop(pThrd->loop); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = pMsg->msg.handle; @@ -786,7 +803,6 @@ static void* cliWorkThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; setThreadName("trans-cli-work"); uv_run(pThrd->loop, UV_RUN_DEFAULT); - return NULL; } @@ -851,8 +867,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { if (pThrd == NULL) { return; } - uv_stop(pThrd->loop); taosThreadJoin(pThrd->thread, NULL); + CLI_RELEASE_UV(pThrd->loop); taosThreadMutexDestroy(&pThrd->msgMtx); transDestroyAsyncPool(pThrd->asyncPool); @@ -874,6 +890,11 @@ void cliSendQuit(SCliThrdObj* thrd) { msg->type = Quit; transSendAsync(thrd->asyncPool, &msg->q); } +void cliWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + } +} int cliRBChoseIdx(STrans* pTransInst) { int64_t index = pTransInst->index; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 2d37dfa37cb7f20222b2f3908ad6f0418791595a..158d599bdfb183c4c1c5d1f4942d2d0898d2fbda 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -93,27 +93,6 @@ typedef struct SServerObj { static const char* notify = "a"; -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ - } while (0) - static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); @@ -125,11 +104,8 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); - -static void uvFreeCb(uv_handle_t* handle) { - // - taosMemoryFree(handle); -} +static void uvWalkCb(uv_handle_t* handle, void* arg); +static void uvFreeCb(uv_handle_t* handle); static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); @@ -159,6 +135,34 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(void* arg); static bool addHandleToAcceptloop(void* arg); +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ + } while (0) + +#define SRV_RELEASE_UV(loop) \ + do { \ + uv_walk(loop, uvWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SSrvConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; @@ -370,7 +374,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { uvPrepareSendData(smsg, &wb); SSrvConn* pConn = smsg->pConn; - uv_timer_stop(&pConn->pTimer); + // uv_timer_stop(&pConn->pTimer); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } static void uvStartSendResp(SSrvMsg* smsg) { @@ -439,36 +443,17 @@ void uvWorkerAsyncCb(uv_async_t* handle) { static void uvWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { uv_close(handle, NULL); - // uv_unref(handle); - tDebug("handle: %p -----test----", handle); } } -#define MAKE_VALGRIND_HAPPY(loop) \ - do { \ - uv_walk(loop, uvWalkCb, NULL); \ - uv_run(loop, UV_RUN_DEFAULT); \ - uv_loop_close(loop); \ - } while (0); +static void uvFreeCb(uv_handle_t* handle) { + // + taosMemoryFree(handle); +} static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; tDebug("close server port %d", srv->port); uv_walk(srv->loop, uvWalkCb, NULL); - // uv_close((uv_handle_t*)async, NULL); - // uv_close((uv_handle_t*)&srv->server, NULL); - // uv_stop(srv->loop); - // uv_print_all_handles(srv->loop, stderr); - // int ref = uv_loop_alive(srv->loop); - // assert(ref == 0); - // tError("active size %d", ref); - // uv_stop(srv->loop); - // uv_run(srv->loop, UV_RUN_DEFAULT); - // fprintf(stderr, "------------------------------------"); - // uv_print_all_handles(srv->loop, stderr); - - // int ret = uv_loop_close(srv->loop); - // tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret); - // assert(ret == 0); } static void uvShutDownCb(uv_shutdown_t* req, int status) { @@ -535,8 +520,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ - uv_timer_init(pThrd->loop, &pConn->pTimer); - pConn->pTimer.data = pConn; + // uv_timer_init(pThrd->loop, &pConn->pTimer); + // pConn->pTimer.data = pConn; pConn->hostThrd = pThrd; @@ -680,11 +665,11 @@ static void uvDestroyConn(uv_handle_t* handle) { SWorkThrdObj* thrd = conn->hostThrd; tDebug("server conn %p destroy", conn); - uv_timer_stop(&conn->pTimer); + // uv_timer_stop(&conn->pTimer); transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); - // taosMemoryFree(conn); + taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); @@ -805,7 +790,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } taosThreadJoin(pThrd->thread, NULL); - MAKE_VALGRIND_HAPPY(pThrd->loop); + SRV_RELEASE_UV(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); @@ -825,7 +810,7 @@ void transCloseServer(void* arg) { uv_async_send(srv->pAcceptAsync); taosThreadJoin(srv->thread, NULL); - MAKE_VALGRIND_HAPPY(srv->loop); + SRV_RELEASE_UV(srv->loop); for (int i = 0; i < srv->numOfThreads; i++) { sendQuitToWorkThrd(srv->pThreadObj[i]);