提交 de8bb6c2 编写于 作者: dengyihao's avatar dengyihao

fix: avoid rpc mem leak

上级 ed1c777b
...@@ -232,6 +232,7 @@ typedef struct { ...@@ -232,6 +232,7 @@ typedef struct {
SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb); SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb);
void transDestroyAsyncPool(SAsyncPool* pool); void transDestroyAsyncPool(SAsyncPool* pool);
int transAsyncSend(SAsyncPool* pool, queue* mq); int transAsyncSend(SAsyncPool* pool, queue* mq);
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \ #define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
do { \ do { \
......
...@@ -70,6 +70,8 @@ typedef struct SCliThrd { ...@@ -70,6 +70,8 @@ typedef struct SCliThrd {
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
SCliMsg* stopMsg;
bool quit; bool quit;
} SCliThrd; } SCliThrd;
...@@ -761,14 +763,17 @@ void cliConnCb(uv_connect_t* req, int status) { ...@@ -761,14 +763,17 @@ void cliConnCb(uv_connect_t* req, int status) {
} }
static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleQuit(SCliMsg* pMsg, SCliThrd* pThrd) {
if (!transAsyncPoolIsEmpty(pThrd->asyncPool)) {
pThrd->stopMsg = pMsg;
return;
}
pThrd->stopMsg = NULL;
pThrd->quit = true; pThrd->quit = true;
tDebug("cli work thread %p start to quit", pThrd); tDebug("cli work thread %p start to quit", pThrd);
destroyCmsg(pMsg); destroyCmsg(pMsg);
destroyConnPool(pThrd->pool); destroyConnPool(pThrd->pool);
uv_timer_stop(&pThrd->timer); uv_timer_stop(&pThrd->timer);
uv_walk(pThrd->loop, cliWalkCb, NULL); uv_walk(pThrd->loop, cliWalkCb, NULL);
// uv_stop(pThrd->loop);
} }
static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliHandleRelease(SCliMsg* pMsg, SCliThrd* pThrd) {
int64_t refId = (int64_t)(pMsg->msg.info.handle); int64_t refId = (int64_t)(pMsg->msg.info.handle);
...@@ -925,6 +930,7 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -925,6 +930,7 @@ static void cliAsyncCb(uv_async_t* handle) {
if (count >= 2) { if (count >= 2) {
tTrace("cli process batch size:%d", count); tTrace("cli process batch size:%d", count);
} }
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
} }
static void* cliWorkThread(void* arg) { static void* cliWorkThread(void* arg) {
......
...@@ -228,6 +228,14 @@ int transAsyncSend(SAsyncPool* pool, queue* q) { ...@@ -228,6 +228,14 @@ int transAsyncSend(SAsyncPool* pool, queue* q) {
} }
return uv_async_send(async); return uv_async_send(async);
} }
bool transAsyncPoolIsEmpty(SAsyncPool* pool) {
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
if (!QUEUE_IS_EMPTY(&item->qmsg)) return false;
}
return true;
}
void transCtxInit(STransCtx* ctx) { void transCtxInit(STransCtx* ctx) {
// init transCtx // init transCtx
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册