From c166bcff3cd8a5c4eb8f5cd0b3fab7f5aa5acc12 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 1 Jun 2022 15:35:14 +0800 Subject: [PATCH] enh: refactor trans code --- source/libs/transport/src/trans.c | 1 - source/libs/transport/src/transCli.c | 12 +++++++ source/libs/transport/src/transComm.c | 2 +- source/libs/transport/src/transSvr.c | 47 +++------------------------ 4 files changed, 18 insertions(+), 44 deletions(-) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 6f6f335ce1..925de2f321 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -76,7 +76,6 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); } - // pRpc->refMgt = transOpenExHandleMgt(50000); return pRpc; } void rpcClose(void* arg) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index a8e79266ac..d82b1dc540 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -15,6 +15,9 @@ #ifdef USE_UV #include "transComm.h" +static int32_t transSCliInst = 0; +static int32_t refMgt = 0; + typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -846,6 +849,11 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } cli->pThreadObj[i] = pThrd; } + int ref = atomic_add_fetch_32(&transSCliInst, 1); + if (ref == 1) { + refMgt = transOpenExHandleMgt(50000); + } + return cli; } @@ -1019,6 +1027,10 @@ void transCloseClient(void* arg) { } taosMemoryFree(cli->pThreadObj); taosMemoryFree(cli); + int ref = atomic_sub_fetch_32(&transSCliInst, 1); + if (ref == 0) { + transCloseExHandleMgt(refMgt); + } } void transRefCliHandle(void* handle) { if (handle == NULL) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index d962ceb142..a04e8b5fca 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -472,8 +472,8 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { } void transInitEnv() { + // uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); - // uvOpenExHandleMgt(10000); } int32_t transOpenExHandleMgt(int size) { // added into once later diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 479cee63af..608fd00b2c 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -20,7 +20,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; -static int tranSSvrInst = 0; +static int32_t tranSSvrInst = 0; static int32_t refMgt = 0; typedef struct { @@ -878,8 +878,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_loop_init(srv->loop); // taosThreadOnce(&transModuleInit, uvInitEnv); - tranSSvrInst++; - if (tranSSvrInst == 1) { + int ref = atomic_add_fetch_32(&tranSSvrInst, 1); + if (ref == 1) { refMgt = transOpenExHandleMgt(50000); } @@ -941,43 +941,6 @@ End: return NULL; } -// void uvInitEnv() { -// uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); -// uvOpenExHandleMgt(10000); -//} -// void uvOpenExHandleMgt(int size) { -// // added into once later -// exHandlesMgt = taosOpenRef(size, uvDestoryExHandle); -//} -// void uvCloseExHandleMgt() { -// // close ref -// taosCloseRef(exHandlesMgt); -//} -// int64_t uvAddExHandle(void* p) { -// // acquire extern handle -// return taosAddRef(exHandlesMgt, p); -//} -// int32_t uvRemoveExHandle(int64_t refId) { -// // acquire extern handle -// return taosRemoveRef(exHandlesMgt, refId); -//} -// -// SExHandle* uvAcquireExHandle(int64_t refId) { -// // acquire extern handle -// return (SExHandle*)taosAcquireRef(exHandlesMgt, refId); -//} -// -// int32_t uvReleaseExHandle(int64_t refId) { -// // release extern handle -// return taosReleaseRef(exHandlesMgt, refId); -//} -// void uvDestoryExHandle(void* handle) { -// if (handle == NULL) { -// return; -// } -// taosMemoryFree(handle); -//} - void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { @@ -1072,8 +1035,8 @@ void transCloseServer(void* arg) { taosMemoryFree(srv); - tranSSvrInst--; - if (tranSSvrInst == 0) { + int ref = atomic_sub_fetch_32(&tranSSvrInst, 1); + if (ref == 0) { // TdThreadOnce tmpInit = PTHREAD_ONCE_INIT; // memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce)); transCloseExHandleMgt(refMgt); -- GitLab