diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 963a85922f340d86bea58b30a2dd5cabba17d8b5..a593dcf8172968d591824b604579394f706770b3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -252,7 +252,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq); do { \ if (id > 0) { \ tTrace("handle step1"); \ - SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ + SExHandle* exh2 = transAcquireExHandle(id); \ if (exh2 == NULL || id != exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ exh2 ? exh2->refId : 0, id); \ @@ -260,7 +260,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq); } \ } else if (id == 0) { \ tTrace("handle step2"); \ - SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ + SExHandle* exh2 = transAcquireExHandle(id); \ if (exh2 == NULL || id == exh2->refId) { \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \ exh2 ? exh2->refId : 0); \ @@ -273,6 +273,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq); goto _return2; \ } \ } while (0) + int transInitBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf); @@ -390,13 +391,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b); */ void transThreadOnce(); -void transInitEnv(); +void transInit(); +void transCleanup(); + int32_t transOpenExHandleMgt(int size); -void transCloseExHandleMgt(int32_t mgt); -int64_t transAddExHandle(int32_t mgt, void* p); -int32_t transRemoveExHandle(int32_t mgt, int64_t refId); -SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId); -int32_t transReleaseExHandle(int32_t mgt, int64_t refId); +void transCloseExHandleMgt(); +int64_t transAddExHandle(void* p); +int32_t transRemoveExHandle(int64_t refId); +SExHandle* transAcquireExHandle(int64_t refId); +int32_t transReleaseExHandle(int64_t refId); void transDestoryExHandle(void* handle); #ifdef __cplusplus diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 4f7b19b5391c35e723a97d5f331db19297c256ef..d48bd858322590e83937ee09fb35a2ba5ca3181a 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -36,7 +36,7 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { return 0; } void* rpcOpen(const SRpcInit* pInit) { - transInitEnv(); + rpcInit(); SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); if (pRpc == NULL) { @@ -82,7 +82,6 @@ void rpcClose(void* arg) { tInfo("start to close rpc"); SRpcInfo* pRpc = (SRpcInfo*)arg; (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); - transCloseExHandleMgt(pRpc->refMgt); taosMemoryFree(pRpc); return; @@ -170,11 +169,13 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { //} int32_t rpcInit() { + transInit(); // impl later return 0; } void rpcCleanup(void) { // impl later + transCleanup(); return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7374d1fffc95ba06700a272a65826575d154ad17..c960cb4c63c3278de3dad337dbf933e0cf500432 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -15,9 +15,6 @@ #ifdef USE_UV #include "transComm.h" -static int32_t transSCliInst = 0; -static int32_t refMgt = 0; - typedef struct SCliConn { T_REF_DECLARE() uv_connect_t connReq; @@ -503,12 +500,12 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { } static void allocConnRef(SCliConn* conn, bool update) { if (update) { - transRemoveExHandle(refMgt, conn->refId); + transRemoveExHandle(conn->refId); } SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(refMgt, exh); + exh->refId = transAddExHandle(exh); conn->refId = exh->refId; } static void addConnToPool(void* pool, SCliConn* conn) { @@ -602,9 +599,13 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); QUEUE_REMOVE(&conn->conn); QUEUE_INIT(&conn->conn); - transRemoveExHandle(refMgt, conn->refId); + transRemoveExHandle(conn->refId); if (clear) { - uv_close((uv_handle_t*)conn->stream, cliDestroy); + if (uv_is_active((uv_handle_t*)conn->stream)) { + uv_close((uv_handle_t*)conn->stream, cliDestroy); + } else { + cliDestroy((uv_handle_t*)conn->stream); + } } } static void cliDestroy(uv_handle_t* handle) { @@ -735,7 +736,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { } static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { int64_t refId = (int64_t)(pMsg->msg.info.handle); - SExHandle* exh = transAcquireExHandle(refMgt, refId); + SExHandle* exh = transAcquireExHandle(refId); if (exh == NULL) { tDebug("%" PRId64 " already release", refId); } @@ -761,7 +762,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { SCliConn* conn = NULL; int64_t refId = (int64_t)(pMsg->msg.info.handle); if (refId != 0) { - SExHandle* exh = transAcquireExHandle(refMgt, refId); + SExHandle* exh = transAcquireExHandle(refId); if (exh == NULL) { *ignore = true; destroyCmsg(pMsg); @@ -769,7 +770,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { // assert(0); } else { conn = exh->handle; - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); } return conn; }; @@ -899,10 +900,6 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, } cli->pThreadObj[i] = pThrd; } - int ref = atomic_add_fetch_32(&transSCliInst, 1); - if (ref == 1) { - refMgt = transOpenExHandleMgt(50000); - } return cli; } @@ -1086,10 +1083,6 @@ void transCloseClient(void* arg) { } taosMemoryFree(cli->pThreadObj); taosMemoryFree(cli); - int ref = atomic_sub_fetch_32(&transSCliInst, 1); - if (ref == 0) { - transCloseExHandleMgt(refMgt); - } } void transRefCliHandle(void* handle) { if (handle == NULL) { @@ -1111,12 +1104,12 @@ void transUnrefCliHandle(void* handle) { } SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { SCliThrd* pThrd = NULL; - SExHandle* exh = transAcquireExHandle(refMgt, handle); + SExHandle* exh = transAcquireExHandle(handle); if (exh == NULL) { return NULL; } pThrd = exh->pThrd; - transReleaseExHandle(refMgt, handle); + transReleaseExHandle(handle); return pThrd; } SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index bff7d79bd38fd6cd8afabf6b005e17477090bcf8..bc158bb31664695bf87d6cc9aecc8421a004151a 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -16,7 +16,9 @@ #include "transComm.h" -// static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; +static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; + +static int32_t refMgt; int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context; @@ -478,35 +480,47 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { return true; } -void transInitEnv() { - // +static void transInitEnv() { + refMgt = transOpenExHandleMgt(50000); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); } +static void transDestroyEnv() { + // close ref + transCloseExHandleMgt(refMgt); +} +void transInit() { + // init env + taosThreadOnce(&transModuleInit, transInitEnv); +} +void transCleanup() { + // clean env + transDestroyEnv(); +} int32_t transOpenExHandleMgt(int size) { // added into once later return taosOpenRef(size, transDestoryExHandle); } -void transCloseExHandleMgt(int32_t mgt) { +void transCloseExHandleMgt() { // close ref - taosCloseRef(mgt); + taosCloseRef(refMgt); } -int64_t transAddExHandle(int32_t mgt, void* p) { +int64_t transAddExHandle(void* p) { // acquire extern handle - return taosAddRef(mgt, p); + return taosAddRef(refMgt, p); } -int32_t transRemoveExHandle(int32_t mgt, int64_t refId) { +int32_t transRemoveExHandle(int64_t refId) { // acquire extern handle - return taosRemoveRef(mgt, refId); + return taosRemoveRef(refMgt, refId); } -SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId) { +SExHandle* transAcquireExHandle(int64_t refId) { // acquire extern handle - return (SExHandle*)taosAcquireRef(mgt, refId); + return (SExHandle*)taosAcquireRef(refMgt, refId); } -int32_t transReleaseExHandle(int32_t mgt, int64_t refId) { +int32_t transReleaseExHandle(int64_t refId) { // release extern handle - return taosReleaseRef(mgt, refId); + return taosReleaseRef(refMgt, refId); } void transDestoryExHandle(void* handle) { if (handle == NULL) { diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 892d32696eba25a909b60ba0f5a37368c2b7186c..a78adfb397777fc9b99d8bb600dd4a904495764e 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -19,9 +19,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; -static char* notify = "a"; -static int32_t tranSSvrInst = 0; -static int32_t refMgt = 0; +static char* notify = "a"; typedef struct { int notifyCount; // @@ -274,7 +272,7 @@ static void uvHandleReq(SSvrConn* pConn) { // 2. once send out data, cli conn released to conn pool immediately // 3. not mixed with persist transMsg.info.ahandle = (void*)pHead->ahandle; - transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId); + transMsg.info.handle = (void*)transAcquireExHandle(pConn->refId); transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; @@ -292,7 +290,7 @@ static void uvHandleReq(SSvrConn* pConn) { pConnInfo->clientPort = ntohs(pConn->addr.sin_port); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); - transReleaseExHandle(refMgt, pConn->refId); + transReleaseExHandle(pConn->refId); STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); @@ -523,15 +521,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { SExHandle* exh1 = transMsg.info.handle; int64_t refId = transMsg.info.refId; - SExHandle* exh2 = transAcquireExHandle(refMgt, refId); + SExHandle* exh2 = transAcquireExHandle(refId); if (exh2 == NULL || exh1 != exh2) { tTrace("handle except msg %p, ignore it", exh1); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); destroySmsg(msg); continue; } msg->pConn = exh1->handle; - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); (*transAsyncHandle[msg->type])(msg, pThrd); } } @@ -773,8 +771,8 @@ static SSvrConn* createConn(void* hThrd) { SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = pConn; exh->pThrd = pThrd; - exh->refId = transAddExHandle(refMgt, exh); - transAcquireExHandle(refMgt, exh->refId); + exh->refId = transAddExHandle(exh); + transAcquireExHandle(exh->refId); pConn->refId = exh->refId; transRefSrvHandle(pConn); @@ -803,14 +801,14 @@ static void destroyConnRegArg(SSvrConn* conn) { } } static int reallocConnRef(SSvrConn* conn) { - transReleaseExHandle(refMgt, conn->refId); - transRemoveExHandle(refMgt, conn->refId); + transReleaseExHandle(conn->refId); + transRemoveExHandle(conn->refId); // avoid app continue to send msg on invalid handle SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); exh->handle = conn; exh->pThrd = conn->hostThrd; - exh->refId = transAddExHandle(refMgt, exh); - transAcquireExHandle(refMgt, exh->refId); + exh->refId = transAddExHandle(exh); + transAcquireExHandle(exh->refId); conn->refId = exh->refId; return 0; @@ -822,8 +820,8 @@ static void uvDestroyConn(uv_handle_t* handle) { } SWorkThrd* thrd = conn->hostThrd; - transReleaseExHandle(refMgt, conn->refId); - transRemoveExHandle(refMgt, conn->refId); + transReleaseExHandle(conn->refId); + transRemoveExHandle(conn->refId); tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); // uv_timer_stop(&conn->pTimer); @@ -871,12 +869,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->port = port; uv_loop_init(srv->loop); - // taosThreadOnce(&transModuleInit, uvInitEnv); - int ref = atomic_add_fetch_32(&tranSSvrInst, 1); - if (ref == 1) { - refMgt = transOpenExHandleMgt(50000); - } - assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS char pipeName[64]; @@ -1028,11 +1020,6 @@ void transCloseServer(void* arg) { taosMemoryFree(srv->pipe); taosMemoryFree(srv); - - int ref = atomic_sub_fetch_32(&tranSSvrInst, 1); - if (ref == 0) { - transCloseExHandleMgt(refMgt); - } } void transRefSrvHandle(void* handle) { @@ -1071,11 +1058,11 @@ void transReleaseSrvHandle(void* handle) { tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); transSendAsync(pThrd->asyncPool, &m->q); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return1: tTrace("handle %p failed to send to release handle", exh); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return2: tTrace("handle %p failed to send to release handle", exh); @@ -1100,12 +1087,12 @@ void transSendResponse(const STransMsg* msg) { STraceId* trace = (STraceId*)&msg->info.traceId; tGTrace("conn %p start to send resp (1/2)", exh->handle); transSendAsync(pThrd->asyncPool, &m->q); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return1: tTrace("handle %p failed to send resp", exh); rpcFreeCont(msg->pCont); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return2: tTrace("handle %p failed to send resp", exh); @@ -1129,13 +1116,13 @@ void transRegisterMsg(const STransMsg* msg) { tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); transSendAsync(pThrd->asyncPool, &m->q); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return1: tTrace("handle %p failed to register brokenlink", exh); rpcFreeCont(msg->pCont); - transReleaseExHandle(refMgt, refId); + transReleaseExHandle(refId); return; _return2: tTrace("handle %p failed to register brokenlink", exh);