diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 997397314fc253d55530d9729c3961cc1357950b..64206fc10aac0ab9835d65333322657a0ccaecbf 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -60,4 +60,5 @@ void dsScheduleProcess(void* ahandle, void* pItem) { void dsDestroyDataSinker(DataSinkHandle handle) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; pHandleImpl->fDestroy(pHandleImpl); + taosMemoryFree(pHandleImpl); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a237eb0e7d886e1634be068f883d574f7f93a989..10d5e717bd4c25af10d4d7f668a5fe21930ba024 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2826,6 +2826,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray } if (p->info.colId == pmInfo->colId) { + colDataDestroy(taosArrayGet(pBlock->pDataBlock, pmInfo->targetSlotId)); taosArraySet(pBlock->pDataBlock, pmInfo->targetSlotId, p); i++; j++; @@ -4687,7 +4688,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT char* p = taosMemoryCalloc(1, 128); snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId); - pTaskInfo->id.str = strdup(p); + pTaskInfo->id.str = p; return pTaskInfo; } @@ -5300,6 +5301,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { // taosArrayDestroy(pTaskInfo->summary.queryProfEvents); // taosHashCleanup(pTaskInfo->summary.operatorProfResults); + destroyOperatorInfo(pTaskInfo->pRoot); taosMemoryFreeClear(pTaskInfo->sql); taosMemoryFreeClear(pTaskInfo->id.str); taosMemoryFreeClear(pTaskInfo); diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index ca471262ff747b2466159a93571ac77120b3b9ac..3ecf9878119be9891e74e56e4d59afc23bb99b34 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -25,6 +25,7 @@ extern "C" { #include "tlockfree.h" #include "ttimer.h" #include "tref.h" +#include "plannodes.h" #define QW_DEFAULT_SCHEDULER_NUMBER 10000 #define QW_DEFAULT_TASK_NUMBER 10000 @@ -131,8 +132,9 @@ typedef struct SQWTaskCtx { int8_t events[QW_EVENT_MAX]; - void *taskHandle; - void *sinkHandle; + void *taskHandle; + void *sinkHandle; + SSubplan *plan; } SQWTaskCtx; typedef struct SQWSchStatus { diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index bd7ee6321aa067357d36d6739ea252ce9e00fdc1..717958c033c491304ed9c7ceba909a49b1f03e3f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -424,6 +424,11 @@ void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { dsDestroyDataSinker(ctx->sinkHandle); ctx->sinkHandle = NULL; } + + if (ctx->plan) { + nodesDestroyNode(ctx->plan); + ctx->plan = NULL; + } } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { @@ -440,6 +445,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { atomic_store_ptr(&ctx->taskHandle, NULL); atomic_store_ptr(&ctx->sinkHandle, NULL); + atomic_store_ptr(&ctx->plan, NULL); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_DROP); @@ -922,7 +928,7 @@ _return: int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) { int32_t code = 0; bool queryRsped = false; - struct SSubplan *plan = NULL; + SSubplan* plan = NULL; SQWPhaseInput input = {0}; qTaskInfo_t pTaskInfo = NULL; DataSinkHandle sinkHandle = NULL; @@ -950,6 +956,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(code); } + ctx->plan = plan; + code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -1428,6 +1436,10 @@ void qwCloseRef(void) { taosWUnLockLatch(&gQwMgmt.lock); } +void qwDestroySchStatus(SQWSchStatus *pStatus) { + taosHashCleanup(pStatus->tasksHash); +} + void qwDestroyImpl(void *pMgmt) { SQWorker *mgmt = (SQWorker *)pMgmt; @@ -1439,6 +1451,13 @@ void qwDestroyImpl(void *pMgmt) { // TODO FREE ALL taosHashCleanup(mgmt->ctxHash); + + void *pIter = taosHashIterate(mgmt->schHash, NULL); + while (pIter) { + SQWSchStatus *sch = (SQWSchStatus *)pIter; + qwDestroySchStatus(sch); + pIter = taosHashIterate(mgmt->schHash, pIter); + } taosHashCleanup(mgmt->schHash); taosMemoryFree(mgmt); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index ad3f5202106805084dcf3eaa88418cc55ee6b2d3..c0d8ca44eea9b6a4f048f2a99f1b751c029eb6b7 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -786,6 +786,10 @@ static void uvDestroyConn(uv_handle_t* handle) { transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); taosMemoryFree(conn->pTcp); + if (conn->regArg.init == 1) { + transFreeMsg(conn->regArg.msg.pCont); + conn->regArg.init = 0; + } taosMemoryFree(conn); if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {