diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java index 9cec2766f162e9025841b12ca213d034c54aa72c..5adfe63d6372c63ed138aab25f1f3f91500ad7c6 100644 --- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java +++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/Event.java @@ -19,5 +19,6 @@ package org.apache.dolphinscheduler.common.enums; public enum Event { ACK, - RESULT; + RESULT, + ACTION_STOP; } diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java index 03ad4dd6940460d566d2be02e1242fd082a3fb21..98fbe0f8a55025838ae6ece43a63530262c1e666 100644 --- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java +++ b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskKillResponseCommand.java @@ -48,6 +48,11 @@ public class TaskKillResponseCommand implements Serializable { */ private int processId; + /** + * process instance id + */ + private int processInstanceId; + /** * other resource manager appId , for example : YARN etc */ @@ -85,6 +90,14 @@ public class TaskKillResponseCommand implements Serializable { this.processId = processId; } + public int getProcessInstanceId() { + return processInstanceId; + } + + public void setProcessInstanceId(int processInstanceId) { + this.processInstanceId = processInstanceId; + } + public List getAppIds() { return appIds; } @@ -114,6 +127,7 @@ public class TaskKillResponseCommand implements Serializable { + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + + ", processInstanceId=" + processInstanceId + '}'; } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java index a5d7cf4bfb59f4ed5683381223e110eb4b8852ff..a6642a1b5800eda5f493c742b2c9fbeeae1e0c8f 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/MasterServer.java @@ -138,11 +138,13 @@ public class MasterServer implements IStoppable { ackProcessor.init(processInstanceExecMaps); TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor(); taskResponseProcessor.init(processInstanceExecMaps); + TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor(); + taskKillResponseProcessor.init(processInstanceExecMaps); StateEventProcessor stateEventProcessor = new StateEventProcessor(); stateEventProcessor.init(processInstanceExecMaps); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor); - this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor()); + this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor); this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor); this.nettyRemotingServer.start(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java index 28f18fe9613acea677ad028b7b95d63fde076b59..36dde2982c3f18dd13037f7d4a7dd44bbfb4e6a6 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessor.java @@ -17,11 +17,18 @@ package org.apache.dolphinscheduler.server.master.processor; +import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import org.apache.dolphinscheduler.common.utils.JSONUtils; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; + +import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +44,19 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class); + /** + * process service + */ + private final TaskResponseService taskResponseService; + + public TaskKillResponseProcessor() { + this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class); + } + + public void init(ConcurrentHashMap processInstanceExecMaps) { + this.taskResponseService.init(processInstanceExecMaps); + } + /** * task final result response * need master process , state persistence @@ -50,6 +70,12 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor { TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class); logger.info("received task kill response command : {}", responseCommand); + // TaskResponseEvent + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(ExecutionStatus.of(responseCommand.getStatus()), + responseCommand.getTaskInstanceId(), + responseCommand.getProcessInstanceId() + ); + taskResponseService.addResponse(taskResponseEvent); } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java index 224a61753da6cb5a12e8c4ddc86bd86e04b35e73..f2a080c60765df01cfe7cdd539ef5d1734c7aaf8 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java @@ -17,13 +17,13 @@ package org.apache.dolphinscheduler.server.master.processor.queue; -import com.fasterxml.jackson.annotation.JsonFormat; - import org.apache.dolphinscheduler.common.enums.Event; import org.apache.dolphinscheduler.common.enums.ExecutionStatus; import java.util.Date; +import com.fasterxml.jackson.annotation.JsonFormat; + import io.netty.channel.Channel; /** @@ -94,6 +94,17 @@ public class TaskResponseEvent { private Channel channel; private int processInstanceId; + + public static TaskResponseEvent newActionStop(ExecutionStatus state, + int taskInstanceId, + int processInstanceId) { + TaskResponseEvent event = new TaskResponseEvent(); + event.setState(state); + event.setTaskInstanceId(taskInstanceId); + event.setEvent(Event.ACTION_STOP); + event.setProcessInstanceId(processInstanceId); + return event; + } public static TaskResponseEvent newAck(ExecutionStatus state, Date startTime, diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java index 621dd79c74e9a1ec0c4a81ecba6490964486d760..ca8ad0ac931af5fe2c8b4618876f7e950972c5b3 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponsePersistThread.java @@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand; import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand; import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread; +import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; +import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.concurrent.ConcurrentHashMap; @@ -146,6 +148,16 @@ public class TaskResponsePersistThread implements Runnable { channel.writeAndFlush(taskResponseCommand.convert2Command()); } break; + case ACTION_STOP: + WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId()); + if (workflowExecuteThread != null) { + ITaskProcessor taskProcessor = workflowExecuteThread.getActiveTaskProcessorMaps().get(taskResponseEvent.getTaskInstanceId()); + if (taskProcessor != null) { + taskProcessor.persist(TaskAction.STOP); + logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId()); + } + } + break; default: throw new IllegalArgumentException("invalid event type : " + event); } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java index 5ef235089e7cdff693ecbed57beddb8637410123..b5e70eedc87abc67bbd72a986b34cedc91ac0bc5 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java @@ -115,10 +115,20 @@ public class TaskResponseService { try { this.taskResponseWorker.interrupt(); this.taskResponseEventHandler.interrupt(); - this.eventExecService.shutdown(); } catch (Exception e) { logger.error("stop error:", e); } + this.eventExecService.shutdown(); + long waitSec = 5; + boolean terminated = false; + try { + terminated = eventExecService.awaitTermination(waitSec, TimeUnit.SECONDS); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + if (!terminated) { + logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", waitSec); + } } /** diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java new file mode 100644 index 0000000000000000000000000000000000000000..edb5e6611d98489d71a4750e5b4d7f5ed7cab8ac --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecService.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.master.runner; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class MasterExecService { + + /** + * logger of MasterExecService + */ + private static final Logger logger = LoggerFactory.getLogger(MasterExecService.class); + + /** + * master exec service + */ + private final ThreadPoolExecutor execService; + + private final ListeningExecutorService listeningExecutorService; + + /** + * start process failed map + */ + private final ConcurrentHashMap startProcessFailedMap; + + private final ConcurrentHashMap filterMap = new ConcurrentHashMap<>(); + + public MasterExecService(ConcurrentHashMap startProcessFailedMap,ThreadPoolExecutor execService) { + this.startProcessFailedMap = startProcessFailedMap; + this.execService = execService; + this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); + } + + public void execute(WorkflowExecuteThread workflowExecuteThread) { + if (workflowExecuteThread == null + || workflowExecuteThread.getProcessInstance() == null + || workflowExecuteThread.isStart() + || filterMap.containsKey(workflowExecuteThread.getProcessInstance().getId())) { + return; + } + Integer processInstanceId = workflowExecuteThread.getProcessInstance().getId(); + filterMap.put(processInstanceId, workflowExecuteThread); + ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Object o) { + if (!workflowExecuteThread.isStart()) { + startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread); + } else { + startProcessFailedMap.remove(processInstanceId); + } + filterMap.remove(processInstanceId); + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("handle events {} failed", processInstanceId, throwable); + if (!workflowExecuteThread.isStart()) { + startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread); + } else { + startProcessFailedMap.remove(processInstanceId); + } + filterMap.remove(processInstanceId); + } + }; + Futures.addCallback(future, futureCallback, this.listeningExecutorService); + } + + public void shutdown() { + this.execService.shutdown(); + } + + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return this.execService.awaitTermination(timeout, unit); + } +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java index 33c84b38e81c7715d17a299bb9e30c57ea0677a5..79cce4f05e45c79840f8c209cb3de2e628488761 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterSchedulerService.java @@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient; import org.apache.dolphinscheduler.remote.config.NettyClientConfig; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient; import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager; import org.apache.dolphinscheduler.service.alert.ProcessAlertManager; @@ -57,6 +58,12 @@ public class MasterSchedulerService extends Thread { */ private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class); + /** + * handle task event + */ + @Autowired + private TaskResponseService taskResponseService; + /** * dolphinscheduler database interface */ @@ -92,7 +99,12 @@ public class MasterSchedulerService extends Thread { /** * master exec service */ - private ThreadPoolExecutor masterExecService; + private MasterExecService masterExecService; + + /** + * start process failed map + */ + private final ConcurrentHashMap startProcessFailedMap = new ConcurrentHashMap<>(); /** * process instance execution list @@ -126,11 +138,15 @@ public class MasterSchedulerService extends Thread { */ public void init(ConcurrentHashMap processInstanceExecMaps) { this.processInstanceExecMaps = processInstanceExecMaps; - this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()); + this.masterExecService = new MasterExecService(this.startProcessFailedMap, + (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads())); NettyClientConfig clientConfig = new NettyClientConfig(); this.nettyRemotingClient = new NettyRemotingClient(clientConfig); - stateWheelExecuteThread = new StateWheelExecuteThread(processService, + stateWheelExecuteThread = new StateWheelExecuteThread( + masterExecService, + processService, + startProcessFailedMap, processTimeoutCheckList, taskTimeoutCheckList, taskRetryCheckList, @@ -202,6 +218,7 @@ public class MasterSchedulerService extends Thread { if (processInstance != null) { WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread( processInstance + , taskResponseService , processService , nettyExecutorManager , processAlertManager diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java index de6db1ddb261c3d5b08a65bbd7347a9d3ac5925b..e12be0ce97905c66e4966b9a898505123bc49559 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/StateWheelExecuteThread.java @@ -49,15 +49,30 @@ public class StateWheelExecuteThread extends Thread { private ConcurrentHashMap taskInstanceRetryCheckList; private ConcurrentHashMap processInstanceExecMaps; + /** + * start process failed map + */ + private final ConcurrentHashMap startProcessFailedMap; + private int stateCheckIntervalSecs; - public StateWheelExecuteThread(ProcessService processService, - ConcurrentHashMap processInstanceTimeoutCheckList, - ConcurrentHashMap taskInstanceTimeoutCheckList, - ConcurrentHashMap taskInstanceRetryCheckList, - ConcurrentHashMap processInstanceExecMaps, - int stateCheckIntervalSecs) { + /** + * master exec service + */ + private MasterExecService masterExecService; + + public StateWheelExecuteThread( + MasterExecService masterExecService, + ProcessService processService, + ConcurrentHashMap startProcessFailedMap, + ConcurrentHashMap processInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceTimeoutCheckList, + ConcurrentHashMap taskInstanceRetryCheckList, + ConcurrentHashMap processInstanceExecMaps, + int stateCheckIntervalSecs) { + this.masterExecService = masterExecService; this.processService = processService; + this.startProcessFailedMap = startProcessFailedMap; this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList; this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList; this.taskInstanceRetryCheckList = taskInstanceRetryCheckList; @@ -71,6 +86,7 @@ public class StateWheelExecuteThread extends Thread { logger.info("state wheel thread start"); while (Stopper.isRunning()) { try { + check4StartProcessFailed(); checkTask4Timeout(); checkTask4Retry(); checkProcess4Timeout(); @@ -176,4 +192,12 @@ public class StateWheelExecuteThread extends Thread { workflowExecuteThread.addStateEvent(stateEvent); } + private void check4StartProcessFailed() { + if (startProcessFailedMap.isEmpty()) { + return; + } + for (WorkflowExecuteThread workflowExecuteThread : this.startProcessFailedMap.values()) { + masterExecService.execute(workflowExecuteThread); + } + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java index 92862ae61927c36d9858b83d5bcb524d345440ec..5988839a5cd45319f6d852028d38abad7b5bcfbf 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java @@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand; import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor; import org.apache.dolphinscheduler.server.master.runner.task.TaskAction; import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory; @@ -103,6 +105,11 @@ public class WorkflowExecuteThread implements Runnable { */ private final Map activeTaskProcessorMaps = new ConcurrentHashMap<>(); + /** + * handle task event + */ + private TaskResponseService taskResponseService; + /** * process instance */ @@ -205,6 +212,7 @@ public class WorkflowExecuteThread implements Runnable { * @param nettyExecutorManager nettyExecutorManager */ public WorkflowExecuteThread(ProcessInstance processInstance + , TaskResponseService taskResponseService , ProcessService processService , NettyExecutorManager nettyExecutorManager , ProcessAlertManager processAlertManager @@ -212,7 +220,7 @@ public class WorkflowExecuteThread implements Runnable { , ConcurrentHashMap taskTimeoutCheckList , ConcurrentHashMap taskRetryCheckList) { this.processService = processService; - + this.taskResponseService = taskResponseService; this.processInstance = processInstance; this.masterConfig = masterConfig; this.nettyExecutorManager = nettyExecutorManager; @@ -1249,13 +1257,12 @@ public class WorkflowExecuteThread implements Runnable { } ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId); taskProcessor.action(TaskAction.STOP); - if (taskProcessor.taskState().typeIsFinished()) { - StateEvent stateEvent = new StateEvent(); - stateEvent.setType(StateEventType.TASK_STATE_CHANGE); - stateEvent.setProcessInstanceId(this.processInstance.getId()); - stateEvent.setTaskInstanceId(taskInstance.getId()); - stateEvent.setExecutionStatus(taskProcessor.taskState()); - this.addStateEvent(stateEvent); + if (taskProcessor != null && taskProcessor.taskState().typeIsFinished()) { + TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop( + taskProcessor.taskState(), + taskInstance.getId(), + this.processInstance.getId()); + taskResponseService.addResponse(taskResponseEvent); } } } @@ -1420,4 +1427,8 @@ public class WorkflowExecuteThread implements Runnable { TaskDependType depNodeType) throws Exception { return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType); } + + public Map getActiveTaskProcessorMaps() { + return activeTaskProcessorMaps; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java index 5532477568fe325797d06488ab9811074d8aee8e..4446485b6c4a8aeb27687f4b18ef9ef239cb4239 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/BaseTaskProcessor.java @@ -82,6 +82,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class); + /** + * persist task + * + * @return + */ + protected abstract boolean persistTask(TaskAction taskAction); + /** * pause task, common tasks donot need this. * @@ -102,6 +109,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor { */ protected abstract boolean taskTimeout(); + /** + * persist + * + * @return + */ + @Override + public boolean persist(TaskAction taskAction) { + return persistTask(taskAction); + } + @Override public void run() { } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java index 23988f99304c8b897d762e5c10d7f53944b33287..14bb3afd565c9ca0615010b0e3f041b234cf96ba 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/CommonTaskProcessor.java @@ -87,6 +87,24 @@ public class CommonTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + taskInstance.setState(ExecutionStatus.KILL); + taskInstance.setEndTime(new Date()); + processService.updateTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + + } + return false; + } + /** * common task cannot be paused */ @@ -154,7 +172,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor { if (StringUtils.isBlank(taskInstance.getHost())) { taskInstance.setState(ExecutionStatus.KILL); taskInstance.setEndTime(new Date()); - processService.updateTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java index 7c593b0f30ab21ae28c73081f38603f2271f8e10..584e48412372224c50618e69412c3573ea15df67 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ConditionTaskProcessor.java @@ -133,11 +133,27 @@ public class ConditionTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + this.taskInstance.setState(ExecutionStatus.KILL); + this.taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + } + return false; + } + @Override protected boolean killTask() { this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java index b26e6411187a1f8e9b958a462ef17b4a08e3540f..0f84a5f66377082566982da9de75f5353184f589 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/DependentTaskProcessor.java @@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig; import org.apache.dolphinscheduler.server.utils.DependentExecute; import org.apache.dolphinscheduler.server.utils.LogUtils; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; -import org.apache.dolphinscheduler.service.process.ProcessService; import java.util.ArrayList; import java.util.Date; @@ -155,11 +154,28 @@ public class DependentTaskProcessor extends BaseTaskProcessor { return true; } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + this.taskInstance.setState(ExecutionStatus.KILL); + this.taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + + } + return false; + } + @Override protected boolean killTask() { this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java index b68dc221a9f22a1b414e96c1109daaac8b17c2f4..aa1e490a09e14097308d9c504f20e7dffb51577a 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/ITaskProcessor.java @@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance; */ public interface ITaskProcessor { + boolean persist(TaskAction taskAction); + void run(); boolean action(TaskAction taskAction); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java index e0cd3e8603dca2a95307791625bcd5c8af072e03..02f08a828ce22f67a4cbeb0ad93ef45c91d65214 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SubTaskProcessor.java @@ -113,6 +113,17 @@ public class SubTaskProcessor extends BaseTaskProcessor { } } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + } + return false; + } + @Override protected boolean pauseTask() { pauseSubWorkFlow(); diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java index 8e9316fd2d4e802525d5d35880a8a49485837b06..c48a7110084d505eb315edb04492832ee48c8def 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/task/SwitchTaskProcessor.java @@ -97,6 +97,24 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { } } + @Override + protected boolean persistTask(TaskAction taskAction) { + switch (taskAction) { + case STOP: + if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) { + return true; + } + this.taskInstance.setState(ExecutionStatus.KILL); + this.taskInstance.setEndTime(new Date()); + processService.saveTaskInstance(taskInstance); + return true; + default: + logger.error("unknown task action: {}", taskAction.toString()); + + } + return false; + } + @Override protected boolean pauseTask() { this.taskInstance.setState(ExecutionStatus.PAUSE); @@ -109,7 +127,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor { protected boolean killTask() { this.taskInstance.setState(ExecutionStatus.KILL); this.taskInstance.setEndTime(new Date()); - processService.saveTaskInstance(taskInstance); return true; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java index 4341b8c6f4059b1377535f89c77edb798b71d18c..4f235eaf78d269a879e886a2545e47c3e5f57071 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskKillProcessor.java @@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.remote.utils.Host; import org.apache.dolphinscheduler.remote.utils.Pair; import org.apache.dolphinscheduler.server.utils.ProcessUtils; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; +import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread; import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread; import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import org.apache.dolphinscheduler.service.log.LogClientService; import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; +import org.apache.dolphinscheduler.spi.task.AbstractTask; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; @@ -118,6 +120,14 @@ public class TaskKillProcessor implements NettyRequestProcessor { try { Integer processId = taskExecutionContext.getProcessId(); if (processId.equals(0)) { + TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId); + if (null != taskExecuteThread) { + AbstractTask task = taskExecuteThread.getTask(); + if (task != null) { + task.cancelApplication(true); + logger.info("kill task by cancelApplication, task id:{}", taskInstanceId); + } + } workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId); TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId); logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId); @@ -165,6 +175,7 @@ public class TaskKillProcessor implements NettyRequestProcessor { taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId()); taskKillResponseCommand.setHost(taskExecutionContext.getHost()); taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId()); + taskKillResponseCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId()); } return taskKillResponseCommand; } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java index 0b18dcfce1462a90ebec5f7a4d21fca53637c844..4b9f99ca391f12ada96b3653e308111239aab2bd 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java @@ -389,4 +389,8 @@ public class TaskExecuteThread implements Runnable, Delayed { } taskExecutionContext.setParamsMap(paramsMap); } + + public AbstractTask getTask() { + return task; + } } diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java new file mode 100644 index 0000000000000000000000000000000000000000..b98024674b240159910f87200251c6c00164a0ac --- /dev/null +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerExecService.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.server.worker.runner; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +public class WorkerExecService { + /** + * logger of WorkerExecService + */ + private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class); + + private final ListeningExecutorService listeningExecutorService; + + /** + * thread executor service + */ + private final ExecutorService execService; + + /** + * running task + */ + private final ConcurrentHashMap taskExecuteThreadMap; + + public WorkerExecService(ExecutorService execService, ConcurrentHashMap taskExecuteThreadMap) { + this.execService = execService; + this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService); + this.taskExecuteThreadMap = taskExecuteThreadMap; + } + + public void submit(TaskExecuteThread taskExecuteThread) { + taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread); + ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread); + FutureCallback futureCallback = new FutureCallback() { + @Override + public void onSuccess(Object o) { + taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); + } + + @Override + public void onFailure(Throwable throwable) { + logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId() + , taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable); + taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId()); + } + }; + Futures.addCallback(future, futureCallback, this.listeningExecutorService); + } + + /** + * get thread pool queue size + * + * @return queue size + */ + public int getThreadPoolQueueSize() { + return ((ThreadPoolExecutor) this.execService).getQueue().size(); + } + +} \ No newline at end of file diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java index 4f68166ebde6a6add5db4ed715d7061574a9ccda..0deab9e974393d460b1a6813e030c7bacd6e728e 100644 --- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java +++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java @@ -31,9 +31,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext; import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager; import org.apache.dolphinscheduler.spi.task.request.TaskRequest; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.DelayQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,6 +51,11 @@ public class WorkerManagerThread implements Runnable { */ private final DelayQueue workerExecuteQueue = new DelayQueue<>(); + /** + * running task + */ + private final ConcurrentHashMap taskExecuteThreadMap = new ConcurrentHashMap<>(); + /** * worker config */ @@ -60,7 +64,7 @@ public class WorkerManagerThread implements Runnable { /** * thread executor service */ - private final ExecutorService workerExecService; + private final WorkerExecService workerExecService; /** * task callback service @@ -69,10 +73,17 @@ public class WorkerManagerThread implements Runnable { public WorkerManagerThread() { this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class); - this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()); + this.workerExecService = new WorkerExecService( + ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()), + taskExecuteThreadMap + ); this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class); } + public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) { + return this.taskExecuteThreadMap.get(taskInstanceId); + } + /** * get delay queue size * @@ -88,7 +99,7 @@ public class WorkerManagerThread implements Runnable { * @return queue size */ public int getThreadPoolQueueSize() { - return ((ThreadPoolExecutor) workerExecService).getQueue().size(); + return this.workerExecService.getThreadPoolQueueSize(); } /** diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java index bf527d22b3f38e790bde6de40265e9a2dd4cd0f7..35520c5d9966579b1f10fa1a8474d06dfd6e2c37 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/WorkflowExecuteThreadTest.java @@ -102,7 +102,7 @@ public class WorkflowExecuteThreadTest { ConcurrentHashMap taskTimeoutCheckList = new ConcurrentHashMap<>(); ConcurrentHashMap taskRetryCheckList = new ConcurrentHashMap<>(); - workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList)); + workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null,processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList)); // prepareProcess init dag Field dag = WorkflowExecuteThread.class.getDeclaredField("dag"); dag.setAccessible(true); diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java index c7f047569e95fb08ef52e384a3493c9b0496d8a7..8bef045f5aca1cb830aaf6b2f724ce87f1a5aced 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskKillResponseProcessorTest.java @@ -20,19 +20,26 @@ package org.apache.dolphinscheduler.server.master.processor; import org.apache.dolphinscheduler.remote.command.Command; import org.apache.dolphinscheduler.remote.command.CommandType; import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand; +import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService; +import org.apache.dolphinscheduler.service.bean.SpringApplicationContext; import java.util.ArrayList; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; import io.netty.channel.Channel; /** * task response processor test */ +@RunWith(PowerMockRunner.class) +@PrepareForTest({SpringApplicationContext.class}) public class TaskKillResponseProcessorTest { private TaskKillResponseProcessor taskKillResponseProcessor; @@ -41,8 +48,14 @@ public class TaskKillResponseProcessorTest { private Channel channel; + private TaskResponseService taskResponseService; + @Before public void before() { + PowerMockito.mockStatic(SpringApplicationContext.class); + + taskResponseService = PowerMockito.mock(TaskResponseService.class); + PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService); taskKillResponseProcessor = new TaskKillResponseProcessor(); channel = PowerMockito.mock(Channel.class); taskKillResponseCommand = new TaskKillResponseCommand();