未验证 提交 239be31a 编写于 作者: C caishunfeng 提交者: GitHub

[Bug] cancel application when kill task (#9624)

* cancel application when kill task

* add warn log

* add cancel application
上级 ae849003
...@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor; ...@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils; import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils; import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants; import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
...@@ -27,12 +28,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; ...@@ -27,12 +28,12 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand; import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel; import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.log.LogClientService;
...@@ -93,8 +94,9 @@ public class TaskKillProcessor implements NettyRequestProcessor { ...@@ -93,8 +94,9 @@ public class TaskKillProcessor implements NettyRequestProcessor {
return; return;
} }
Integer processId = taskExecutionContext.getProcessId(); int processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) { if (processId == 0) {
this.cancelApplication(taskInstanceId);
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
...@@ -121,22 +123,8 @@ public class TaskKillProcessor implements NettyRequestProcessor { ...@@ -121,22 +123,8 @@ public class TaskKillProcessor implements NettyRequestProcessor {
* @return kill result * @return kill result
*/ */
private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) { private Pair<Boolean, List<String>> doKill(TaskExecutionContext taskExecutionContext) {
boolean processFlag = true; // kill system process
List<String> appIds = Collections.emptyList(); boolean processFlag = killProcess(taskExecutionContext.getTenantCode(), taskExecutionContext.getProcessId());
try {
String pidsStr = ProcessUtils.getPidsStr(taskExecutionContext.getProcessId());
if (!StringUtils.isEmpty(pidsStr)) {
String cmd = String.format("kill -9 %s", pidsStr);
cmd = OSUtils.getSudoCmd(taskExecutionContext.getTenantCode(), cmd);
logger.info("process id:{}, cmd:{}", taskExecutionContext.getProcessId(), cmd);
OSUtils.exeCmd(cmd);
}
} catch (Exception e) {
processFlag = false;
logger.error("kill task error", e);
}
// find log and kill yarn job // find log and kill yarn job
Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()), Pair<Boolean, List<String>> yarnResult = killYarnJob(Host.of(taskExecutionContext.getHost()),
taskExecutionContext.getLogPath(), taskExecutionContext.getLogPath(),
...@@ -146,27 +134,51 @@ public class TaskKillProcessor implements NettyRequestProcessor { ...@@ -146,27 +134,51 @@ public class TaskKillProcessor implements NettyRequestProcessor {
} }
/** /**
* build TaskKillResponseCommand * kill task by cancel application
* * @param taskInstanceId
* @param killCommand kill command
* @param result exe result
* @return build TaskKillResponseCommand
*/ */
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskKillRequestCommand killCommand, protected void cancelApplication(int taskInstanceId) {
Pair<Boolean, List<String>> result) { TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand(); if (taskExecuteThread == null) {
taskKillResponseCommand.setStatus(result.getLeft() ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode()); logger.warn("taskExecuteThread not found, taskInstanceId:{}", taskInstanceId);
taskKillResponseCommand.setAppIds(result.getRight()); return;
TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId()); }
if (taskExecutionContext == null) { AbstractTask task = taskExecuteThread.getTask();
return taskKillResponseCommand; if (task == null) {
logger.warn("task not found, taskInstanceId:{}", taskInstanceId);
return;
} }
if (taskExecutionContext != null) { try {
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); task.cancelApplication(true);
taskKillResponseCommand.setHost(taskExecutionContext.getHost()); } catch (Exception e) {
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); logger.error("kill task error", e);
}
logger.info("kill task by cancelApplication, task id:{}", taskInstanceId);
}
/**
* kill system process
* @param tenantCode
* @param processId
*/
protected boolean killProcess(String tenantCode, Integer processId) {
boolean processFlag = true;
if (processId == null || processId.equals(0)) {
return true;
}
try {
String pidsStr = ProcessUtils.getPidsStr(processId);
if (!StringUtils.isEmpty(pidsStr)) {
String cmd = String.format("kill -9 %s", pidsStr);
cmd = OSUtils.getSudoCmd(tenantCode, cmd);
logger.info("process id:{}, cmd:{}", processId, cmd);
OSUtils.exeCmd(cmd);
}
} catch (Exception e) {
processFlag = false;
logger.error("kill task error", e);
} }
return taskKillResponseCommand; return processFlag;
} }
/** /**
......
...@@ -330,6 +330,10 @@ public class TaskExecuteThread implements Runnable, Delayed { ...@@ -330,6 +330,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS)); return Long.compare(this.getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
} }
public AbstractTask getTask() {
return task;
}
private void preBuildBusinessParams() { private void preBuildBusinessParams() {
Map<String, Property> paramsMap = new HashMap<>(); Map<String, Property> paramsMap = new HashMap<>();
// replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job // replace variable TIME with $[YYYYmmddd...] in shell file when history run job and batch complement job
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class WorkerExecService {
/**
* logger of WorkerExecService
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class);
private final ListeningExecutorService listeningExecutorService;
/**
* thread executor service
*/
private final ExecutorService execService;
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
}
public void submit(TaskExecuteThread taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
@Override
public void onFailure(Throwable throwable) {
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
, taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
/**
* get thread pool queue size
*
* @return queue size
*/
public int getThreadPoolQueueSize() {
return ((ThreadPoolExecutor) this.execService).getQueue().size();
}
}
\ No newline at end of file
...@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus; ...@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService; import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
...@@ -54,7 +55,7 @@ public class WorkerManagerThread implements Runnable { ...@@ -54,7 +55,7 @@ public class WorkerManagerThread implements Runnable {
/** /**
* thread executor service * thread executor service
*/ */
private final ExecutorService workerExecService; private final WorkerExecService workerExecService;
/** /**
* task callback service * task callback service
...@@ -62,8 +63,20 @@ public class WorkerManagerThread implements Runnable { ...@@ -62,8 +63,20 @@ public class WorkerManagerThread implements Runnable {
@Autowired @Autowired
private TaskCallbackService taskCallbackService; private TaskCallbackService taskCallbackService;
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
public WorkerManagerThread(WorkerConfig workerConfig) { public WorkerManagerThread(WorkerConfig workerConfig) {
workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()); workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getExecThreads()),
taskExecuteThreadMap
);
}
public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
return this.taskExecuteThreadMap.get(taskInstanceId);
} }
/** /**
...@@ -81,7 +94,7 @@ public class WorkerManagerThread implements Runnable { ...@@ -81,7 +94,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size * @return queue size
*/ */
public int getThreadPoolQueueSize() { public int getThreadPoolQueueSize() {
return ((ThreadPoolExecutor) workerExecService).getQueue().size(); return this.workerExecService.getThreadPoolQueueSize();
} }
/** /**
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册