提交 f0c0bc92 编写于 作者: dengyihao's avatar dengyihao

enh(rpc): fix mem leak

上级 5f24bf5b
...@@ -129,6 +129,15 @@ static void transDestroyConnCtx(STransConnCtx* ctx); ...@@ -129,6 +129,15 @@ static void transDestroyConnCtx(STransConnCtx* ctx);
static SCliThrdObj* createThrdObj(); static SCliThrdObj* createThrdObj();
static void destroyThrdObj(SCliThrdObj* pThrd); static void destroyThrdObj(SCliThrdObj* pThrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);
#define CLI_RELEASE_UV(loop) \
do { \
uv_walk(loop, cliWalkCb, NULL); \
uv_run(loop, UV_RUN_DEFAULT); \
uv_loop_close(loop); \
} while (0);
// snprintf may cause performance problem // snprintf may cause performance problem
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ #define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
do { \ do { \
...@@ -212,8 +221,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd); ...@@ -212,8 +221,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
} \ } \
} while (0) } while (0)
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) #define CONN_NO_PERSIST_BY_APP(conn) \
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define CONN_RELEASE_BY_SERVER(conn) \
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
...@@ -288,8 +299,9 @@ void cliHandleResp(SCliConn* conn) { ...@@ -288,8 +299,9 @@ void cliHandleResp(SCliConn* conn) {
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
} }
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
conn->secured = pHead->secured; conn->secured = pHead->secured;
...@@ -355,10 +367,12 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -355,10 +367,12 @@ void cliHandleExcept(SCliConn* pConn) {
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
TMSG_INFO(transMsg.msgType));
if (transMsg.ahandle == NULL) { if (transMsg.ahandle == NULL) {
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
transMsg.ahandle);
} }
} else { } else {
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
...@@ -631,8 +645,9 @@ void cliSend(SCliConn* pConn) { ...@@ -631,8 +645,9 @@ void cliSend(SCliConn* pConn) {
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
if (pHead->persist == 1) { if (pHead->persist == 1) {
CONN_SET_PERSIST_BY_APP(pConn); CONN_SET_PERSIST_BY_APP(pConn);
...@@ -671,9 +686,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { ...@@ -671,9 +686,11 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd->pool);
uv_timer_stop(&pThrd->timer); uv_timer_stop(&pThrd->timer);
uv_walk(pThrd->loop, cliWalkCb, NULL);
pThrd->quit = true; pThrd->quit = true;
uv_stop(pThrd->loop); // uv_stop(pThrd->loop);
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
SCliConn* conn = pMsg->msg.handle; SCliConn* conn = pMsg->msg.handle;
...@@ -786,7 +803,6 @@ static void* cliWorkThread(void* arg) { ...@@ -786,7 +803,6 @@ static void* cliWorkThread(void* arg) {
SCliThrdObj* pThrd = (SCliThrdObj*)arg; SCliThrdObj* pThrd = (SCliThrdObj*)arg;
setThreadName("trans-cli-work"); setThreadName("trans-cli-work");
uv_run(pThrd->loop, UV_RUN_DEFAULT); uv_run(pThrd->loop, UV_RUN_DEFAULT);
return NULL; return NULL;
} }
...@@ -851,8 +867,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) { ...@@ -851,8 +867,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd) {
if (pThrd == NULL) { if (pThrd == NULL) {
return; return;
} }
uv_stop(pThrd->loop);
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
CLI_RELEASE_UV(pThrd->loop);
taosThreadMutexDestroy(&pThrd->msgMtx); taosThreadMutexDestroy(&pThrd->msgMtx);
transDestroyAsyncPool(pThrd->asyncPool); transDestroyAsyncPool(pThrd->asyncPool);
...@@ -874,6 +890,11 @@ void cliSendQuit(SCliThrdObj* thrd) { ...@@ -874,6 +890,11 @@ void cliSendQuit(SCliThrdObj* thrd) {
msg->type = Quit; msg->type = Quit;
transSendAsync(thrd->asyncPool, &msg->q); transSendAsync(thrd->asyncPool, &msg->q);
} }
void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, NULL);
}
}
int cliRBChoseIdx(STrans* pTransInst) { int cliRBChoseIdx(STrans* pTransInst) {
int64_t index = pTransInst->index; int64_t index = pTransInst->index;
......
...@@ -93,27 +93,6 @@ typedef struct SServerObj { ...@@ -93,27 +93,6 @@ typedef struct SServerObj {
static const char* notify = "a"; static const char* notify = "a";
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
...@@ -125,11 +104,8 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) ...@@ -125,11 +104,8 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle) { static void uvFreeCb(uv_handle_t* handle);
//
taosMemoryFree(handle);
}
static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvStartSendRespInternal(SSrvMsg* smsg);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
...@@ -159,6 +135,34 @@ static void* transAcceptThread(void* arg); ...@@ -159,6 +135,34 @@ static void* transAcceptThread(void* arg);
static bool addHandleToWorkloop(void* arg); static bool addHandleToWorkloop(void* arg);
static bool addHandleToAcceptloop(void* arg); static bool addHandleToAcceptloop(void* arg);
#define CONN_SHOULD_RELEASE(conn, head) \
do { \
if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \
conn->status = ConnRelease; \
transClearBuffer(&conn->readBuf); \
transFreeMsg(transContFromHead((char*)head)); \
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \
return; \
} \
uvStartSendRespInternal(srvMsg); \
return; \
} \
} while (0)
#define SRV_RELEASE_UV(loop) \
do { \
uv_walk(loop, uvWalkCb, NULL); \
uv_run(loop, UV_RUN_DEFAULT); \
uv_loop_close(loop); \
} while (0);
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SSrvConn* conn = handle->data; SSrvConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf; SConnBuffer* pBuf = &conn->readBuf;
...@@ -370,7 +374,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) { ...@@ -370,7 +374,7 @@ static void uvStartSendRespInternal(SSrvMsg* smsg) {
uvPrepareSendData(smsg, &wb); uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn; SSrvConn* pConn = smsg->pConn;
uv_timer_stop(&pConn->pTimer); // uv_timer_stop(&pConn->pTimer);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
} }
static void uvStartSendResp(SSrvMsg* smsg) { static void uvStartSendResp(SSrvMsg* smsg) {
...@@ -439,36 +443,17 @@ void uvWorkerAsyncCb(uv_async_t* handle) { ...@@ -439,36 +443,17 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
static void uvWalkCb(uv_handle_t* handle, void* arg) { static void uvWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
uv_close(handle, NULL); uv_close(handle, NULL);
// uv_unref(handle);
tDebug("handle: %p -----test----", handle);
} }
} }
#define MAKE_VALGRIND_HAPPY(loop) \ static void uvFreeCb(uv_handle_t* handle) {
do { \ //
uv_walk(loop, uvWalkCb, NULL); \ taosMemoryFree(handle);
uv_run(loop, UV_RUN_DEFAULT); \ }
uv_loop_close(loop); \
} while (0);
static void uvAcceptAsyncCb(uv_async_t* async) { static void uvAcceptAsyncCb(uv_async_t* async) {
SServerObj* srv = async->data; SServerObj* srv = async->data;
tDebug("close server port %d", srv->port); tDebug("close server port %d", srv->port);
uv_walk(srv->loop, uvWalkCb, NULL); uv_walk(srv->loop, uvWalkCb, NULL);
// uv_close((uv_handle_t*)async, NULL);
// uv_close((uv_handle_t*)&srv->server, NULL);
// uv_stop(srv->loop);
// uv_print_all_handles(srv->loop, stderr);
// int ref = uv_loop_alive(srv->loop);
// assert(ref == 0);
// tError("active size %d", ref);
// uv_stop(srv->loop);
// uv_run(srv->loop, UV_RUN_DEFAULT);
// fprintf(stderr, "------------------------------------");
// uv_print_all_handles(srv->loop, stderr);
// int ret = uv_loop_close(srv->loop);
// tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret);
// assert(ret == 0);
} }
static void uvShutDownCb(uv_shutdown_t* req, int status) { static void uvShutDownCb(uv_shutdown_t* req, int status) {
...@@ -535,8 +520,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -535,8 +520,8 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
pConn->pTransInst = pThrd->pTransInst; pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/ /* init conn timer*/
uv_timer_init(pThrd->loop, &pConn->pTimer); // uv_timer_init(pThrd->loop, &pConn->pTimer);
pConn->pTimer.data = pConn; // pConn->pTimer.data = pConn;
pConn->hostThrd = pThrd; pConn->hostThrd = pThrd;
...@@ -680,11 +665,11 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -680,11 +665,11 @@ static void uvDestroyConn(uv_handle_t* handle) {
SWorkThrdObj* thrd = conn->hostThrd; SWorkThrdObj* thrd = conn->hostThrd;
tDebug("server conn %p destroy", conn); tDebug("server conn %p destroy", conn);
uv_timer_stop(&conn->pTimer); // uv_timer_stop(&conn->pTimer);
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
taosMemoryFree(conn->pTcp); taosMemoryFree(conn->pTcp);
// taosMemoryFree(conn); taosMemoryFree(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
tTrace("work thread quit"); tTrace("work thread quit");
...@@ -805,7 +790,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { ...@@ -805,7 +790,7 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
return; return;
} }
taosThreadJoin(pThrd->thread, NULL); taosThreadJoin(pThrd->thread, NULL);
MAKE_VALGRIND_HAPPY(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
transDestroyAsyncPool(pThrd->asyncPool); transDestroyAsyncPool(pThrd->asyncPool);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
...@@ -825,7 +810,7 @@ void transCloseServer(void* arg) { ...@@ -825,7 +810,7 @@ void transCloseServer(void* arg) {
uv_async_send(srv->pAcceptAsync); uv_async_send(srv->pAcceptAsync);
taosThreadJoin(srv->thread, NULL); taosThreadJoin(srv->thread, NULL);
MAKE_VALGRIND_HAPPY(srv->loop); SRV_RELEASE_UV(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) { for (int i = 0; i < srv->numOfThreads; i++) {
sendQuitToWorkThrd(srv->pThreadObj[i]); sendQuitToWorkThrd(srv->pThreadObj[i]);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册