提交 293a96d9 编写于 作者: dengyihao's avatar dengyihao

fix mem leak

上级 5ed23e79
...@@ -582,8 +582,11 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou ...@@ -582,8 +582,11 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
// add impl later // add impl later
if (node->condType == LOGIC_COND_TYPE_AND) { if (node->condType == LOGIC_COND_TYPE_AND) {
taosArrayAddAll(output->result, params[m].result); taosArrayAddAll(output->result, params[m].result);
taosArrayDestroy(params[m].result);
params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_OR) { } else if (node->condType == LOGIC_COND_TYPE_OR) {
taosArrayAddAll(output->result, params[m].result); taosArrayAddAll(output->result, params[m].result);
params[m].result = NULL;
} else if (node->condType == LOGIC_COND_TYPE_NOT) { } else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result); // taosArrayAddAll(output->result, params[m].result);
} }
...@@ -593,6 +596,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou ...@@ -593,6 +596,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
} else { } else {
for (int32_t m = 0; m < node->pParameterList->length; m++) { for (int32_t m = 0; m < node->pParameterList->length; m++) {
output->status = sifMergeCond(node->condType, output->status, params[m].status); output->status = sifMergeCond(node->condType, output->status, params[m].status);
taosArrayDestroy(params[m].result);
params[m].result = NULL;
} }
} }
_return: _return:
...@@ -607,6 +612,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) { ...@@ -607,6 +612,7 @@ static EDealRes sifWalkFunction(SNode *pNode, void *context) {
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecFunction(node, ctx, &output); ctx->code = sifExecFunction(node, ctx, &output);
if (ctx->code != TSDB_CODE_SUCCESS) { if (ctx->code != TSDB_CODE_SUCCESS) {
sifFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
...@@ -624,6 +630,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) { ...@@ -624,6 +630,7 @@ static EDealRes sifWalkLogic(SNode *pNode, void *context) {
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecLogic(node, ctx, &output); ctx->code = sifExecLogic(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sifFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
...@@ -640,6 +647,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { ...@@ -640,6 +647,7 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
SIFCtx *ctx = context; SIFCtx *ctx = context;
ctx->code = sifExecOper(node, ctx, &output); ctx->code = sifExecOper(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sifFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
...@@ -698,7 +706,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { ...@@ -698,7 +706,11 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
} }
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
SIF_ERR_RET(ctx.code);
if (ctx.code != 0) {
sifFreeRes(ctx.pRes);
return ctx.code;
}
if (pDst) { if (pDst) {
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
...@@ -714,8 +726,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) { ...@@ -714,8 +726,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
} }
sifFreeRes(ctx.pRes); sifFreeRes(ctx.pRes);
return code;
SIF_RET(code);
} }
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
...@@ -732,8 +743,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { ...@@ -732,8 +743,10 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
} }
nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx); nodesWalkExprPostOrder(pNode, sifCalcWalker, &ctx);
if (ctx.code != 0) {
SIF_ERR_RET(ctx.code); sifFreeRes(ctx.pRes);
return ctx.code;
}
SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES); SIFParam *res = (SIFParam *)taosHashGet(ctx.pRes, (void *)&pNode, POINTER_BYTES);
if (res == NULL) { if (res == NULL) {
...@@ -745,8 +758,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) { ...@@ -745,8 +758,7 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status) {
sifFreeParam(res); sifFreeParam(res);
taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES); taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
taosHashCleanup(ctx.pRes); taosHashCleanup(ctx.pRes);
return code;
SIF_RET(code);
} }
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) { int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status) {
...@@ -760,7 +772,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, ...@@ -760,7 +772,11 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
SArray *output = taosArrayInit(8, sizeof(uint64_t)); SArray *output = taosArrayInit(8, sizeof(uint64_t));
SIFParam param = {.arg = *metaArg, .result = output}; SIFParam param = {.arg = *metaArg, .result = output};
SIF_ERR_RET(sifCalculate((SNode *)pFilterNode, &param)); int32_t code = sifCalculate((SNode *)pFilterNode, &param);
if (code != 0) {
sifFreeParam(&param);
return code;
}
taosArrayAddAll(result, param.result); taosArrayAddAll(result, param.result);
sifFreeParam(&param); sifFreeParam(&param);
......
/** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> /** Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
...@@ -61,6 +60,7 @@ typedef struct SCliThrd { ...@@ -61,6 +60,7 @@ typedef struct SCliThrd {
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_idle_t* idle; uv_idle_t* idle;
uv_prepare_t* prepare;
uv_timer_t timer; uv_timer_t timer;
void* pool; // conn pool void* pool; // conn pool
...@@ -118,6 +118,7 @@ static void cliSendCb(uv_write_t* req, int status); ...@@ -118,6 +118,7 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle); static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle);
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg); static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
...@@ -198,7 +199,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -198,7 +199,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
pThrd = (SCliThrd*)(exh)->pThrd; \ pThrd = (SCliThrd*)(exh)->pThrd; \
} \ } \
} while (0) } while (0)
#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_SHOULD_RELEASE(conn, head) \ #define CONN_SHOULD_RELEASE(conn, head) \
...@@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -967,6 +968,62 @@ static void cliAsyncCb(uv_async_t* handle) {
static void cliIdleCb(uv_idle_t* handle) { static void cliIdleCb(uv_idle_t* handle) {
SCliThrd* thrd = handle->data; SCliThrd* thrd = handle->data;
tTrace("do idle work"); tTrace("do idle work");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
if (pMsg == NULL) {
continue;
}
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
} }
static void* cliWorkThread(void* arg) { static void* cliWorkThread(void* arg) {
...@@ -1035,6 +1092,11 @@ static SCliThrd* createThrdObj() { ...@@ -1035,6 +1092,11 @@ static SCliThrd* createThrdObj() {
// pThrd->idle->data = pThrd; // pThrd->idle->data = pThrd;
// uv_idle_start(pThrd->idle, cliIdleCb); // uv_idle_start(pThrd->idle, cliIdleCb);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb);
pThrd->pool = createConnPool(4); pThrd->pool = createConnPool(4);
transDQCreate(pThrd->loop, &pThrd->delayQueue); transDQCreate(pThrd->loop, &pThrd->delayQueue);
...@@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1058,6 +1120,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL);
taosMemoryFree(pThrd->idle); taosMemoryFree(pThrd->idle);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
...@@ -73,6 +73,7 @@ typedef struct SWorkThrd { ...@@ -73,6 +73,7 @@ typedef struct SWorkThrd {
uv_os_fd_t fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
uv_prepare_t* prepare;
queue msg; queue msg;
TdThreadMutex msgMtx; TdThreadMutex msgMtx;
...@@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) ...@@ -112,6 +113,7 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle); static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status); static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle);
/* /*
* time-consuming task throwed into BG work thread * time-consuming task throwed into BG work thread
...@@ -546,6 +548,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) { ...@@ -546,6 +548,52 @@ static void uvShutDownCb(uv_shutdown_t* req, int status) {
uv_close((uv_handle_t*)req->handle, uvDestroyConn); uv_close((uv_handle_t*)req->handle, uvDestroyConn);
taosMemoryFree(req); taosMemoryFree(req);
} }
static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback
SWorkThrd* pThrd = handle->data;
SAsyncPool* pool = pThrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
taosThreadMutexUnlock(&item->mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
if (msg == NULL) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
transReleaseExHandle(transGetRefMgt(), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
transReleaseExHandle(transGetRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
}
}
static void uvWorkDoTask(uv_work_t* req) { static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task // doing time-consumeing task
...@@ -695,13 +743,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -695,13 +743,17 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
} }
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
// int r = uv_pipe_open(pThrd->pipe, pThrd->fd);
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
QUEUE_INIT(&pThrd->msg); QUEUE_INIT(&pThrd->msg);
taosThreadMutexInit(&pThrd->msgMtx, NULL); taosThreadMutexInit(&pThrd->msgMtx, NULL);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare);
uv_prepare_start(pThrd->prepare, uvPrepareCb);
pThrd->prepare->data = pThrd;
// conn set // conn set
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
...@@ -986,6 +1038,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) { ...@@ -986,6 +1038,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
SRV_RELEASE_UV(pThrd->loop); SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册