提交 57436e5a 编写于 作者: dengyihao's avatar dengyihao

avoid mem leak

上级 dac39371
...@@ -226,6 +226,7 @@ typedef struct { ...@@ -226,6 +226,7 @@ typedef struct {
int index; int index;
int nAsync; int nAsync;
uv_async_t* asyncs; uv_async_t* asyncs;
int8_t stop;
} SAsyncPool; } SAsyncPool;
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
......
...@@ -1020,6 +1020,7 @@ void cliSendQuit(SCliThrd* thrd) { ...@@ -1020,6 +1020,7 @@ void cliSendQuit(SCliThrd* thrd) {
SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg)); SCliMsg* msg = taosMemoryCalloc(1, sizeof(SCliMsg));
msg->type = Quit; msg->type = Quit;
transAsyncSend(thrd->asyncPool, &msg->q); transAsyncSend(thrd->asyncPool, &msg->q);
atomic_store_8(&thrd->asyncPool->stop, 1);
} }
void cliWalkCb(uv_handle_t* handle, void* arg) { void cliWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) { if (!uv_is_closing(handle)) {
...@@ -1238,7 +1239,9 @@ int transReleaseCliHandle(void* handle) { ...@@ -1238,7 +1239,9 @@ int transReleaseCliHandle(void* handle) {
cmsg->msg = tmsg; cmsg->msg = tmsg;
cmsg->type = Release; cmsg->type = Release;
transAsyncSend(pThrd->asyncPool, &cmsg->q); if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) {
return -1;
}
return 0; return 0;
} }
...@@ -1279,7 +1282,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran ...@@ -1279,7 +1282,10 @@ int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STran
STraceId* trace = &pReq->info.traceId; STraceId* trace = &pReq->info.traceId;
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
ASSERT(transAsyncSend(pThrd->asyncPool, &(cliMsg->q)) == 0); if (0 != transAsyncSend(pThrd->asyncPool, &(cliMsg->q))) {
destroyCmsg(cliMsg);
return -1;
}
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0; return 0;
} }
...@@ -1323,7 +1329,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs ...@@ -1323,7 +1329,10 @@ int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMs
tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid, tGDebug("%s send request at thread:%08" PRId64 ", dst:%s:%d, app:%p", transLabel(pTransInst), pThrd->pid,
EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle); EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet), pReq->info.ahandle);
transAsyncSend(pThrd->asyncPool, &(cliMsg->q)); if (0 != transAsyncSend(pThrd->asyncPool, &cliMsg->q)) {
destroyCmsg(cliMsg);
return -1;
}
tsem_wait(sem); tsem_wait(sem);
tsem_destroy(sem); tsem_destroy(sem);
taosMemoryFree(sem); taosMemoryFree(sem);
...@@ -1358,7 +1367,10 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) { ...@@ -1358,7 +1367,10 @@ int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn) {
SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i]; SCliThrd* thrd = ((SCliObj*)pTransInst->tcphandle)->pThreadObj[i];
tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid); tDebug("%s update epset at thread:%08" PRId64, pTransInst->label, thrd->pid);
transAsyncSend(thrd->asyncPool, &(cliMsg->q)); if (transAsyncSend(thrd->asyncPool, &(cliMsg->q)) != 0) {
destroyCmsg(cliMsg);
return -1;
}
} }
transReleaseExHandle(transGetInstMgt(), (int64_t)shandle); transReleaseExHandle(transGetInstMgt(), (int64_t)shandle);
return 0; return 0;
......
...@@ -177,7 +177,6 @@ int transSetConnOption(uv_tcp_t* stream) { ...@@ -177,7 +177,6 @@ int transSetConnOption(uv_tcp_t* stream) {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) { SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) {
SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool)); SAsyncPool* pool = taosMemoryCalloc(1, sizeof(SAsyncPool));
pool->index = 0;
pool->nAsync = sz; pool->nAsync = sz;
pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync); pool->asyncs = taosMemoryCalloc(1, sizeof(uv_async_t) * pool->nAsync);
...@@ -207,6 +206,9 @@ void transDestroyAsyncPool(SAsyncPool* pool) { ...@@ -207,6 +206,9 @@ void transDestroyAsyncPool(SAsyncPool* pool) {
taosMemoryFree(pool); taosMemoryFree(pool);
} }
int transAsyncSend(SAsyncPool* pool, queue* q) { int transAsyncSend(SAsyncPool* pool, queue* q) {
if (atomic_load_8(&pool->stop) == 1) {
return -1;
}
int idx = pool->index; int idx = pool->index;
idx = idx % pool->nAsync; idx = idx % pool->nAsync;
// no need mutex here // no need mutex here
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册