提交 49b2c544 编写于 作者: dengyihao's avatar dengyihao

enh: change refMgt of rpc

上级 de06f055
...@@ -252,7 +252,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq); ...@@ -252,7 +252,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
do { \ do { \
if (id > 0) { \ if (id > 0) { \
tTrace("handle step1"); \ tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ SExHandle* exh2 = transAcquireExHandle(id); \
if (exh2 == NULL || id != exh2->refId) { \ if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, id); \ exh2 ? exh2->refId : 0, id); \
...@@ -260,7 +260,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq); ...@@ -260,7 +260,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
} \ } \
} else if (id == 0) { \ } else if (id == 0) { \
tTrace("handle step2"); \ tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(refMgt, id); \ SExHandle* exh2 = transAcquireExHandle(id); \
if (exh2 == NULL || id == exh2->refId) { \ if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
exh2 ? exh2->refId : 0); \ exh2 ? exh2->refId : 0); \
...@@ -273,6 +273,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq); ...@@ -273,6 +273,7 @@ int transSendAsync(SAsyncPool* pool, queue* mq);
goto _return2; \ goto _return2; \
} \ } \
} while (0) } while (0)
int transInitBuffer(SConnBuffer* buf); int transInitBuffer(SConnBuffer* buf);
int transClearBuffer(SConnBuffer* buf); int transClearBuffer(SConnBuffer* buf);
int transDestroyBuffer(SConnBuffer* buf); int transDestroyBuffer(SConnBuffer* buf);
...@@ -390,13 +391,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b); ...@@ -390,13 +391,15 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
*/ */
void transThreadOnce(); void transThreadOnce();
void transInitEnv(); void transInit();
void transCleanup();
int32_t transOpenExHandleMgt(int size); int32_t transOpenExHandleMgt(int size);
void transCloseExHandleMgt(int32_t mgt); void transCloseExHandleMgt();
int64_t transAddExHandle(int32_t mgt, void* p); int64_t transAddExHandle(void* p);
int32_t transRemoveExHandle(int32_t mgt, int64_t refId); int32_t transRemoveExHandle(int64_t refId);
SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId); SExHandle* transAcquireExHandle(int64_t refId);
int32_t transReleaseExHandle(int32_t mgt, int64_t refId); int32_t transReleaseExHandle(int64_t refId);
void transDestoryExHandle(void* handle); void transDestoryExHandle(void* handle);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -36,7 +36,7 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) { ...@@ -36,7 +36,7 @@ static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
return 0; return 0;
} }
void* rpcOpen(const SRpcInit* pInit) { void* rpcOpen(const SRpcInit* pInit) {
transInitEnv(); rpcInit();
SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo)); SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) { if (pRpc == NULL) {
...@@ -82,7 +82,6 @@ void rpcClose(void* arg) { ...@@ -82,7 +82,6 @@ void rpcClose(void* arg) {
tInfo("start to close rpc"); tInfo("start to close rpc");
SRpcInfo* pRpc = (SRpcInfo*)arg; SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
transCloseExHandleMgt(pRpc->refMgt);
taosMemoryFree(pRpc); taosMemoryFree(pRpc);
return; return;
...@@ -170,11 +169,13 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { ...@@ -170,11 +169,13 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
//} //}
int32_t rpcInit() { int32_t rpcInit() {
transInit();
// impl later // impl later
return 0; return 0;
} }
void rpcCleanup(void) { void rpcCleanup(void) {
// impl later // impl later
transCleanup();
return; return;
} }
......
...@@ -15,9 +15,6 @@ ...@@ -15,9 +15,6 @@
#ifdef USE_UV #ifdef USE_UV
#include "transComm.h" #include "transComm.h"
static int32_t transSCliInst = 0;
static int32_t refMgt = 0;
typedef struct SCliConn { typedef struct SCliConn {
T_REF_DECLARE() T_REF_DECLARE()
uv_connect_t connReq; uv_connect_t connReq;
...@@ -503,12 +500,12 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -503,12 +500,12 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
} }
static void allocConnRef(SCliConn* conn, bool update) { static void allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
transRemoveExHandle(refMgt, conn->refId); transRemoveExHandle(conn->refId);
} }
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(refMgt, exh); exh->refId = transAddExHandle(exh);
conn->refId = exh->refId; conn->refId = exh->refId;
} }
static void addConnToPool(void* pool, SCliConn* conn) { static void addConnToPool(void* pool, SCliConn* conn) {
...@@ -602,9 +599,13 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -602,9 +599,13 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
transRemoveExHandle(refMgt, conn->refId); transRemoveExHandle(conn->refId);
if (clear) { if (clear) {
uv_close((uv_handle_t*)conn->stream, cliDestroy); if (uv_is_active((uv_handle_t*)conn->stream)) {
uv_close((uv_handle_t*)conn->stream, cliDestroy);
} else {
cliDestroy((uv_handle_t*)conn->stream);
}
} }
} }
static void cliDestroy(uv_handle_t* handle) { static void cliDestroy(uv_handle_t* handle) {
...@@ -735,7 +736,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -735,7 +736,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle); int64_t refId = (int64_t)(pMsg->msg.info.handle);
SExHandle* exh = transAcquireExHandle(refMgt, refId); SExHandle* exh = transAcquireExHandle(refId);
if (exh == NULL) { if (exh == NULL) {
tDebug("%" PRId64 " already release", refId); tDebug("%" PRId64 " already release", refId);
} }
...@@ -761,7 +762,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -761,7 +762,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle); int64_t refId = (int64_t)(pMsg->msg.info.handle);
if (refId != 0) { if (refId != 0) {
SExHandle* exh = transAcquireExHandle(refMgt, refId); SExHandle* exh = transAcquireExHandle(refId);
if (exh == NULL) { if (exh == NULL) {
*ignore = true; *ignore = true;
destroyCmsg(pMsg); destroyCmsg(pMsg);
...@@ -769,7 +770,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -769,7 +770,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
// assert(0); // assert(0);
} else { } else {
conn = exh->handle; conn = exh->handle;
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
} }
return conn; return conn;
}; };
...@@ -899,10 +900,6 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -899,10 +900,6 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
} }
cli->pThreadObj[i] = pThrd; cli->pThreadObj[i] = pThrd;
} }
int ref = atomic_add_fetch_32(&transSCliInst, 1);
if (ref == 1) {
refMgt = transOpenExHandleMgt(50000);
}
return cli; return cli;
} }
...@@ -1086,10 +1083,6 @@ void transCloseClient(void* arg) { ...@@ -1086,10 +1083,6 @@ void transCloseClient(void* arg) {
} }
taosMemoryFree(cli->pThreadObj); taosMemoryFree(cli->pThreadObj);
taosMemoryFree(cli); taosMemoryFree(cli);
int ref = atomic_sub_fetch_32(&transSCliInst, 1);
if (ref == 0) {
transCloseExHandleMgt(refMgt);
}
} }
void transRefCliHandle(void* handle) { void transRefCliHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
...@@ -1111,12 +1104,12 @@ void transUnrefCliHandle(void* handle) { ...@@ -1111,12 +1104,12 @@ void transUnrefCliHandle(void* handle) {
} }
SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { SCliThrd* transGetWorkThrdFromHandle(int64_t handle) {
SCliThrd* pThrd = NULL; SCliThrd* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(refMgt, handle); SExHandle* exh = transAcquireExHandle(handle);
if (exh == NULL) { if (exh == NULL) {
return NULL; return NULL;
} }
pThrd = exh->pThrd; pThrd = exh->pThrd;
transReleaseExHandle(refMgt, handle); transReleaseExHandle(handle);
return pThrd; return pThrd;
} }
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
......
...@@ -16,7 +16,9 @@ ...@@ -16,7 +16,9 @@
#include "transComm.h" #include "transComm.h"
// static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context; T_MD5_CTX context;
...@@ -478,35 +480,47 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { ...@@ -478,35 +480,47 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
return true; return true;
} }
void transInitEnv() { static void transInitEnv() {
// refMgt = transOpenExHandleMgt(50000);
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
} }
static void transDestroyEnv() {
// close ref
transCloseExHandleMgt(refMgt);
}
void transInit() {
// init env
taosThreadOnce(&transModuleInit, transInitEnv);
}
void transCleanup() {
// clean env
transDestroyEnv();
}
int32_t transOpenExHandleMgt(int size) { int32_t transOpenExHandleMgt(int size) {
// added into once later // added into once later
return taosOpenRef(size, transDestoryExHandle); return taosOpenRef(size, transDestoryExHandle);
} }
void transCloseExHandleMgt(int32_t mgt) { void transCloseExHandleMgt() {
// close ref // close ref
taosCloseRef(mgt); taosCloseRef(refMgt);
} }
int64_t transAddExHandle(int32_t mgt, void* p) { int64_t transAddExHandle(void* p) {
// acquire extern handle // acquire extern handle
return taosAddRef(mgt, p); return taosAddRef(refMgt, p);
} }
int32_t transRemoveExHandle(int32_t mgt, int64_t refId) { int32_t transRemoveExHandle(int64_t refId) {
// acquire extern handle // acquire extern handle
return taosRemoveRef(mgt, refId); return taosRemoveRef(refMgt, refId);
} }
SExHandle* transAcquireExHandle(int32_t mgt, int64_t refId) { SExHandle* transAcquireExHandle(int64_t refId) {
// acquire extern handle // acquire extern handle
return (SExHandle*)taosAcquireRef(mgt, refId); return (SExHandle*)taosAcquireRef(refMgt, refId);
} }
int32_t transReleaseExHandle(int32_t mgt, int64_t refId) { int32_t transReleaseExHandle(int64_t refId) {
// release extern handle // release extern handle
return taosReleaseRef(mgt, refId); return taosReleaseRef(refMgt, refId);
} }
void transDestoryExHandle(void* handle) { void transDestoryExHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
......
...@@ -19,9 +19,7 @@ ...@@ -19,9 +19,7 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static char* notify = "a"; static char* notify = "a";
static int32_t tranSSvrInst = 0;
static int32_t refMgt = 0;
typedef struct { typedef struct {
int notifyCount; // int notifyCount; //
...@@ -274,7 +272,7 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -274,7 +272,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(refMgt, pConn->refId); transMsg.info.handle = (void*)transAcquireExHandle(pConn->refId);
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
...@@ -292,7 +290,7 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -292,7 +290,7 @@ static void uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = ntohs(pConn->addr.sin_port); pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(refMgt, pConn->refId); transReleaseExHandle(pConn->refId);
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
...@@ -523,15 +521,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -523,15 +521,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(refMgt, refId); SExHandle* exh2 = transAcquireExHandle(refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
} }
...@@ -773,8 +771,8 @@ static SSvrConn* createConn(void* hThrd) { ...@@ -773,8 +771,8 @@ static SSvrConn* createConn(void* hThrd) {
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = pConn; exh->handle = pConn;
exh->pThrd = pThrd; exh->pThrd = pThrd;
exh->refId = transAddExHandle(refMgt, exh); exh->refId = transAddExHandle(exh);
transAcquireExHandle(refMgt, exh->refId); transAcquireExHandle(exh->refId);
pConn->refId = exh->refId; pConn->refId = exh->refId;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
...@@ -803,14 +801,14 @@ static void destroyConnRegArg(SSvrConn* conn) { ...@@ -803,14 +801,14 @@ static void destroyConnRegArg(SSvrConn* conn) {
} }
} }
static int reallocConnRef(SSvrConn* conn) { static int reallocConnRef(SSvrConn* conn) {
transReleaseExHandle(refMgt, conn->refId); transReleaseExHandle(conn->refId);
transRemoveExHandle(refMgt, conn->refId); transRemoveExHandle(conn->refId);
// avoid app continue to send msg on invalid handle // avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(refMgt, exh); exh->refId = transAddExHandle(exh);
transAcquireExHandle(refMgt, exh->refId); transAcquireExHandle(exh->refId);
conn->refId = exh->refId; conn->refId = exh->refId;
return 0; return 0;
...@@ -822,8 +820,8 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -822,8 +820,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
} }
SWorkThrd* thrd = conn->hostThrd; SWorkThrd* thrd = conn->hostThrd;
transReleaseExHandle(refMgt, conn->refId); transReleaseExHandle(conn->refId);
transRemoveExHandle(refMgt, conn->refId); transRemoveExHandle(conn->refId);
tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
// uv_timer_stop(&conn->pTimer); // uv_timer_stop(&conn->pTimer);
...@@ -871,12 +869,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -871,12 +869,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
// taosThreadOnce(&transModuleInit, uvInitEnv);
int ref = atomic_add_fetch_32(&tranSSvrInst, 1);
if (ref == 1) {
refMgt = transOpenExHandleMgt(50000);
}
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS #ifdef WINDOWS
char pipeName[64]; char pipeName[64];
...@@ -1028,11 +1020,6 @@ void transCloseServer(void* arg) { ...@@ -1028,11 +1020,6 @@ void transCloseServer(void* arg) {
taosMemoryFree(srv->pipe); taosMemoryFree(srv->pipe);
taosMemoryFree(srv); taosMemoryFree(srv);
int ref = atomic_sub_fetch_32(&tranSSvrInst, 1);
if (ref == 0) {
transCloseExHandleMgt(refMgt);
}
} }
void transRefSrvHandle(void* handle) { void transRefSrvHandle(void* handle) {
...@@ -1071,11 +1058,11 @@ void transReleaseSrvHandle(void* handle) { ...@@ -1071,11 +1058,11 @@ void transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
return; return;
_return1: _return1:
tTrace("handle %p failed to send to release handle", exh); tTrace("handle %p failed to send to release handle", exh);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
return; return;
_return2: _return2:
tTrace("handle %p failed to send to release handle", exh); tTrace("handle %p failed to send to release handle", exh);
...@@ -1100,12 +1087,12 @@ void transSendResponse(const STransMsg* msg) { ...@@ -1100,12 +1087,12 @@ void transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId; STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle); tGTrace("conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
return; return;
_return1: _return1:
tTrace("handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
return; return;
_return2: _return2:
tTrace("handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
...@@ -1129,13 +1116,13 @@ void transRegisterMsg(const STransMsg* msg) { ...@@ -1129,13 +1116,13 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transSendAsync(pThrd->asyncPool, &m->q); transSendAsync(pThrd->asyncPool, &m->q);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
return; return;
_return1: _return1:
tTrace("handle %p failed to register brokenlink", exh); tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
transReleaseExHandle(refMgt, refId); transReleaseExHandle(refId);
return; return;
_return2: _return2:
tTrace("handle %p failed to register brokenlink", exh); tTrace("handle %p failed to register brokenlink", exh);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册