diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 462debb2472c3d848a6720f0fd6c7f5f12baf586..5fb2980cebc2b45a5f83c29c2403a4a15c841db2 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -57,7 +57,7 @@ typedef struct { void* parent; void* tcphandle; // returned handle from TCP initialization - int32_t refMgt; + int64_t refId; TdThreadMutex mutex; } SRpcInfo; diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index c970440d47a386c09e6d6ace7e5ba2da0862c042..48e7d7c91db116f88df001da7d425c7608c31aec 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -76,12 +76,16 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->user) { memcpy(pRpc->user, pInit->user, strlen(pInit->user)); } - int64_t refId = taosAddRef(transGetInstMgt(), pRpc); + + int64_t refId = transAddExHandle(transGetInstMgt(), pRpc); + transAcquireExHandle(transGetInstMgt(), refId); + pRpc->refId = refId; return (void*)refId; } void rpcClose(void* arg) { tInfo("start to close rpc"); - taosRemoveRef(transGetInstMgt(), (int64_t)arg); + transRemoveExHandle(transGetInstMgt(), (int64_t)arg); + transReleaseExHandle(transGetInstMgt(), (int64_t)arg); tInfo("finish to close rpc"); return; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bda40cbc2a7e14a735c26889ac02bff9b8f784cf..f948461c40bb50d94a5f101db70bc49f55803a23 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -47,6 +47,7 @@ typedef struct SCliMsg { queue q; STransMsgType type; + int64_t refId; uint64_t st; int sent; //(0: no send, 1: alread sent) } SCliMsg; @@ -606,11 +607,9 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { + uv_read_stop(conn->stream); uv_close((uv_handle_t*)conn->stream, cliDestroy); } - //} else { - // cliDestroy((uv_handle_t*)conn->stream); - //} } } static void cliDestroy(uv_handle_t* handle) { @@ -635,7 +634,6 @@ static bool cliHandleNoResp(SCliConn* conn) { SCliMsg* pMsg = transQueueGet(&conn->cliMsgs, 0); if (REQUEST_NO_RESP(&pMsg->msg)) { transQueuePop(&conn->cliMsgs); - // taosArrayRemove(msgs, 0); destroyCmsg(pMsg); res = true; } @@ -979,6 +977,7 @@ void cliSendQuit(SCliThrd* thrd) { } void cliWalkCb(uv_handle_t* handle, void* arg) { if (!uv_is_closing(handle)) { + uv_read_stop((uv_stream_t*)handle); uv_close(handle, cliDestroy); } } @@ -1213,6 +1212,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; + cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, @@ -1250,6 +1250,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM cliMsg->msg = *pReq; cliMsg->st = taosGetTimestampUs(); cliMsg->type = Normal; + cliMsg->refId = (int64_t)shandle; STraceId* trace = &pReq->info.traceId; tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, @@ -1283,6 +1284,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { SCliMsg* cliMsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cliMsg->ctx = pCtx; cliMsg->type = Update; + cliMsg->refId = (int64_t)shandle; SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; tDebug("%s update epset at thread:%08" PRId64 "", pTransInst->label, thrd->pid); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 676985b31c94bd54a3599d8cd12c77a2f00d5ebf..6172f4ad5b8792891d0aaf2de995b36441df7afa 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -19,7 +19,7 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static int32_t refMgt; -int32_t instMgt; +static int32_t instMgt; int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { T_MD5_CTX context;