From dac39371cd1130469f54cdeb561156366aec15db Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Jul 2022 10:27:39 +0800 Subject: [PATCH] fix: fix drop task memory leak --- source/client/src/clientEnv.c | 2 +- source/libs/scheduler/src/schTask.c | 16 +++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 635d4bf2f9..5b96729503 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -88,7 +88,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) { if (NEED_REDIRECT_ERROR(code)) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || - msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT) { + msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) { return false; } return true; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 282e81bb5d..c40e56ab6f 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -102,14 +102,14 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { } int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) { - SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL}; + SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)}; if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) { SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("task execNode added, execId:%d", execId); + SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle); return TSDB_CODE_SUCCESS; } @@ -752,12 +752,18 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { return; } + int32_t i = 0; SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); while (nodeInfo) { - SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); - - schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK); + if (nodeInfo->handle) { + SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); + schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK); + SCH_TASK_DLOG("start to drop task's %dth execNode", i); + } else { + SCH_TASK_DLOG("no need to drop task %dth execNode", i); + } + ++i; nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); } -- GitLab