提交 dac39371 编写于 作者: D dapan1121

fix: fix drop task memory leak

上级 976d4670
...@@ -88,7 +88,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) { ...@@ -88,7 +88,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
static bool clientRpcRfp(int32_t code, tmsg_t msgType) { static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
if (NEED_REDIRECT_ERROR(code)) { if (NEED_REDIRECT_ERROR(code)) {
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT) { msgType == TDMT_SCH_MERGE_FETCH || msgType == TDMT_SCH_QUERY_HEARTBEAT || msgType == TDMT_SCH_DROP_TASK) {
return false; return false;
} }
return true; return true;
......
...@@ -102,14 +102,14 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -102,14 +102,14 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) {
} }
int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) { int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) {
SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL}; SSchNodeInfo nodeInfo = {.addr = *addr, .handle = SCH_GET_TASK_HANDLE(pTask)};
if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) { if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) {
SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno); SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SCH_TASK_DLOG("task execNode added, execId:%d", execId); SCH_TASK_DLOG("task execNode added, execId:%d, handle:%p", execId, nodeInfo.handle);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -752,12 +752,18 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) { ...@@ -752,12 +752,18 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
return; return;
} }
int32_t i = 0;
SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL); SSchNodeInfo *nodeInfo = taosHashIterate(pTask->execNodes, NULL);
while (nodeInfo) { while (nodeInfo) {
if (nodeInfo->handle) {
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle); SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK); schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK);
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
} else {
SCH_TASK_DLOG("no need to drop task %dth execNode", i);
}
++i;
nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo); nodeInfo = taosHashIterate(pTask->execNodes, nodeInfo);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册