提交 31cfa1fa 编写于 作者: dengyihao's avatar dengyihao

fix: avoid rpc mem leak

上级 38431432
......@@ -124,18 +124,16 @@ void *rpcReallocCont(void *ptr, int32_t contLen);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
// Please use tmsg<xx> functions, which are defined in tmsgcb.h
void rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
void rpcSendResponse(const SRpcMsg *pMsg);
void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
int rpcSendRequest(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid);
int rpcSendResponse(const SRpcMsg *pMsg);
int rpcRegisterBrokenLinkArg(SRpcMsg *msg);
int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc instance, no close sock
// These functions will not be called in the child process
void rpcSendRedirectRsp(void *pConn, const SEpSet *pEpSet);
void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void* rpcAllocHandle();
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void *rpcAllocHandle();
#ifdef __cplusplus
}
......
......@@ -289,14 +289,14 @@ void transUnrefSrvHandle(void* handle);
void transRefCliHandle(void* handle);
void transUnrefCliHandle(void* handle);
void transReleaseCliHandle(void* handle);
void transReleaseSrvHandle(void* handle);
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
void transSendResponse(const STransMsg* msg);
void transRegisterMsg(const STransMsg* msg);
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int transReleaseCliHandle(void* handle);
int transReleaseSrvHandle(void* handle);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);
int64_t transAllocHandle();
......
......@@ -25,7 +25,7 @@ void (*taosCloseHandle[])(void* arg) = {transCloseServer, transCloseClient};
void (*taosRefHandle[])(void* handle) = {transRefSrvHandle, transRefCliHandle};
void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHandle};
void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
int (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
*ip = taosGetIpv4FromFqdn(localFqdn);
......@@ -129,25 +129,20 @@ void* rpcReallocCont(void* ptr, int32_t contLen) {
return st + TRANS_MSG_OVERHEAD;
}
void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
// deprecated api
assert(0);
}
int32_t rpcReportProgress(void* pConn, char* pCont, int32_t contLen) { return -1; }
void rpcCancelRequest(int64_t rid) { return; }
void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
transSendRequest(shandle, pEpSet, pMsg, NULL);
int rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
return transSendRequest(shandle, pEpSet, pMsg, NULL);
}
void rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
transSendRequest(shandle, pEpSet, pMsg, pCtx);
int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
}
void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv(shandle, pEpSet, pMsg, pRsp);
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(shandle, pEpSet, pMsg, pRsp);
}
void rpcSendResponse(const SRpcMsg* pMsg) { transSendResponse(pMsg); }
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
void rpcRefHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
......@@ -159,15 +154,15 @@ void rpcUnrefHandle(void* handle, int8_t type) {
(*taosUnRefHandle[type])(handle);
}
void rpcRegisterBrokenLinkArg(SRpcMsg* msg) { transRegisterMsg(msg); }
void rpcReleaseHandle(void* handle, int8_t type) {
int rpcRegisterBrokenLinkArg(SRpcMsg* msg) { return transRegisterMsg(msg); }
int rpcReleaseHandle(void* handle, int8_t type) {
assert(type == TAOS_CONN_SERVER || type == TAOS_CONN_CLIENT);
(*transReleaseHandle[type])(handle);
return (*transReleaseHandle[type])(handle);
}
void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
int rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) {
// later
transSetDefaultAddr(thandle, ip, fqdn);
return transSetDefaultAddr(thandle, ip, fqdn);
}
void* rpcAllocHandle() { return (void*)transAllocHandle(); }
......
......@@ -1224,13 +1224,13 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) {
}
return pThrd;
}
void transReleaseCliHandle(void* handle) {
int transReleaseCliHandle(void* handle) {
int idx = -1;
bool valid = false;
SCliThrd* pThrd = transGetWorkThrdFromHandle((int64_t)handle, &valid);
if (pThrd == NULL) {
return;
return -1;
}
STransMsg tmsg = {.info.handle = handle};
SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg));
......@@ -1238,14 +1238,14 @@ void transReleaseCliHandle(void* handle) {
cmsg->type = Release;
transAsyncSend(pThrd->asyncPool, &cmsg->q);
return;
return 0;
}
void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransCtx* ctx) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return;
return -1;
}
bool valid = false;
......@@ -1253,7 +1253,7 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return -1;
}
TRACE_SET_MSGID(&pReq->info.traceId, tGenIdPI64());
......@@ -1280,14 +1280,14 @@ void transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STra
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return 0;
}
void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) {
transFreeMsg(pReq->pCont);
return;
return -1;
}
bool valid = false;
......@@ -1295,7 +1295,7 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
if (pThrd == NULL && valid == false) {
transFreeMsg(pReq->pCont);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return -1;
}
tsem_t* sem = taosMemoryCalloc(1, sizeof(tsem_t));
......@@ -1328,14 +1328,16 @@ void transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransM
taosMemoryFree(sem);
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return;
return 0;
}
/*
*
**/
void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
if (pTransInst == NULL) return;
if (pTransInst == NULL) {
return -1;
}
SCvtAddr cvtAddr = {0};
if (ip != NULL && fqdn != NULL) {
......@@ -1358,6 +1360,7 @@ void transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
transAsyncSend(thrd->asyncPool, &(cliMsg->q));
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0;
}
int64_t transAllocHandle() {
......
......@@ -1034,7 +1034,7 @@ void transUnrefSrvHandle(void* handle) {
}
}
void transReleaseSrvHandle(void* handle) {
int transReleaseSrvHandle(void* handle) {
SRpcHandleInfo* info = handle;
SExHandle* exh = info->handle;
int64_t refId = info->refId;
......@@ -1053,16 +1053,16 @@ void transReleaseSrvHandle(void* handle) {
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return 0;
_return1:
tTrace("handle %p failed to send to release handle", exh);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return -1;
_return2:
tTrace("handle %p failed to send to release handle", exh);
return;
return -1;
}
void transSendResponse(const STransMsg* msg) {
int transSendResponse(const STransMsg* msg) {
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
......@@ -1082,18 +1082,18 @@ void transSendResponse(const STransMsg* msg) {
tGTrace("conn %p start to send resp (1/2)", exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return 0;
_return1:
tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return -1;
_return2:
tTrace("handle %p failed to send resp", exh);
rpcFreeCont(msg->pCont);
return;
return -1;
}
void transRegisterMsg(const STransMsg* msg) {
int transRegisterMsg(const STransMsg* msg) {
SExHandle* exh = msg->info.handle;
int64_t refId = msg->info.refId;
ASYNC_CHECK_HANDLE(exh, refId);
......@@ -1112,16 +1112,17 @@ void transRegisterMsg(const STransMsg* msg) {
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
transAsyncSend(pThrd->asyncPool, &m->q);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return 0;
_return1:
tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
transReleaseExHandle(transGetRefMgt(), refId);
return;
return -1;
_return2:
tTrace("handle %p failed to register brokenlink", exh);
rpcFreeCont(msg->pCont);
return -1;
}
int transGetConnInfo(void* thandle, STransHandleInfo* pConnInfo) { return -1; }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册