提交 1f9b6f14 编写于 作者: D dapan1121

fix mem leak

上级 a5e9b14d
...@@ -60,4 +60,5 @@ void dsScheduleProcess(void* ahandle, void* pItem) { ...@@ -60,4 +60,5 @@ void dsScheduleProcess(void* ahandle, void* pItem) {
void dsDestroyDataSinker(DataSinkHandle handle) { void dsDestroyDataSinker(DataSinkHandle handle) {
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
pHandleImpl->fDestroy(pHandleImpl); pHandleImpl->fDestroy(pHandleImpl);
taosMemoryFree(pHandleImpl);
} }
...@@ -2826,6 +2826,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray ...@@ -2826,6 +2826,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
} }
if (p->info.colId == pmInfo->colId) { if (p->info.colId == pmInfo->colId) {
colDataDestroy(taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId));
taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p); taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p);
i++; i++;
j++; j++;
...@@ -4687,7 +4688,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT ...@@ -4687,7 +4688,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
char* p = taosMemoryCalloc(1, 128); char* p = taosMemoryCalloc(1, 128);
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
pTaskInfo->id.str = strdup(p); pTaskInfo->id.str = p;
return pTaskInfo; return pTaskInfo;
} }
...@@ -5300,6 +5301,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { ...@@ -5300,6 +5301,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
// taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosArrayDestroy(pTaskInfo->summary.queryProfEvents);
// taosHashCleanup(pTaskInfo->summary.operatorProfResults); // taosHashCleanup(pTaskInfo->summary.operatorProfResults);
destroyOperatorInfo(pTaskInfo->pRoot);
taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->sql);
taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo->id.str);
taosMemoryFreeClear(pTaskInfo); taosMemoryFreeClear(pTaskInfo);
......
...@@ -25,6 +25,7 @@ extern "C" { ...@@ -25,6 +25,7 @@ extern "C" {
#include "tlockfree.h" #include "tlockfree.h"
#include "ttimer.h" #include "ttimer.h"
#include "tref.h" #include "tref.h"
#include "plannodes.h"
#define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_SCHEDULER_NUMBER 10000
#define QW_DEFAULT_TASK_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000
...@@ -133,6 +134,7 @@ typedef struct SQWTaskCtx { ...@@ -133,6 +134,7 @@ typedef struct SQWTaskCtx {
void *taskHandle; void *taskHandle;
void *sinkHandle; void *sinkHandle;
SSubplan *plan;
} SQWTaskCtx; } SQWTaskCtx;
typedef struct SQWSchStatus { typedef struct SQWSchStatus {
......
...@@ -424,6 +424,11 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { ...@@ -424,6 +424,11 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
dsDestroyDataSinker(ctx->sinkHandle); dsDestroyDataSinker(ctx->sinkHandle);
ctx->sinkHandle = NULL; ctx->sinkHandle = NULL;
} }
if (ctx->plan) {
nodesDestroyNode(ctx->plan);
ctx->plan = NULL;
}
} }
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
...@@ -440,6 +445,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { ...@@ -440,6 +445,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->taskHandle, NULL);
atomic_store_ptr(&ctx->sinkHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL);
atomic_store_ptr(&ctx->plan, NULL);
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP);
...@@ -922,7 +928,7 @@ _return: ...@@ -922,7 +928,7 @@ _return:
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
int32_t code = 0; int32_t code = 0;
bool queryRsped = false; bool queryRsped = false;
struct SSubplan *plan = NULL; SSubplan* plan = NULL;
SQWPhaseInput input = {0}; SQWPhaseInput input = {0};
qTaskInfo_t pTaskInfo = NULL; qTaskInfo_t pTaskInfo = NULL;
DataSinkHandle sinkHandle = NULL; DataSinkHandle sinkHandle = NULL;
...@@ -950,6 +956,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -950,6 +956,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
ctx->plan = plan;
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
...@@ -1428,6 +1436,10 @@ void qwCloseRef(void) { ...@@ -1428,6 +1436,10 @@ void qwCloseRef(void) {
taosWUnLockLatch(&gQwMgmt.lock); taosWUnLockLatch(&gQwMgmt.lock);
} }
void qwDestroySchStatus(SQWSchStatus *pStatus) {
taosHashCleanup(pStatus->tasksHash);
}
void qwDestroyImpl(void *pMgmt) { void qwDestroyImpl(void *pMgmt) {
SQWorker *mgmt = (SQWorker *)pMgmt; SQWorker *mgmt = (SQWorker *)pMgmt;
...@@ -1439,6 +1451,13 @@ void qwDestroyImpl(void *pMgmt) { ...@@ -1439,6 +1451,13 @@ void qwDestroyImpl(void *pMgmt) {
// TODO FREE ALL // TODO FREE ALL
taosHashCleanup(mgmt->ctxHash); taosHashCleanup(mgmt->ctxHash);
void *pIter = taosHashIterate(mgmt->schHash, NULL);
while (pIter) {
SQWSchStatus *sch = (SQWSchStatus *)pIter;
qwDestroySchStatus(sch);
pIter = taosHashIterate(mgmt->schHash, pIter);
}
taosHashCleanup(mgmt->schHash); taosHashCleanup(mgmt->schHash);
taosMemoryFree(mgmt); taosMemoryFree(mgmt);
......
...@@ -786,6 +786,10 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -786,6 +786,10 @@ static void uvDestroyConn(uv_handle_t* handle) {
transQueueDestroy(&conn->srvMsgs); transQueueDestroy(&conn->srvMsgs);
QUEUE_REMOVE(&conn->queue); QUEUE_REMOVE(&conn->queue);
taosMemoryFree(conn->pTcp); taosMemoryFree(conn->pTcp);
if (conn->regArg.init == 1) {
transFreeMsg(conn->regArg.msg.pCont);
conn->regArg.init = 0;
}
taosMemoryFree(conn); taosMemoryFree(conn);
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册