Unverified commit 8dc529d3 authored by zwZjut, committed by GitHub

[dolphinscheduler-server] process instance is always running (#7914)

* #7698

* #7698

* #7698

* fix workflow is always running

* fix workflow is always running

* fix workflow is always running

* #7700 #7698

* add license

* fix ut

* #7700 #7698
Co-authored-by: honghuo.zw <honghuo.zw@alibaba-inc.com>
Parent 419f18f8
......@@ -19,5 +19,6 @@ package org.apache.dolphinscheduler.common.enums;
public enum Event {
ACK,
RESULT;
RESULT,
ACTION_STOP;
}
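The new ACTION_STOP value lets the master route a worker's kill response through the same event queue as ACK and RESULT events. A minimal, hypothetical dispatch sketch (the real branching lives in TaskResponsePersistThread, further down in this diff):

```java
// Simplified sketch: how a consumer might branch on the new event type.
// The constants mirror org.apache.dolphinscheduler.common.enums.Event.
enum Event { ACK, RESULT, ACTION_STOP }

class EventDispatchSketch {
    void dispatch(Event event) {
        switch (event) {
            case ACK:
                // persist the "running" state and acknowledge the worker
                break;
            case RESULT:
                // persist the final task state and notify the workflow thread
                break;
            case ACTION_STOP:
                // persist the KILL state for a task that was stopped on request
                break;
            default:
                throw new IllegalArgumentException("invalid event type : " + event);
        }
    }
}
```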
......@@ -48,6 +48,11 @@ public class TaskKillResponseCommand implements Serializable {
*/
private int processId;
/**
* process instance id
*/
private int processInstanceId;
/**
* other resource manager appId , for example : YARN etc
*/
......@@ -85,6 +90,14 @@ public class TaskKillResponseCommand implements Serializable {
this.processId = processId;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public List<String> getAppIds() {
return appIds;
}
......@@ -114,6 +127,7 @@ public class TaskKillResponseCommand implements Serializable {
+ ", status=" + status
+ ", processId=" + processId
+ ", appIds=" + appIds
+ ", processInstanceId=" + processInstanceId
+ '}';
}
}
......@@ -138,11 +138,13 @@ public class MasterServer implements IStoppable {
ackProcessor.init(processInstanceExecMaps);
TaskResponseProcessor taskResponseProcessor = new TaskResponseProcessor();
taskResponseProcessor.init(processInstanceExecMaps);
TaskKillResponseProcessor taskKillResponseProcessor = new TaskKillResponseProcessor();
taskKillResponseProcessor.init(processInstanceExecMaps);
StateEventProcessor stateEventProcessor = new StateEventProcessor();
stateEventProcessor.init(processInstanceExecMaps);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, ackProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.start();
......
......@@ -17,11 +17,18 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,6 +44,19 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskKillResponseProcessor.class);
/**
* process service
*/
private final TaskResponseService taskResponseService;
public TaskKillResponseProcessor() {
this.taskResponseService = SpringApplicationContext.getBean(TaskResponseService.class);
}
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.taskResponseService.init(processInstanceExecMaps);
}
/**
* task final result response
* need master process , state persistence
......@@ -50,6 +70,12 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
TaskKillResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskKillResponseCommand.class);
logger.info("received task kill response command : {}", responseCommand);
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getTaskInstanceId(),
responseCommand.getProcessInstanceId()
);
taskResponseService.addResponse(taskResponseEvent);
}
}
......@@ -17,13 +17,13 @@
package org.apache.dolphinscheduler.server.master.processor.queue;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.netty.channel.Channel;
/**
......@@ -94,6 +94,17 @@ public class TaskResponseEvent {
private Channel channel;
private int processInstanceId;
public static TaskResponseEvent newActionStop(ExecutionStatus state,
int taskInstanceId,
int processInstanceId) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.ACTION_STOP);
event.setProcessInstanceId(processInstanceId);
return event;
}
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
......
......@@ -25,6 +25,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.concurrent.ConcurrentHashMap;
......@@ -146,6 +148,16 @@ public class TaskResponsePersistThread implements Runnable {
channel.writeAndFlush(taskResponseCommand.convert2Command());
}
break;
case ACTION_STOP:
WorkflowExecuteThread workflowExecuteThread = this.processInstanceMapper.get(taskResponseEvent.getProcessInstanceId());
if (workflowExecuteThread != null) {
ITaskProcessor taskProcessor = workflowExecuteThread.getActiveTaskProcessorMaps().get(taskResponseEvent.getTaskInstanceId());
if (taskProcessor != null) {
taskProcessor.persist(TaskAction.STOP);
logger.debug("ACTION_STOP: task instance id:{}, process instance id:{}", taskResponseEvent.getTaskInstanceId(), taskResponseEvent.getProcessInstanceId());
}
}
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
......
......@@ -115,10 +115,20 @@ public class TaskResponseService {
try {
this.taskResponseWorker.interrupt();
this.taskResponseEventHandler.interrupt();
this.eventExecService.shutdown();
} catch (Exception e) {
logger.error("stop error:", e);
}
this.eventExecService.shutdown();
long waitSec = 5;
boolean terminated = false;
try {
terminated = eventExecService.awaitTermination(waitSec, TimeUnit.SECONDS);
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
if (!terminated) {
logger.warn("TaskResponseService: eventExecService shutdown without terminated: {}s, increase await time", waitSec);
}
}
/**
......
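The reworked stop() shuts eventExecService down outside the try block and then waits up to five seconds for queued events to drain, only logging a warning if the pool does not terminate in time. A common hardening of that pattern, sketched here with plain java.util.concurrent types rather than the project's classes, is to force-cancel whatever is still running once the wait expires:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {
    public static void shutdownGracefully(ExecutorService pool, long waitSec) {
        pool.shutdown(); // stop accepting new work, let queued events finish
        try {
            if (!pool.awaitTermination(waitSec, TimeUnit.SECONDS)) {
                pool.shutdownNow(); // interrupt whatever is still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.submit(() -> System.out.println("draining last event"));
        shutdownGracefully(pool, 5);
    }
}
```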
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.runner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class MasterExecService {
/**
* logger of MasterExecService
*/
private static final Logger logger = LoggerFactory.getLogger(MasterExecService.class);
/**
* master exec service
*/
private final ThreadPoolExecutor execService;
private final ListeningExecutorService listeningExecutorService;
/**
* start process failed map
*/
private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap;
private final ConcurrentHashMap<Integer, WorkflowExecuteThread> filterMap = new ConcurrentHashMap<>();
public MasterExecService(ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap, ThreadPoolExecutor execService) {
this.startProcessFailedMap = startProcessFailedMap;
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
}
public void execute(WorkflowExecuteThread workflowExecuteThread) {
if (workflowExecuteThread == null
|| workflowExecuteThread.getProcessInstance() == null
|| workflowExecuteThread.isStart()
|| filterMap.containsKey(workflowExecuteThread.getProcessInstance().getId())) {
return;
}
Integer processInstanceId = workflowExecuteThread.getProcessInstance().getId();
filterMap.put(processInstanceId, workflowExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(workflowExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
if (!workflowExecuteThread.isStart()) {
startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread);
} else {
startProcessFailedMap.remove(processInstanceId);
}
filterMap.remove(processInstanceId);
}
@Override
public void onFailure(Throwable throwable) {
logger.error("handle events {} failed", processInstanceId, throwable);
if (!workflowExecuteThread.isStart()) {
startProcessFailedMap.putIfAbsent(processInstanceId, workflowExecuteThread);
} else {
startProcessFailedMap.remove(processInstanceId);
}
filterMap.remove(processInstanceId);
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
public void shutdown() {
this.execService.shutdown();
}
public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
return this.execService.awaitTermination(timeout, unit);
}
}
\ No newline at end of file
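MasterExecService wraps the master thread pool in a Guava listening decorator so that every submitted WorkflowExecuteThread gets a completion callback: if the workflow still has not started, it is parked in startProcessFailedMap for a later retry, and filterMap prevents the same instance from being submitted twice concurrently. A stripped-down, self-contained sketch of that submit-with-callback pattern, using a hypothetical Job type instead of the DolphinScheduler classes:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class RetryingExecSketch {
    /** Hypothetical stand-in for WorkflowExecuteThread. */
    interface Job extends Runnable {
        int id();
        boolean isStarted();
    }

    private final ListeningExecutorService pool =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));
    private final ConcurrentHashMap<Integer, Job> failedStarts = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, Job> inFlight = new ConcurrentHashMap<>();

    public void execute(Job job) {
        // Skip jobs that are already started or already queued, mirroring the filterMap guard.
        if (job.isStarted() || inFlight.putIfAbsent(job.id(), job) != null) {
            return;
        }
        ListenableFuture<?> future = pool.submit(job);
        Futures.addCallback(future, new FutureCallback<Object>() {
            @Override
            public void onSuccess(Object result) {
                record(job);
            }

            @Override
            public void onFailure(Throwable t) {
                record(job); // a thrown exception is also treated as "did not start"
            }

            private void record(Job j) {
                if (!j.isStarted()) {
                    failedStarts.putIfAbsent(j.id(), j); // picked up later by the retry check
                } else {
                    failedStarts.remove(j.id());
                }
                inFlight.remove(j.id());
            }
        }, pool);
    }
}
```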
......@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
......@@ -57,6 +58,12 @@ public class MasterSchedulerService extends Thread {
*/
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
/**
* handle task event
*/
@Autowired
private TaskResponseService taskResponseService;
/**
* dolphinscheduler database interface
*/
......@@ -92,7 +99,12 @@ public class MasterSchedulerService extends Thread {
/**
* master exec service
*/
private ThreadPoolExecutor masterExecService;
private MasterExecService masterExecService;
/**
* start process failed map
*/
private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap = new ConcurrentHashMap<>();
/**
* process instance execution list
......@@ -126,11 +138,15 @@ public class MasterSchedulerService extends Thread {
*/
public void init(ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps) {
this.processInstanceExecMaps = processInstanceExecMaps;
this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads());
this.masterExecService = new MasterExecService(this.startProcessFailedMap,
(ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("Master-Exec-Thread", masterConfig.getMasterExecThreads()));
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
stateWheelExecuteThread = new StateWheelExecuteThread(processService,
stateWheelExecuteThread = new StateWheelExecuteThread(
masterExecService,
processService,
startProcessFailedMap,
processTimeoutCheckList,
taskTimeoutCheckList,
taskRetryCheckList,
......@@ -202,6 +218,7 @@ public class MasterSchedulerService extends Thread {
if (processInstance != null) {
WorkflowExecuteThread workflowExecuteThread = new WorkflowExecuteThread(
processInstance
, taskResponseService
, processService
, nettyExecutorManager
, processAlertManager
......
......@@ -49,15 +49,30 @@ public class StateWheelExecuteThread extends Thread {
private ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList;
private ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps;
/**
* start process failed map
*/
private final ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap;
private int stateCheckIntervalSecs;
public StateWheelExecuteThread(ProcessService processService,
ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
int stateCheckIntervalSecs) {
/**
* master exec service
*/
private MasterExecService masterExecService;
public StateWheelExecuteThread(
MasterExecService masterExecService,
ProcessService processService,
ConcurrentHashMap<Integer, WorkflowExecuteThread> startProcessFailedMap,
ConcurrentHashMap<Integer, ProcessInstance> processInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceTimeoutCheckList,
ConcurrentHashMap<Integer, TaskInstance> taskInstanceRetryCheckList,
ConcurrentHashMap<Integer, WorkflowExecuteThread> processInstanceExecMaps,
int stateCheckIntervalSecs) {
this.masterExecService = masterExecService;
this.processService = processService;
this.startProcessFailedMap = startProcessFailedMap;
this.processInstanceTimeoutCheckList = processInstanceTimeoutCheckList;
this.taskInstanceTimeoutCheckList = taskInstanceTimeoutCheckList;
this.taskInstanceRetryCheckList = taskInstanceRetryCheckList;
......@@ -71,6 +86,7 @@ public class StateWheelExecuteThread extends Thread {
logger.info("state wheel thread start");
while (Stopper.isRunning()) {
try {
check4StartProcessFailed();
checkTask4Timeout();
checkTask4Retry();
checkProcess4Timeout();
......@@ -176,4 +192,12 @@ public class StateWheelExecuteThread extends Thread {
workflowExecuteThread.addStateEvent(stateEvent);
}
private void check4StartProcessFailed() {
if (startProcessFailedMap.isEmpty()) {
return;
}
for (WorkflowExecuteThread workflowExecuteThread : this.startProcessFailedMap.values()) {
masterExecService.execute(workflowExecuteThread);
}
}
}
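The state wheel now also drains startProcessFailedMap on every tick, resubmitting workflows whose start previously failed so a process instance can no longer be stranded in a never-starting state. A minimal sketch of that periodic retry loop, with a plain daemon thread and a running flag standing in for Stopper and MasterExecService:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class RetryWheelSketch extends Thread {
    private final AtomicBoolean running = new AtomicBoolean(true);   // stands in for Stopper
    private final ConcurrentHashMap<Integer, Runnable> startFailed;  // stands in for startProcessFailedMap
    private final Consumer<Runnable> resubmit;                       // stands in for masterExecService::execute
    private final int intervalSecs;

    public RetryWheelSketch(ConcurrentHashMap<Integer, Runnable> startFailed,
                            Consumer<Runnable> resubmit,
                            int intervalSecs) {
        super("State-Wheel-Sketch");
        setDaemon(true);
        this.startFailed = startFailed;
        this.resubmit = resubmit;
        this.intervalSecs = intervalSecs;
    }

    @Override
    public void run() {
        while (running.get()) {
            // Re-offer every workflow whose start failed; the exec service's callback
            // removes it from the map once the workflow actually starts.
            startFailed.values().forEach(resubmit);
            try {
                TimeUnit.SECONDS.sleep(intervalSecs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void shutdown() {
        running.set(false);
    }
}
```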
......@@ -57,6 +57,8 @@ import org.apache.dolphinscheduler.remote.command.HostUpdateCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.runner.task.ITaskProcessor;
import org.apache.dolphinscheduler.server.master.runner.task.TaskAction;
import org.apache.dolphinscheduler.server.master.runner.task.TaskProcessorFactory;
......@@ -103,6 +105,11 @@ public class WorkflowExecuteThread implements Runnable {
*/
private final Map<Integer, ITaskProcessor> activeTaskProcessorMaps = new ConcurrentHashMap<>();
/**
* handle task event
*/
private TaskResponseService taskResponseService;
/**
* process instance
*/
......@@ -205,6 +212,7 @@ public class WorkflowExecuteThread implements Runnable {
* @param nettyExecutorManager nettyExecutorManager
*/
public WorkflowExecuteThread(ProcessInstance processInstance
, TaskResponseService taskResponseService
, ProcessService processService
, NettyExecutorManager nettyExecutorManager
, ProcessAlertManager processAlertManager
......@@ -212,7 +220,7 @@ public class WorkflowExecuteThread implements Runnable {
, ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList
, ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList) {
this.processService = processService;
this.taskResponseService = taskResponseService;
this.processInstance = processInstance;
this.masterConfig = masterConfig;
this.nettyExecutorManager = nettyExecutorManager;
......@@ -1249,13 +1257,12 @@ public class WorkflowExecuteThread implements Runnable {
}
ITaskProcessor taskProcessor = activeTaskProcessorMaps.get(taskId);
taskProcessor.action(TaskAction.STOP);
if (taskProcessor.taskState().typeIsFinished()) {
StateEvent stateEvent = new StateEvent();
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
stateEvent.setProcessInstanceId(this.processInstance.getId());
stateEvent.setTaskInstanceId(taskInstance.getId());
stateEvent.setExecutionStatus(taskProcessor.taskState());
this.addStateEvent(stateEvent);
if (taskProcessor != null && taskProcessor.taskState().typeIsFinished()) {
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newActionStop(
taskProcessor.taskState(),
taskInstance.getId(),
this.processInstance.getId());
taskResponseService.addResponse(taskResponseEvent);
}
}
}
......@@ -1420,4 +1427,8 @@ public class WorkflowExecuteThread implements Runnable {
TaskDependType depNodeType) throws Exception {
return DagHelper.generateFlowDag(totalTaskNodeList, startNodeNameList, recoveryNodeCodeList, depNodeType);
}
public Map<Integer, ITaskProcessor> getActiveTaskProcessorMaps() {
return activeTaskProcessorMaps;
}
}
......@@ -82,6 +82,13 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
/**
* persist task
*
* @return
*/
protected abstract boolean persistTask(TaskAction taskAction);
/**
* pause task, common tasks donot need this.
*
......@@ -102,6 +109,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
*/
protected abstract boolean taskTimeout();
/**
* persist
*
* @return
*/
@Override
public boolean persist(TaskAction taskAction) {
return persistTask(taskAction);
}
@Override
public void run() {
}
......
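persist(TaskAction) is the public entry point on the base class and simply delegates to the new abstract persistTask hook, so each concrete processor (common, condition, dependent, sub-process, switch) decides how to record a STOP. A compact template-method sketch with illustrative names only, not the project's types:

```java
/** Hedged sketch of the persist()/persistTask() template-method split. */
abstract class TaskProcessorSketch {
    enum Action { STOP, PAUSE }

    /** Public entry point; the master calls this when it handles an ACTION_STOP event. */
    public final boolean persist(Action action) {
        return persistTask(action);
    }

    /** Each processor type supplies its own persistence rules. */
    protected abstract boolean persistTask(Action action);
}

class CommonTaskProcessorSketch extends TaskProcessorSketch {
    private boolean finished;   // stand-ins for the taskInstance.getState() checks
    private boolean cancelled;

    @Override
    protected boolean persistTask(Action action) {
        switch (action) {
            case STOP:
                if (finished && !cancelled) {
                    return true;      // already finished normally: nothing to persist
                }
                cancelled = true;     // the real code sets ExecutionStatus.KILL, the end time, and saves the instance
                return true;
            default:
                return false;         // unknown action
        }
    }
}
```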
......@@ -87,6 +87,24 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
return true;
}
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
return true;
default:
logger.error("unknown task action: {}", taskAction.toString());
}
return false;
}
/**
* common task cannot be paused
*/
......@@ -154,7 +172,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (StringUtils.isBlank(taskInstance.getHost())) {
taskInstance.setState(ExecutionStatus.KILL);
taskInstance.setEndTime(new Date());
processService.updateTaskInstance(taskInstance);
return true;
}
......
......@@ -133,11 +133,27 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
return true;
}
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
default:
logger.error("unknown task action: {}", taskAction.toString());
}
return false;
}
@Override
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}
......
......@@ -34,7 +34,6 @@ import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.ArrayList;
import java.util.Date;
......@@ -155,11 +154,28 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
return true;
}
@Override
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
return true;
}
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
default:
logger.error("unknown task action: {}", taskAction.toString());
}
return false;
}
@Override
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}
......
......@@ -26,6 +26,8 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
*/
public interface ITaskProcessor {
boolean persist(TaskAction taskAction);
void run();
boolean action(TaskAction taskAction);
......
......@@ -113,6 +113,17 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
}
@Override
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
return true;
default:
logger.error("unknown task action: {}", taskAction.toString());
}
return false;
}
@Override
protected boolean pauseTask() {
pauseSubWorkFlow();
......
......@@ -97,6 +97,24 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
}
@Override
protected boolean persistTask(TaskAction taskAction) {
switch (taskAction) {
case STOP:
if (taskInstance.getState().typeIsFinished() && !taskInstance.getState().typeIsCancel()) {
return true;
}
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
default:
logger.error("unknown task action: {}", taskAction.toString());
}
return false;
}
@Override
protected boolean pauseTask() {
this.taskInstance.setState(ExecutionStatus.PAUSE);
......@@ -109,7 +127,6 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected boolean killTask() {
this.taskInstance.setState(ExecutionStatus.KILL);
this.taskInstance.setEndTime(new Date());
processService.saveTaskInstance(taskInstance);
return true;
}
......
......@@ -32,10 +32,12 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.remote.utils.Pair;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.AbstractTask;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
......@@ -118,6 +120,14 @@ public class TaskKillProcessor implements NettyRequestProcessor {
try {
Integer processId = taskExecutionContext.getProcessId();
if (processId.equals(0)) {
TaskExecuteThread taskExecuteThread = workerManager.getTaskExecuteThread(taskInstanceId);
if (null != taskExecuteThread) {
AbstractTask task = taskExecuteThread.getTask();
if (task != null) {
task.cancelApplication(true);
logger.info("kill task by cancelApplication, task id:{}", taskInstanceId);
}
}
workerManager.killTaskBeforeExecuteByInstanceId(taskInstanceId);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskInstanceId);
logger.info("the task has not been executed and has been cancelled, task id:{}", taskInstanceId);
......@@ -165,6 +175,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
taskKillResponseCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
}
return taskKillResponseCommand;
}
......
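On the worker, the kill path now distinguishes a task that has not spawned an OS process yet (processId == 0): it looks the TaskExecuteThread up in the running-task map and calls cancelApplication on the task before removing it from the wait queue and the context cache, so the master never waits on a task that was killed before it started. A self-contained sketch of that decision, with hypothetical types standing in for the worker classes:

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical, simplified worker-side kill flow; not the real DolphinScheduler classes. */
public class KillFlowSketch {
    interface RunningTask {
        void cancelApplication();   // stands in for AbstractTask#cancelApplication(true)
    }

    private final ConcurrentHashMap<Integer, RunningTask> runningTasks = new ConcurrentHashMap<>();

    public void kill(int taskInstanceId, int processId) {
        if (processId == 0) {
            // The task is queued or initialising and has no OS process yet:
            // cancel it in-JVM so it never reports a stale RUNNING state back to the master.
            RunningTask task = runningTasks.get(taskInstanceId);
            if (task != null) {
                task.cancelApplication();
            }
            runningTasks.remove(taskInstanceId);   // analogous to clearing the queue and context cache
            return;
        }
        // Otherwise kill the spawned process (and any YARN application) as before.
        killOsProcess(processId);
    }

    private void killOsProcess(int processId) {
        // placeholder for ProcessUtils-style process-group kill handling
    }
}
```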
......@@ -389,4 +389,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
taskExecutionContext.setParamsMap(paramsMap);
}
public AbstractTask getTask() {
return task;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.runner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
public class WorkerExecService {
/**
* logger of WorkerExecService
*/
private static final Logger logger = LoggerFactory.getLogger(WorkerExecService.class);
private final ListeningExecutorService listeningExecutorService;
/**
* thread executor service
*/
private final ExecutorService execService;
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap;
public WorkerExecService(ExecutorService execService, ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap) {
this.execService = execService;
this.listeningExecutorService = MoreExecutors.listeningDecorator(this.execService);
this.taskExecuteThreadMap = taskExecuteThreadMap;
}
public void submit(TaskExecuteThread taskExecuteThread) {
taskExecuteThreadMap.put(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), taskExecuteThread);
ListenableFuture future = this.listeningExecutorService.submit(taskExecuteThread);
FutureCallback futureCallback = new FutureCallback() {
@Override
public void onSuccess(Object o) {
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
@Override
public void onFailure(Throwable throwable) {
logger.error("task execute failed, processInstanceId:{}, taskInstanceId:{}", taskExecuteThread.getTaskExecutionContext().getProcessInstanceId()
, taskExecuteThread.getTaskExecutionContext().getTaskInstanceId(), throwable);
taskExecuteThreadMap.remove(taskExecuteThread.getTaskExecutionContext().getTaskInstanceId());
}
};
Futures.addCallback(future, futureCallback, this.listeningExecutorService);
}
/**
* get thread pool queue size
*
* @return queue size
*/
public int getThreadPoolQueueSize() {
return ((ThreadPoolExecutor) this.execService).getQueue().size();
}
}
\ No newline at end of file
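WorkerExecService mirrors the master-side change: each submitted task thread is tracked in taskExecuteThreadMap while it runs, and the Guava callback removes it when it finishes or fails. That map is what TaskKillProcessor reaches through WorkerManagerThread#getTaskExecuteThread when it needs to cancel a task that has no OS process yet. A short sketch of that shared-map contract, under those assumptions:

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hedged sketch: one running-task map links the submit path and the kill path. */
public class RunningTaskMapSketch {
    /** Stand-in for TaskExecuteThread. */
    interface TaskThread extends Runnable {
        int taskInstanceId();
        void cancel();
    }

    private final ConcurrentHashMap<Integer, TaskThread> running = new ConcurrentHashMap<>();

    /** Called from the submit path (WorkerExecService#submit in the diff). */
    public void submitted(TaskThread t) {
        running.put(t.taskInstanceId(), t);
    }

    /** Called from the completion callback once the task finishes or fails. */
    public void completed(int taskInstanceId) {
        running.remove(taskInstanceId);
    }

    /** Called from the kill path (WorkerManagerThread#getTaskExecuteThread in the diff). */
    public TaskThread lookup(int taskInstanceId) {
        return running.get(taskInstanceId);
    }
}
```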
......@@ -31,9 +31,8 @@ import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.spi.task.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.spi.task.request.TaskRequest;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -52,6 +51,11 @@ public class WorkerManagerThread implements Runnable {
*/
private final DelayQueue<TaskExecuteThread> workerExecuteQueue = new DelayQueue<>();
/**
* running task
*/
private final ConcurrentHashMap<Integer, TaskExecuteThread> taskExecuteThreadMap = new ConcurrentHashMap<>();
/**
* worker config
*/
......@@ -60,7 +64,7 @@ public class WorkerManagerThread implements Runnable {
/**
* thread executor service
*/
private final ExecutorService workerExecService;
private final WorkerExecService workerExecService;
/**
* task callback service
......@@ -69,10 +73,17 @@ public class WorkerManagerThread implements Runnable {
public WorkerManagerThread() {
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads());
this.workerExecService = new WorkerExecService(
ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", this.workerConfig.getWorkerExecThreads()),
taskExecuteThreadMap
);
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
}
public TaskExecuteThread getTaskExecuteThread(Integer taskInstanceId) {
return this.taskExecuteThreadMap.get(taskInstanceId);
}
/**
* get delay queue size
*
......@@ -88,7 +99,7 @@ public class WorkerManagerThread implements Runnable {
* @return queue size
*/
public int getThreadPoolQueueSize() {
return ((ThreadPoolExecutor) workerExecService).getQueue().size();
return this.workerExecService.getThreadPoolQueueSize();
}
/**
......
......@@ -102,7 +102,7 @@ public class WorkflowExecuteThreadTest {
ConcurrentHashMap<Integer, TaskInstance> taskTimeoutCheckList = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, TaskInstance> taskRetryCheckList = new ConcurrentHashMap<>();
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
workflowExecuteThread = PowerMockito.spy(new WorkflowExecuteThread(processInstance, null, processService, null, null, config, taskTimeoutCheckList, taskRetryCheckList));
// prepareProcess init dag
Field dag = WorkflowExecuteThread.class.getDeclaredField("dag");
dag.setAccessible(true);
......
......@@ -20,19 +20,26 @@ package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
* task response processor test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class})
public class TaskKillResponseProcessorTest {
private TaskKillResponseProcessor taskKillResponseProcessor;
......@@ -41,8 +48,14 @@ public class TaskKillResponseProcessorTest {
private Channel channel;
private TaskResponseService taskResponseService;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
taskResponseService = PowerMockito.mock(TaskResponseService.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskResponseService.class)).thenReturn(taskResponseService);
taskKillResponseProcessor = new TaskKillResponseProcessor();
channel = PowerMockito.mock(Channel.class);
taskKillResponseCommand = new TaskKillResponseCommand();
......