提交 b2be5169 编写于 作者: dengyihao's avatar dengyihao

add inst ref

上级 3f8efa10
...@@ -110,12 +110,15 @@ typedef struct { ...@@ -110,12 +110,15 @@ typedef struct {
} SRpcCtx; } SRpcCtx;
int32_t rpcInit(); int32_t rpcInit();
void rpcCleanup();
void * rpcOpen(const SRpcInit *pRpc); void rpcCleanup();
void rpcClose(void *); void *rpcOpen(const SRpcInit *pRpc);
void * rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont); void rpcClose(void *);
void * rpcReallocCont(void *ptr, int32_t contLen); void rpcCloseImpl(void *);
void *rpcMallocCont(int32_t contLen);
void rpcFreeCont(void *pCont);
void *rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode // Because taosd supports multi-process mode
// These functions should not be used on the server side // These functions should not be used on the server side
......
...@@ -253,7 +253,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); ...@@ -253,7 +253,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
do { \ do { \
if (id > 0) { \ if (id > 0) { \
tTrace("handle step1"); \ tTrace("handle step1"); \
SExHandle* exh2 = transAcquireExHandle(id); \ SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id != exh2->refId) { \ if (exh2 == NULL || id != exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, \
exh2 ? exh2->refId : 0, id); \ exh2 ? exh2->refId : 0, id); \
...@@ -261,7 +261,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq); ...@@ -261,7 +261,7 @@ int transAsyncSend(SAsyncPool* pool, queue* mq);
} \ } \
} else if (id == 0) { \ } else if (id == 0) { \
tTrace("handle step2"); \ tTrace("handle step2"); \
SExHandle* exh2 = transAcquireExHandle(id); \ SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), id); \
if (exh2 == NULL || id == exh2->refId) { \ if (exh2 == NULL || id == exh2->refId) { \
tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \ tTrace("handle %p except, may already freed, ignore msg, ref1: %" PRIu64 ", ref2 : %" PRIu64 "", exh1, id, \
exh2 ? exh2->refId : 0); \ exh2 ? exh2->refId : 0); \
...@@ -391,13 +391,16 @@ void transThreadOnce(); ...@@ -391,13 +391,16 @@ void transThreadOnce();
void transInit(); void transInit();
void transCleanup(); void transCleanup();
int32_t transOpenExHandleMgt(int size); int32_t transOpenRefMgt(int size, void (*func)(void*));
void transCloseExHandleMgt(); void transCloseRefMgt(int32_t refMgt);
int64_t transAddExHandle(void* p); int64_t transAddExHandle(int32_t refMgt, void* p);
int32_t transRemoveExHandle(int64_t refId); int32_t transRemoveExHandle(int32_t refMgt, int64_t refId);
SExHandle* transAcquireExHandle(int64_t refId); void* transAcquireExHandle(int32_t refMgt, int64_t refId);
int32_t transReleaseExHandle(int64_t refId); int32_t transReleaseExHandle(int32_t refMgt, int64_t refId);
void transDestoryExHandle(void* handle); void transDestoryExHandle(void* handle);
int32_t transGetRefMgt();
int32_t transGetInstMgt();
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -76,16 +76,19 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -76,16 +76,19 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->user) { if (pInit->user) {
memcpy(pRpc->user, pInit->user, strlen(pInit->user)); memcpy(pRpc->user, pInit->user, strlen(pInit->user));
} }
return pRpc; int64_t refId = taosAddRef(transGetInstMgt(), pRpc);
return (void*)refId;
} }
void rpcClose(void* arg) { void rpcClose(void* arg) {
tInfo("start to close rpc"); tInfo("start to close rpc");
taosRemoveRef(transGetInstMgt(), (int64_t)arg);
tInfo("finish to close rpc");
return;
}
void rpcCloseImpl(void* arg) {
SRpcInfo* pRpc = (SRpcInfo*)arg; SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle); (*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
taosMemoryFree(pRpc); taosMemoryFree(pRpc);
tInfo("finish to close rpc");
return;
} }
void* rpcMallocCont(int32_t contLen) { void* rpcMallocCont(int32_t contLen) {
...@@ -140,11 +143,10 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { ...@@ -140,11 +143,10 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp); transSendRecv(shandle, pEpSet, pMsg, pRsp);
} }
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); } void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; } int32_t rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return 0; }
void rpcRefHandle(void* handle, int8_t type) { void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT); assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*taosRefHandle[type])(handle); (*taosRefHandle[type])(handle);
......
...@@ -501,13 +501,13 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { ...@@ -501,13 +501,13 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
} }
static void allocConnRef(SCliConn* conn, bool update) { static void allocConnRef(SCliConn* conn, bool update) {
if (update) { if (update) {
transRemoveExHandle(conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
} }
SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle)); SExHandle* exh = taosMemoryCalloc(1, sizeof(SExHandle));
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
conn->refId = exh->refId; conn->refId = exh->refId;
} }
static void addConnToPool(void* pool, SCliConn* conn) { static void addConnToPool(void* pool, SCliConn* conn) {
...@@ -601,7 +601,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { ...@@ -601,7 +601,7 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p remove from conn pool", CONN_GET_INST_LABEL(conn), conn);
QUEUE_REMOVE(&conn->conn); QUEUE_REMOVE(&conn->conn);
QUEUE_INIT(&conn->conn); QUEUE_INIT(&conn->conn);
transRemoveExHandle(conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
conn->refId = -1; conn->refId = -1;
if (clear) { if (clear) {
...@@ -619,7 +619,7 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -619,7 +619,7 @@ static void cliDestroy(uv_handle_t* handle) {
} }
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
transRemoveExHandle(conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
taosMemoryFree(conn->ip); taosMemoryFree(conn->ip);
conn->stream->data = NULL; conn->stream->data = NULL;
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
...@@ -747,7 +747,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -747,7 +747,7 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle); int64_t refId = (int64_t)(pMsg->msg.info.handle);
SExHandle* exh = transAcquireExHandle(refId); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) { if (exh == NULL) {
tDebug("%" PRId64 " already release", refId); tDebug("%" PRId64 " already release", refId);
} }
...@@ -773,7 +773,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -773,7 +773,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
SCliConn* conn = NULL; SCliConn* conn = NULL;
int64_t refId = (int64_t)(pMsg->msg.info.handle); int64_t refId = (int64_t)(pMsg->msg.info.handle);
if (refId != 0) { if (refId != 0) {
SExHandle* exh = transAcquireExHandle(refId); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), refId);
if (exh == NULL) { if (exh == NULL) {
*ignore = true; *ignore = true;
destroyCmsg(pMsg); destroyCmsg(pMsg);
...@@ -781,7 +781,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { ...@@ -781,7 +781,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) {
// assert(0); // assert(0);
} else { } else {
conn = exh->handle; conn = exh->handle;
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
} }
return conn; return conn;
}; };
...@@ -1154,12 +1154,12 @@ void transUnrefCliHandle(void* handle) { ...@@ -1154,12 +1154,12 @@ void transUnrefCliHandle(void* handle) {
} }
SCliThrd* transGetWorkThrdFromHandle(int64_t handle) { SCliThrd* transGetWorkThrdFromHandle(int64_t handle) {
SCliThrd* pThrd = NULL; SCliThrd* pThrd = NULL;
SExHandle* exh = transAcquireExHandle(handle); SExHandle* exh = transAcquireExHandle(transGetRefMgt(), handle);
if (exh == NULL) { if (exh == NULL) {
return NULL; return NULL;
} }
pThrd = exh->pThrd; pThrd = exh->pThrd;
transReleaseExHandle(handle); transReleaseExHandle(transGetRefMgt(), handle);
return pThrd; return pThrd;
} }
SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) { SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle) {
...@@ -1186,10 +1186,13 @@ void transReleaseCliHandle(void* handle) { ...@@ -1186,10 +1186,13 @@ void transReleaseCliHandle(void* handle) {
} }
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) { void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) { if (pThrd == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return; return;
} }
...@@ -1215,14 +1218,18 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra ...@@ -1215,14 +1218,18 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGTrace("%s send request at thread:%08" PRId64 ", dst: %s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return; return;
} }
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) { void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)shandle; STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle); SCliThrd* pThrd = transGetWorkThrd(pTransInst, (int64_t)pReq->info.handle);
if (pThrd == NULL) { if (pThrd == NULL) {
transFreeMsg(pReq->pCont); transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return; return;
} }
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t)); tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
...@@ -1252,13 +1259,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM ...@@ -1252,13 +1259,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
tsem_wait(sem); tsem_wait(sem);
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return; return;
} }
/* /*
* *
**/ **/
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = shandle; STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
SCvtAddr cvtAddr = {0}; SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) { if (ip != NULL && fqdn != NULL) {
...@@ -1279,5 +1289,6 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1279,5 +1289,6 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
transAsyncSend(thrd->asyncPool, &(cliMsg->q)); transAsyncSend(thrd->asyncPool, &(cliMsg->q));
} }
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
} }
#endif #endif
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static int32_t refMgt; static int32_t refMgt;
int32_t instMgt;
int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { int transAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
T_MD5_CTX context; T_MD5_CTX context;
...@@ -481,44 +482,49 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) { ...@@ -481,44 +482,49 @@ bool transEpSetIsEqual(SEpSet* a, SEpSet* b) {
} }
static void transInitEnv() { static void transInitEnv() {
refMgt = transOpenExHandleMgt(50000); refMgt = transOpenRefMgt(50000, transDestoryExHandle);
instMgt = taosOpenRef(50, rpcCloseImpl);
uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1"); uv_os_setenv("UV_TCP_SINGLE_ACCEPT", "1");
} }
static void transDestroyEnv() { static void transDestroyEnv() {
// close ref transCloseRefMgt(refMgt);
transCloseExHandleMgt(refMgt); transCloseRefMgt(instMgt);
} }
void transInit() { void transInit() {
// init env // init env
taosThreadOnce(&transModuleInit, transInitEnv); taosThreadOnce(&transModuleInit, transInitEnv);
} }
int32_t transGetRefMgt() { return refMgt; }
int32_t transGetInstMgt() { return instMgt; }
void transCleanup() { void transCleanup() {
// clean env // clean env
transDestroyEnv(); transDestroyEnv();
} }
int32_t transOpenExHandleMgt(int size) { int32_t transOpenRefMgt(int size, void (*func)(void*)) {
// added into once later // added into once later
return taosOpenRef(size, transDestoryExHandle); return taosOpenRef(size, func);
} }
void transCloseExHandleMgt() { void transCloseRefMgt(int32_t mgt) {
// close ref // close ref
taosCloseRef(refMgt); taosCloseRef(mgt);
} }
int64_t transAddExHandle(void* p) { int64_t transAddExHandle(int32_t refMgt, void* p) {
// acquire extern handle // acquire extern handle
return taosAddRef(refMgt, p); return taosAddRef(refMgt, p);
} }
int32_t transRemoveExHandle(int64_t refId) { int32_t transRemoveExHandle(int32_t refMgt, int64_t refId) {
// acquire extern handle // acquire extern handle
return taosRemoveRef(refMgt, refId); return taosRemoveRef(refMgt, refId);
} }
SExHandle* transAcquireExHandle(int64_t refId) { void* transAcquireExHandle(int32_t refMgt, int64_t refId) {
// acquire extern handle // acquire extern handle
return (SExHandle*)taosAcquireRef(refMgt, refId); return (void*)taosAcquireRef(refMgt, refId);
} }
int32_t transReleaseExHandle(int64_t refId) { int32_t transReleaseExHandle(int32_t refMgt, int64_t refId) {
// release extern handle // release extern handle
return taosReleaseRef(refMgt, refId); return taosReleaseRef(refMgt, refId);
} }
......
...@@ -261,7 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -261,7 +261,7 @@ static void uvHandleReq(SSvrConn* pConn) {
// 2. once send out data, cli conn released to conn pool immediately // 2. once send out data, cli conn released to conn pool immediately
// 3. not mixed with persist // 3. not mixed with persist
transMsg.info.ahandle = (void*)pHead->ahandle; transMsg.info.ahandle = (void*)pHead->ahandle;
transMsg.info.handle = (void*)transAcquireExHandle(pConn->refId); transMsg.info.handle = (void*)transAcquireExHandle(transGetRefMgt(), pConn->refId);
transMsg.info.refId = pConn->refId; transMsg.info.refId = pConn->refId;
transMsg.info.traceId = pHead->traceId; transMsg.info.traceId = pHead->traceId;
...@@ -279,7 +279,7 @@ static void uvHandleReq(SSvrConn* pConn) { ...@@ -279,7 +279,7 @@ static void uvHandleReq(SSvrConn* pConn) {
pConnInfo->clientPort = ntohs(pConn->addr.sin_port); pConnInfo->clientPort = ntohs(pConn->addr.sin_port);
tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user)); tstrncpy(pConnInfo->user, pConn->user, sizeof(pConnInfo->user));
transReleaseExHandle(pConn->refId); transReleaseExHandle(transGetRefMgt(), pConn->refId);
STrans* pTransInst = pConn->pTransInst; STrans* pTransInst = pConn->pTransInst;
(*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
...@@ -507,15 +507,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -507,15 +507,15 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
SExHandle* exh1 = transMsg.info.handle; SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId; int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(refId); SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) { if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1); tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
destroySmsg(msg); destroySmsg(msg);
continue; continue;
} }
msg->pConn = exh1->handle; msg->pConn = exh1->handle;
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd); (*transAsyncHandle[msg->type])(msg, pThrd);
} }
} }
...@@ -757,8 +757,8 @@ static SSvrConn* createConn(void* hThrd) { ...@@ -757,8 +757,8 @@ static SSvrConn* createConn(void* hThrd) {
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = pConn; exh->handle = pConn;
exh->pThrd = pThrd; exh->pThrd = pThrd;
exh->refId = transAddExHandle(exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
transAcquireExHandle(exh->refId); transAcquireExHandle(transGetRefMgt(), exh->refId);
pConn->refId = exh->refId; pConn->refId = exh->refId;
transRefSrvHandle(pConn); transRefSrvHandle(pConn);
...@@ -789,14 +789,14 @@ static void destroyConnRegArg(SSvrConn* conn) { ...@@ -789,14 +789,14 @@ static void destroyConnRegArg(SSvrConn* conn) {
} }
} }
static int reallocConnRef(SSvrConn* conn) { static int reallocConnRef(SSvrConn* conn) {
transReleaseExHandle(conn->refId); transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
// avoid app continue to send msg on invalid handle // avoid app continue to send msg on invalid handle
SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle));
exh->handle = conn; exh->handle = conn;
exh->pThrd = conn->hostThrd; exh->pThrd = conn->hostThrd;
exh->refId = transAddExHandle(exh); exh->refId = transAddExHandle(transGetRefMgt(), exh);
transAcquireExHandle(exh->refId); transAcquireExHandle(transGetRefMgt(), exh->refId);
conn->refId = exh->refId; conn->refId = exh->refId;
return 0; return 0;
...@@ -808,8 +808,8 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -808,8 +808,8 @@ static void uvDestroyConn(uv_handle_t* handle) {
} }
SWorkThrd* thrd = conn->hostThrd; SWorkThrd* thrd = conn->hostThrd;
transReleaseExHandle(conn->refId); transReleaseExHandle(transGetRefMgt(), conn->refId);
transRemoveExHandle(conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId);
tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn);
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
...@@ -1045,11 +1045,11 @@ void transReleaseSrvHandle(void* handle) { ...@@ -1045,11 +1045,11 @@ void transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
return; return;
_return1: _return1:
tTrace("handle %p failed to send to release handle", exh); tTrace("handle %p failed to send to release handle", exh);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
return; return;
_return2: _return2:
tTrace("handle %p failed to send to release handle", exh); tTrace("handle %p failed to send to release handle", exh);
...@@ -1074,12 +1074,12 @@ void transSendResponse(const STransMsg* msg) { ...@@ -1074,12 +1074,12 @@ void transSendResponse(const STransMsg* msg) {
STraceId* trace = (STraceId*)&msg->info.traceId; STraceId* trace = (STraceId*)&msg->info.traceId;
tGTrace("conn %p start to send resp (1/2)", exh->handle); tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
return; return;
_return1: _return1:
tTrace("handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
return; return;
_return2: _return2:
tTrace("handle %p failed to send resp", exh); tTrace("handle %p failed to send resp", exh);
...@@ -1103,13 +1103,13 @@ void transRegisterMsg(const STransMsg* msg) { ...@@ -1103,13 +1103,13 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q); transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
return; return;
_return1: _return1:
tTrace("handle %p failed to register brokenlink", exh); tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont); rpcFreeCont(msg->pCont);
transReleaseExHandle(refId); transReleaseExHandle(transGetRefMgt(), refId);
return; return;
_return2: _return2:
tTrace("handle %p failed to register brokenlink", exh); tTrace("handle %p failed to register brokenlink", exh);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册