Unverified commit ddf1ff98 authored by caishunfeng, committed by GitHub

[Improvement-7697][Master/Worker] Change the task ack to running callback (#8719)

* rebase dev

* change task state to DISPATCH when dispatch succeeds

* update task host on dispatch

* add dispatch task event

* test

* check tenant after enabling auto-create

* handle dispatch state
Co-authored-by: caishunfeng <534328519@qq.com>
Parent b4b52417
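
In summary, the ack path is replaced by three master-side task events: a DISPATCH event recorded by the master itself after a successful dispatch, a RUNNING event built from the worker's TASK_EXECUTE_RUNNING callback, and a RESULT event built from TASK_EXECUTE_RESPONSE. The sketch below condenses that pipeline from the classes changed in this commit; it is illustrative only — in the real code the TaskEventService is Spring-injected and these calls live in TaskPriorityQueueConsumer, TaskExecuteRunningProcessor and TaskExecuteResponseProcessor.

import io.netty.channel.Channel;

import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;

/** Illustrative sketch of the master-side event pipeline after this change (not part of the diff). */
class TaskEventPipelineSketch {

    /** Dispatch succeeded: record a DISPATCH event instead of waiting for a worker ack. */
    void onDispatched(TaskEventService events, int processInstanceId, int taskInstanceId, String workerAddress) {
        events.addEvent(TaskEvent.newDispatchEvent(processInstanceId, taskInstanceId, workerAddress));
    }

    /** Worker reported that the task started running (replaces the old TASK_EXECUTE_ACK). */
    void onRunning(TaskEventService events, TaskExecuteRunningCommand command, Channel workerChannel) {
        events.addEvent(TaskEvent.newRunningEvent(command, workerChannel));
    }

    /** Worker reported the final result. */
    void onResult(TaskEventService events, TaskExecuteResponseCommand command, Channel workerChannel) {
        events.addEvent(TaskEvent.newResultEvent(command, workerChannel));
    }
}

TaskEventService persists each event to the task instance and, for everything except DISPATCH, submits a TASK_STATE_CHANGE state event to the workflow thread pool, as shown in the TaskEventService diff further down.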
......@@ -17,9 +17,10 @@
package org.apache.dolphinscheduler.common;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.regex.Pattern;
......@@ -48,20 +49,20 @@ public final class Constants {
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_MASTERS = "/lock/failover/masters";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_WORKERS = "/lock/failover/workers";
public static final String REGISTRY_DOLPHINSCHEDULER_LOCK_FAILOVER_STARTUP_MASTERS = "/lock/failover/startup-masters";
public static final String FORMAT_SS ="%s%s";
public static final String FORMAT_S_S ="%s/%s";
public static final String AWS_ACCESS_KEY_ID="aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY="aws.secret.access.key";
public static final String AWS_REGION="aws.region";
public static final String FOLDER_SEPARATOR ="/";
public static final String FORMAT_SS = "%s%s";
public static final String FORMAT_S_S = "%s/%s";
public static final String AWS_ACCESS_KEY_ID = "aws.access.key.id";
public static final String AWS_SECRET_ACCESS_KEY = "aws.secret.access.key";
public static final String AWS_REGION = "aws.region";
public static final String FOLDER_SEPARATOR = "/";
public static final String RESOURCE_TYPE_FILE = "resources";
public static final String RESOURCE_TYPE_UDF="udfs";
public static final String RESOURCE_TYPE_UDF = "udfs";
public static final String STORAGE_S3="S3";
public static final String STORAGE_S3 = "S3";
public static final String STORAGE_HDFS="HDFS";
public static final String STORAGE_HDFS = "HDFS";
public static final String BUCKET_NAME = "dolphinscheduler-test";
......@@ -71,7 +72,6 @@ public final class Constants {
public static final String FS_DEFAULT_FS = "fs.defaultFS";
/**
* hadoop configuration
*/
......@@ -254,7 +254,7 @@ public final class Constants {
* user name regex
*/
public static final Pattern REGEX_USER_NAME = Pattern.compile("^[a-zA-Z0-9._-]{3,39}$");
/**
* read permission
*/
......@@ -424,7 +424,7 @@ public final class Constants {
/**
* process or task definition first version
*/
public static final int VERSION_FIRST = 1;
public static final int VERSION_FIRST = 1;
/**
* date format of yyyyMMdd
......@@ -584,7 +584,6 @@ public final class Constants {
public static final long DEPENDENT_ALL_TASK_CODE = 0;
/**
* preview schedule execute count
*/
......@@ -640,20 +639,22 @@ public final class Constants {
*/
public static final String TASK_LOG_INFO_FORMAT = "TaskLogInfo-%s";
public static final int[] NOT_TERMINATED_STATES = new int[] {
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITING_THREAD.ordinal(),
ExecutionStatus.WAITING_DEPEND.ordinal()
public static final int[] NOT_TERMINATED_STATES = new int[]{
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal(),
ExecutionStatus.NEED_FAULT_TOLERANCE.ordinal(),
ExecutionStatus.WAITING_THREAD.ordinal(),
ExecutionStatus.WAITING_DEPEND.ordinal()
};
public static final int[] RUNNING_PROCESS_STATE = new int[] {
public static final int[] RUNNING_PROCESS_STATE = new int[]{
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.SERIAL_WAIT.ordinal()
};
......
......@@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.common.enums;
public enum Event {
ACK,
RESULT;
DISPATCH,
DELAY,
RUNNING,
RESULT,
;
}
......@@ -54,6 +54,7 @@ public enum TaskStateType {
};
case RUNNING:
return new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
......
......@@ -27,10 +27,10 @@ import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.processor.CacheProcessor;
import org.apache.dolphinscheduler.server.master.processor.StateEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskEventProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.MasterRegistryClient;
import org.apache.dolphinscheduler.server.master.runner.EventExecuteService;
import org.apache.dolphinscheduler.server.master.runner.FailoverExecuteThread;
......@@ -75,10 +75,10 @@ public class MasterServer implements IStoppable {
private Scheduler scheduler;
@Autowired
private TaskAckProcessor taskAckProcessor;
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
@Autowired
private TaskEventProcessor taskEventProcessor;
......@@ -115,8 +115,8 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(masterConfig.getListenPort());
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.STATE_EVENT_REQUEST, stateEventProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_FORCE_STATE_EVENT_REQUEST, taskEventProcessor);
......
......@@ -24,11 +24,14 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.ExecutorDispatcher;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.exceptions.TaskPriorityQueueException;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.TaskPriority;
......@@ -79,6 +82,11 @@ public class TaskPriorityQueueConsumer extends Thread {
@Autowired
private ExecutorDispatcher dispatcher;
/**
* processInstance cache manager
*/
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
/**
* master config
......@@ -86,6 +94,12 @@ public class TaskPriorityQueueConsumer extends Thread {
@Autowired
private MasterConfig masterConfig;
/**
* task response service
*/
@Autowired
private TaskEventService taskEventService;
/**
* consumer thread pool
*/
......@@ -168,12 +182,24 @@ public class TaskPriorityQueueConsumer extends Thread {
}
result = dispatcher.dispatch(executionContext);
if (result) {
addDispatchEvent(context, executionContext);
}
} catch (RuntimeException | ExecuteException e) {
logger.error("dispatch error: {}", e.getMessage(), e);
}
return result;
}
/**
* add dispatch event
*/
private void addDispatchEvent(TaskExecutionContext context, ExecutionContext executionContext) {
TaskEvent taskEvent = TaskEvent.newDispatchEvent(context.getProcessInstanceId(), context.getTaskInstanceId(), executionContext.getHost().getAddress());
taskEventService.addEvent(taskEvent);
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
......
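
The DISPATCH event added above is consumed by TaskEventService further down in this diff: only tasks that are still in SUBMITTED_SUCCESS are advanced, and the worker host is now recorded at dispatch time. A condensed sketch of that handler (null check and state guard merged for brevity):

// Condensed from handleDispatchEvent() in the TaskEventService changes later in this diff.
void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
    if (taskInstance == null || taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
        return; // only a freshly submitted task can move to DISPATCH
    }
    taskInstance.setState(ExecutionStatus.DISPATCH);
    taskInstance.setHost(taskEvent.getWorkerAddress()); // worker host is recorded at dispatch time
    processService.saveTaskInstance(taskInstance);
}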
......@@ -26,9 +26,9 @@ import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskExecuteRunningProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.commons.collections.CollectionUtils;
......@@ -46,7 +46,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* netty executor manager
* netty executor manager
*/
@Service
public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
......@@ -60,13 +60,13 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
private ServerNodeManager serverNodeManager;
@Autowired
private TaskAckProcessor taskAckProcessor;
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
@Autowired
private TaskKillResponseProcessor taskKillResponseProcessor;
@Autowired
private TaskResponseProcessor taskResponseProcessor;
private TaskExecuteResponseProcessor taskExecuteResponseProcessor;
/**
* netty remote client
......@@ -83,13 +83,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
@PostConstruct
public void init() {
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, taskExecuteResponseProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING, taskExecuteRunningProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, taskKillResponseProcessor);
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException if error throws ExecuteException
......@@ -119,7 +120,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
boolean success = false;
while (!success) {
try {
doExecute(host,command);
doExecute(host, command);
success = true;
context.setHost(host);
} catch (ExecuteException ex) {
......@@ -150,7 +151,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
}
/**
* execute logic
* execute logic
*
* @param host host
* @param command command
* @throws ExecuteException if error throws ExecuteException
......@@ -178,7 +180,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean> {
}
/**
* get all nodes
* get all nodes
*
* @param context context
* @return nodes
*/
......
......@@ -18,13 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -36,15 +35,15 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* task response processor
* task execute response processor
*/
@Component
public class TaskResponseProcessor implements NettyRequestProcessor {
public class TaskExecuteResponseProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskResponseProcessor.class);
private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseProcessor.class);
@Autowired
private TaskResponseService taskResponseService;
private TaskEventService taskEventService;
/**
* task final result response
......@@ -57,19 +56,10 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteResponseCommand responseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand);
TaskExecuteResponseCommand taskExecuteResponseCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", taskExecuteResponseCommand);
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newResult(ExecutionStatus.of(responseCommand.getStatus()),
responseCommand.getEndTime(),
responseCommand.getProcessId(),
responseCommand.getAppIds(),
responseCommand.getTaskInstanceId(),
responseCommand.getVarPool(),
channel,
responseCommand.getProcessInstanceId()
);
taskResponseService.addResponse(taskResponseEvent);
TaskEvent taskResponseEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
taskEventService.addEvent(taskResponseEvent);
}
}
......@@ -18,14 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseService;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -37,15 +35,15 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* task ack processor
* task execute running processor
*/
@Component
public class TaskAckProcessor implements NettyRequestProcessor {
public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(TaskAckProcessor.class);
private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningProcessor.class);
@Autowired
private TaskResponseService taskResponseService;
private TaskEventService taskEventService;
/**
* task ack process
......@@ -55,25 +53,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteAckCommand taskAckCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand);
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
String workerAddress = ChannelUtils.toAddress(channel).getAddress();
ExecutionStatus ackStatus = ExecutionStatus.of(taskAckCommand.getStatus());
// TaskResponseEvent
TaskResponseEvent taskResponseEvent = TaskResponseEvent.newAck(ackStatus,
taskAckCommand.getStartTime(),
workerAddress,
taskAckCommand.getExecutePath(),
taskAckCommand.getLogPath(),
taskAckCommand.getTaskInstanceId(),
channel,
taskAckCommand.getProcessInstanceId());
taskResponseService.addResponse(taskResponseEvent);
TaskEvent taskEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
taskEventService.addEvent(taskEvent);
}
}
......@@ -19,6 +19,9 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import java.util.Date;
......@@ -27,7 +30,7 @@ import io.netty.channel.Channel;
/**
* task event
*/
public class TaskResponseEvent {
public class TaskEvent {
/**
* taskInstanceId
......@@ -90,46 +93,45 @@ public class TaskResponseEvent {
private Channel channel;
private int processInstanceId;
public static TaskResponseEvent newAck(ExecutionStatus state,
Date startTime,
String workerAddress,
String executePath,
String logPath,
int taskInstanceId,
Channel channel,
int processInstanceId) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setStartTime(startTime);
event.setWorkerAddress(workerAddress);
event.setExecutePath(executePath);
event.setLogPath(logPath);
public static TaskEvent newDispatchEvent(int processInstanceId, int taskInstanceId, String workerAddress) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(processInstanceId);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.ACK);
event.setWorkerAddress(workerAddress);
event.setEvent(Event.DISPATCH);
return event;
}
public static TaskEvent newRunningEvent(TaskExecuteRunningCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
event.setState(ExecutionStatus.of(command.getStatus()));
event.setStartTime(command.getStartTime());
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setChannel(channel);
event.setProcessInstanceId(processInstanceId);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
event.setEvent(Event.RUNNING);
return event;
}
public static TaskResponseEvent newResult(ExecutionStatus state,
Date endTime,
int processId,
String appIds,
int taskInstanceId,
String varPool,
Channel channel,
int processInstanceId) {
TaskResponseEvent event = new TaskResponseEvent();
event.setState(state);
event.setEndTime(endTime);
event.setProcessId(processId);
event.setAppIds(appIds);
event.setTaskInstanceId(taskInstanceId);
event.setEvent(Event.RESULT);
event.setVarPool(varPool);
public static TaskEvent newResultEvent(TaskExecuteResponseCommand command, Channel channel) {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
event.setState(ExecutionStatus.of(command.getStatus()));
event.setStartTime(command.getStartTime());
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
event.setEndTime(command.getEndTime());
event.setProcessId(command.getProcessId());
event.setAppIds(command.getAppIds());
event.setVarPool(command.getVarPool());
event.setChannel(channel);
event.setProcessInstanceId(processInstanceId);
event.setWorkerAddress(ChannelUtils.toAddress(channel).getAddress());
event.setEvent(Event.RESULT);
return event;
}
......@@ -140,7 +142,7 @@ public class TaskResponseEvent {
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
......
......@@ -23,8 +23,8 @@ import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThread;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
......@@ -50,17 +50,17 @@ import io.netty.channel.Channel;
* task manager
*/
@Component
public class TaskResponseService {
public class TaskEventService {
/**
* logger
*/
private final Logger logger = LoggerFactory.getLogger(TaskResponseService.class);
private final Logger logger = LoggerFactory.getLogger(TaskEventService.class);
/**
* attemptQueue
*/
private final BlockingQueue<TaskResponseEvent> eventQueue = new LinkedBlockingQueue<>();
private final BlockingQueue<TaskEvent> eventQueue = new LinkedBlockingQueue<>();
/**
* process service
......@@ -75,9 +75,9 @@ public class TaskResponseService {
private DataQualityResultOperator dataQualityResultOperator;
/**
* task response worker
* task event worker
*/
private Thread taskResponseWorker;
private Thread taskEventWorker;
@Autowired
private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
......@@ -87,19 +87,19 @@ public class TaskResponseService {
@PostConstruct
public void start() {
this.taskResponseWorker = new TaskResponseWorker();
this.taskResponseWorker.setName("StateEventResponseWorker");
this.taskResponseWorker.start();
this.taskEventWorker = new TaskEventWorker();
this.taskEventWorker.setName("TaskStateEventWorker");
this.taskEventWorker.start();
}
@PreDestroy
public void stop() {
try {
this.taskResponseWorker.interrupt();
this.taskEventWorker.interrupt();
if (!eventQueue.isEmpty()) {
List<TaskResponseEvent> remainEvents = new ArrayList<>(eventQueue.size());
List<TaskEvent> remainEvents = new ArrayList<>(eventQueue.size());
eventQueue.drainTo(remainEvents);
for (TaskResponseEvent event : remainEvents) {
for (TaskEvent event : remainEvents) {
this.persist(event);
}
}
......@@ -109,15 +109,15 @@ public class TaskResponseService {
}
/**
* put task to attemptQueue
* add event to queue
*
* @param taskResponseEvent taskResponseEvent
* @param taskEvent taskEvent
*/
public void addResponse(TaskResponseEvent taskResponseEvent) {
public void addEvent(TaskEvent taskEvent) {
try {
eventQueue.put(taskResponseEvent);
eventQueue.put(taskEvent);
} catch (InterruptedException e) {
logger.error("put task : {} error :{}", taskResponseEvent, e);
logger.error("add task event : {} error :{}", taskEvent, e);
Thread.currentThread().interrupt();
}
}
......@@ -125,7 +125,7 @@ public class TaskResponseService {
/**
* task worker thread
*/
class TaskResponseWorker extends Thread {
class TaskEventWorker extends Thread {
@Override
public void run() {
......@@ -133,8 +133,8 @@ public class TaskResponseService {
while (Stopper.isRunning()) {
try {
// if not task , blocking here
TaskResponseEvent taskResponseEvent = eventQueue.take();
persist(taskResponseEvent);
TaskEvent taskEvent = eventQueue.take();
persist(taskEvent);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
......@@ -147,14 +147,14 @@ public class TaskResponseService {
}
/**
* persist taskResponseEvent
* persist task event
*
* @param taskResponseEvent taskResponseEvent
* @param taskEvent taskEvent
*/
private void persist(TaskResponseEvent taskResponseEvent) {
Event event = taskResponseEvent.getEvent();
int taskInstanceId = taskResponseEvent.getTaskInstanceId();
int processInstanceId = taskResponseEvent.getProcessInstanceId();
private void persist(TaskEvent taskEvent) {
Event event = taskEvent.getEvent();
int taskInstanceId = taskEvent.getTaskInstanceId();
int processInstanceId = taskEvent.getProcessInstanceId();
TaskInstance taskInstance;
WorkflowExecuteThread workflowExecuteThread = this.processInstanceExecCacheManager.getByProcessInstanceId(processInstanceId);
......@@ -165,75 +165,103 @@ public class TaskResponseService {
}
switch (event) {
case ACK:
handleAckEvent(taskResponseEvent, taskInstance);
case DISPATCH:
handleDispatchEvent(taskEvent, taskInstance);
// dispatch event do not need to submit state event
return;
case DELAY:
case RUNNING:
handleRunningEvent(taskEvent, taskInstance);
break;
case RESULT:
handleResultEvent(taskResponseEvent, taskInstance);
handleResultEvent(taskEvent, taskInstance);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
}
StateEvent stateEvent = new StateEvent();
stateEvent.setProcessInstanceId(taskResponseEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskResponseEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskResponseEvent.getState());
stateEvent.setProcessInstanceId(taskEvent.getProcessInstanceId());
stateEvent.setTaskInstanceId(taskEvent.getTaskInstanceId());
stateEvent.setExecutionStatus(taskEvent.getState());
stateEvent.setType(StateEventType.TASK_STATE_CHANGE);
workflowExecuteThreadPool.submitStateEvent(stateEvent);
}
/**
* handle ack event
* handle dispatch event
*/
private void handleAckEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
Channel channel = taskResponseEvent.getChannel();
private void handleDispatchEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
if (taskInstance == null) {
logger.error("taskInstance is null");
return;
}
if (taskInstance.getState() != ExecutionStatus.SUBMITTED_SUCCESS) {
return;
}
taskInstance.setState(ExecutionStatus.DISPATCH);
taskInstance.setHost(taskEvent.getWorkerAddress());
processService.saveTaskInstance(taskInstance);
}
/**
* handle running event
*/
private void handleRunningEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
Channel channel = taskEvent.getChannel();
try {
if (taskInstance != null) {
if (taskInstance.getState().typeIsFinished()) {
logger.warn("task is finish, ack is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
logger.warn("task is finish, running event is meaningless, taskInstanceId:{}, state:{}", taskInstance.getId(), taskInstance.getState());
} else {
processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getStartTime(),
taskResponseEvent.getWorkerAddress(),
taskResponseEvent.getExecutePath(),
taskResponseEvent.getLogPath()
);
taskInstance.setState(taskEvent.getState());
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
processService.saveTaskInstance(taskInstance);
}
}
// if taskInstance is null (maybe deleted) or finish. retry will be meaningless . so ack success
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskAckCommand.convert2Command());
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker ack master error", e);
DBTaskAckCommand taskAckCommand = new DBTaskAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskAckCommand.convert2Command());
TaskExecuteRunningAckCommand taskExecuteRunningAckCommand = new TaskExecuteRunningAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteRunningAckCommand.convert2Command());
}
}
/**
* handle result event
*/
private void handleResultEvent(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
Channel channel = taskResponseEvent.getChannel();
private void handleResultEvent(TaskEvent taskEvent, TaskInstance taskInstance) {
Channel channel = taskEvent.getChannel();
try {
if (taskInstance != null) {
dataQualityResultOperator.operateDqExecuteResult(taskResponseEvent, taskInstance);
processService.changeTaskState(taskInstance, taskResponseEvent.getState(),
taskResponseEvent.getEndTime(),
taskResponseEvent.getProcessId(),
taskResponseEvent.getAppIds(),
taskResponseEvent.getVarPool()
);
dataQualityResultOperator.operateDqExecuteResult(taskEvent, taskInstance);
taskInstance.setStartTime(taskEvent.getStartTime());
taskInstance.setHost(taskEvent.getWorkerAddress());
taskInstance.setLogPath(taskEvent.getLogPath());
taskInstance.setExecutePath(taskEvent.getExecutePath());
taskInstance.setPid(taskEvent.getProcessId());
taskInstance.setAppLink(taskEvent.getAppIds());
taskInstance.setState(taskEvent.getState());
taskInstance.setEndTime(taskEvent.getEndTime());
taskInstance.setVarPool(taskEvent.getVarPool());
processService.changeOutParam(taskInstance);
processService.saveTaskInstance(taskInstance);
}
// if taskInstance is null (maybe deleted) . retry will be meaningless . so response success
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.SUCCESS.getCode(), taskResponseEvent.getTaskInstanceId());
channel.writeAndFlush(taskResponseCommand.convert2Command());
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
} catch (Exception e) {
logger.error("worker response master error", e);
DBTaskResponseCommand taskResponseCommand = new DBTaskResponseCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskResponseCommand.convert2Command());
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = new TaskExecuteResponseAckCommand(ExecutionStatus.FAILURE.getCode(), -1);
channel.writeAndFlush(taskExecuteResponseAckCommand.convert2Command());
}
}
}
\ No newline at end of file
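
TaskEventService answers every RUNNING and RESULT message with the renamed ack commands (TaskExecuteRunningAckCommand / TaskExecuteResponseAckCommand). The worker-side handling of those acks is not part of this diff; the sketch below is a hypothetical worker processor (class name and the retry remark are assumptions) built only from the processor interface and command classes that appear in this commit.

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

import io.netty.channel.Channel;

/** Hypothetical worker-side handler for the running ack; not part of this commit. */
public class TaskExecuteRunningAckProcessorSketch implements NettyRequestProcessor {

    private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessorSketch.class);

    @Override
    public void process(Channel channel, Command command) {
        Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
                String.format("invalid command type : %s", command.getType()));
        TaskExecuteRunningAckCommand ack = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningAckCommand.class);
        // a success ack means the master has persisted the RUNNING state; the worker could stop
        // resending the running callback here (retry bookkeeping is omitted in this sketch).
        logger.info("received task running ack: {}", ack);
    }
}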
......@@ -259,11 +259,8 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
// verify tenant is null
if (verifyTenantIsNull(tenant, taskInstance)) {
processService.changeTaskState(taskInstance, ExecutionStatus.FAILURE,
taskInstance.getStartTime(),
taskInstance.getHost(),
null,
null);
taskInstance.setState(ExecutionStatus.FAILURE);
processService.saveTaskInstance(taskInstance);
return null;
}
// set queue for process instance, user-specified queue takes precedence over tenant queue
......
......@@ -27,7 +27,7 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.dp.CheckType;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqFailureStrategy;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.DqTaskState;
import org.apache.dolphinscheduler.plugin.task.api.enums.dp.OperatorType;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskResponseEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -59,7 +59,7 @@ public class DataQualityResultOperator {
* @param taskResponseEvent
* @param taskInstance
*/
public void operateDqExecuteResult(TaskResponseEvent taskResponseEvent, TaskInstance taskInstance) {
public void operateDqExecuteResult(TaskEvent taskResponseEvent, TaskInstance taskInstance) {
if (TASK_TYPE_DATA_QUALITY.equals(taskInstance.getTaskType())) {
ProcessInstance processInstance =
......@@ -92,7 +92,7 @@ public class DataQualityResultOperator {
* @param dqExecuteResult
* @param processInstance
*/
private void checkDqExecuteResult(TaskResponseEvent taskResponseEvent,
private void checkDqExecuteResult(TaskEvent taskResponseEvent,
DqExecuteResult dqExecuteResult,
ProcessInstance processInstance) {
if (isFailure(dqExecuteResult)) {
......
......@@ -231,6 +231,7 @@ public class DependentExecute {
if (state.typeIsRunning()
|| state == ExecutionStatus.SUBMITTED_SUCCESS
|| state == ExecutionStatus.DISPATCH
|| state == ExecutionStatus.WAITING_THREAD) {
return DependResult.WAITING;
} else {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.util.Date;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import io.netty.channel.Channel;
/**
* task ack processor test
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskEvent.class})
public class TaskAckProcessorTest {
private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
private TaskEventService taskEventService;
private ProcessService processService;
private TaskExecuteRunningCommand taskExecuteRunningCommand;
private TaskEvent taskResponseEvent;
private Channel channel;
@Before
public void before() {
PowerMockito.mockStatic(SpringApplicationContext.class);
taskEventService = PowerMockito.mock(TaskEventService.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskEventService.class)).thenReturn(taskEventService);
processService = PowerMockito.mock(ProcessService.class);
PowerMockito.when(SpringApplicationContext.getBean(ProcessService.class)).thenReturn(processService);
taskExecuteRunningProcessor = new TaskExecuteRunningProcessor();
channel = PowerMockito.mock(Channel.class);
taskResponseEvent = PowerMockito.mock(TaskEvent.class);
taskExecuteRunningCommand = new TaskExecuteRunningCommand();
taskExecuteRunningCommand.setStatus(1);
taskExecuteRunningCommand.setExecutePath("/dolphinscheduler/worker");
taskExecuteRunningCommand.setHost("localhost");
taskExecuteRunningCommand.setLogPath("/temp/worker.log");
taskExecuteRunningCommand.setStartTime(new Date());
taskExecuteRunningCommand.setTaskInstanceId(1);
taskExecuteRunningCommand.setProcessInstanceId(1);
}
@Test
public void testProcess() {
// Command command = taskExecuteAckCommand.convert2Command();
// Assert.assertEquals(CommandType.TASK_EXECUTE_ACK,command.getType());
// InetSocketAddress socketAddress = new InetSocketAddress("localhost",12345);
// PowerMockito.when(channel.remoteAddress()).thenReturn(socketAddress);
// PowerMockito.mockStatic(TaskResponseEvent.class);
//
// PowerMockito.when(TaskResponseEvent.newAck(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), Mockito.anyInt(), channel))
// .thenReturn(taskResponseEvent);
// TaskInstance taskInstance = PowerMockito.mock(TaskInstance.class);
// PowerMockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
//
// taskAckProcessor.process(channel,command);
}
}
......@@ -19,9 +19,14 @@ package org.apache.dolphinscheduler.server.master.processor.queue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.cache.impl.ProcessInstanceExecCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.server.utils.DataQualityResultOperator;
import org.apache.dolphinscheduler.service.process.ProcessService;
import java.net.InetSocketAddress;
import java.util.Date;
import org.junit.After;
......@@ -42,41 +47,52 @@ public class TaskResponseServiceTest {
private ProcessService processService;
@InjectMocks
TaskResponseService taskRspService;
TaskEventService taskEventService;
@Mock
private Channel channel;
private TaskResponseEvent ackEvent;
private TaskEvent ackEvent;
private TaskResponseEvent resultEvent;
private TaskEvent resultEvent;
private TaskInstance taskInstance;
@Mock
private ProcessInstanceExecCacheManagerImpl processInstanceExecCacheManager;
@Mock
private DataQualityResultOperator dataQualityResultOperator;
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Before
public void before() {
taskRspService.start();
ackEvent = TaskResponseEvent.newAck(ExecutionStatus.RUNNING_EXECUTION,
new Date(),
"127.*.*.*",
"path",
"logPath",
22,
channel,
1);
resultEvent = TaskResponseEvent.newResult(ExecutionStatus.SUCCESS,
new Date(),
1,
"ids",
22,
"varPol",
channel,
1);
taskEventService.start();
Mockito.when(channel.remoteAddress()).thenReturn(InetSocketAddress.createUnresolved("127.0.0.1", 1234));
TaskExecuteRunningCommand taskExecuteRunningCommand = new TaskExecuteRunningCommand();
taskExecuteRunningCommand.setProcessId(1);
taskExecuteRunningCommand.setTaskInstanceId(22);
taskExecuteRunningCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
taskExecuteRunningCommand.setExecutePath("path");
taskExecuteRunningCommand.setLogPath("logPath");
taskExecuteRunningCommand.setHost("127.*.*.*");
taskExecuteRunningCommand.setStartTime(new Date());
ackEvent = TaskEvent.newRunningEvent(taskExecuteRunningCommand, channel);
TaskExecuteResponseCommand taskExecuteResponseCommand = new TaskExecuteResponseCommand();
taskExecuteResponseCommand.setProcessInstanceId(1);
taskExecuteResponseCommand.setTaskInstanceId(22);
taskExecuteResponseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
taskExecuteResponseCommand.setEndTime(new Date());
taskExecuteResponseCommand.setVarPool("varPol");
taskExecuteResponseCommand.setAppIds("ids");
taskExecuteResponseCommand.setProcessId(1);
resultEvent = TaskEvent.newResultEvent(taskExecuteResponseCommand, channel);
taskInstance = new TaskInstance();
taskInstance.setId(22);
......@@ -87,14 +103,14 @@ public class TaskResponseServiceTest {
public void testAddResponse() {
Mockito.when(processService.findTaskInstanceById(Mockito.any())).thenReturn(taskInstance);
Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(null);
taskRspService.addResponse(ackEvent);
taskRspService.addResponse(resultEvent);
taskEventService.addEvent(ackEvent);
taskEventService.addEvent(resultEvent);
}
@After
public void after() {
if (taskRspService != null) {
taskRspService.stop();
if (taskEventService != null) {
taskEventService.stop();
}
}
}
......@@ -69,24 +69,24 @@ public enum CommandType {
TASK_EXECUTE_REQUEST,
/**
* execute task ack
* task execute running, from worker to master
*/
TASK_EXECUTE_ACK,
TASK_EXECUTE_RUNNING,
/**
* execute task response
* task execute running ack, from master to worker
*/
TASK_EXECUTE_RESPONSE,
TASK_EXECUTE_RUNNING_ACK,
/**
* db task ack
* task execute response, from worker to master
*/
DB_TASK_ACK,
TASK_EXECUTE_RESPONSE,
/**
* db task response
* task execute response ack, from master to worker
*/
DB_TASK_RESPONSE,
TASK_EXECUTE_RESPONSE_ACK,
/**
* kill task
......
......@@ -61,7 +61,7 @@ public class StateEventResponseCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.DB_TASK_RESPONSE);
command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
......
......@@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* db task final result response command
* task execute response ack command
* from master to worker
*/
public class DBTaskResponseCommand implements Serializable {
public class TaskExecuteResponseAckCommand implements Serializable {
private int taskInstanceId;
private int status;
public DBTaskResponseCommand() {
public TaskExecuteResponseAckCommand() {
super();
}
public DBTaskResponseCommand(int status, int taskInstanceId) {
public TaskExecuteResponseAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
......@@ -61,7 +62,7 @@ public class DBTaskResponseCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.DB_TASK_RESPONSE);
command.setType(CommandType.TASK_EXECUTE_RESPONSE_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
......@@ -69,7 +70,7 @@ public class DBTaskResponseCommand implements Serializable {
@Override
public String toString() {
return "DBTaskResponseCommand{"
return "TaskExecuteResponseAckCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", status=" + status
+ '}';
......
......@@ -23,7 +23,7 @@ import java.io.Serializable;
import java.util.Date;
/**
* execute task response command
* execute task response command
*/
public class TaskExecuteResponseCommand implements Serializable {
......@@ -36,23 +36,43 @@ public class TaskExecuteResponseCommand implements Serializable {
}
/**
* task instance id
* task instance id
*/
private int taskInstanceId;
/**
* process instance id
*/
private int processInstanceId;
private int processInstanceId;
/**
* status
* status
*/
private int status;
/**
* startTime
*/
private Date startTime;
/**
* host
*/
private String host;
/**
* logPath
*/
private String logPath;
/**
* end time
* executePath
*/
private String executePath;
/**
* end time
*/
private Date endTime;
......@@ -72,6 +92,38 @@ public class TaskExecuteResponseCommand implements Serializable {
*/
private String varPool;
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
......@@ -79,7 +131,7 @@ public class TaskExecuteResponseCommand implements Serializable {
public String getVarPool() {
return varPool;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
......@@ -122,6 +174,7 @@ public class TaskExecuteResponseCommand implements Serializable {
/**
* package response command
*
* @return command
*/
public Command convert2Command() {
......@@ -136,10 +189,16 @@ public class TaskExecuteResponseCommand implements Serializable {
public String toString() {
return "TaskExecuteResponseCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", processInstanceId=" + processInstanceId
+ ", status=" + status
+ ", startTime=" + startTime
+ ", endTime=" + endTime
+ ", host=" + host
+ ", logPath=" + logPath
+ ", executePath=" + executePath
+ ", processId=" + processId
+ ", appIds='" + appIds + '\''
+ ", varPool=" + varPool
+ '}';
}
......
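
The response command now carries the running-time fields (start time, host, log and execute path) in addition to the final result, so the master can fill the whole task instance from a single message even if the running callback was missed. A minimal construction sketch; the concrete values (address, paths, pid, app id) are illustrative placeholders, not values from this commit:

import java.util.Date;

import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;

/** Illustrative sketch of building the enriched response command on the worker side. */
public class ResponseCommandSketch {

    Command buildResponse(int processInstanceId, int taskInstanceId) {
        TaskExecuteResponseCommand response = new TaskExecuteResponseCommand();
        response.setProcessInstanceId(processInstanceId);
        response.setTaskInstanceId(taskInstanceId);
        response.setStatus(ExecutionStatus.SUCCESS.getCode());
        // fields added by this change: the result message is self-sufficient
        response.setStartTime(new Date());
        response.setHost("192.168.0.2:1234");   // illustrative worker address
        response.setLogPath("/tmp/task.log");   // illustrative paths
        response.setExecutePath("/tmp/execute");
        response.setEndTime(new Date());
        response.setProcessId(10001);           // illustrative pid
        response.setAppIds("application_xxx");  // illustrative yarn application id
        response.setVarPool(null);
        return response.convert2Command();      // CommandType.TASK_EXECUTE_RESPONSE
    }
}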
......@@ -22,18 +22,19 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import java.io.Serializable;
/**
* db task ack request command
* task execute running ack command
* from master to worker
*/
public class DBTaskAckCommand implements Serializable {
public class TaskExecuteRunningAckCommand implements Serializable {
private int taskInstanceId;
private int status;
public DBTaskAckCommand() {
public TaskExecuteRunningAckCommand() {
super();
}
public DBTaskAckCommand(int status, int taskInstanceId) {
public TaskExecuteRunningAckCommand(int status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
......@@ -61,7 +62,7 @@ public class DBTaskAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.DB_TASK_ACK);
command.setType(CommandType.TASK_EXECUTE_RUNNING_ACK);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
......@@ -69,6 +70,6 @@ public class DBTaskAckCommand implements Serializable {
@Override
public String toString() {
return "DBTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
return "TaskExecuteRunningAckCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + '}';
}
}
......@@ -23,9 +23,10 @@ import java.io.Serializable;
import java.util.Date;
/**
* execute task request command
* task execute running command
* from worker to master
*/
public class TaskExecuteAckCommand implements Serializable {
public class TaskExecuteRunningCommand implements Serializable {
/**
* taskInstanceId
......@@ -62,6 +63,16 @@ public class TaskExecuteAckCommand implements Serializable {
*/
private String executePath;
/**
* processId
*/
private int processId;
/**
* appIds
*/
private String appIds;
public Date getStartTime() {
return startTime;
}
......@@ -94,6 +105,14 @@ public class TaskExecuteAckCommand implements Serializable {
this.taskInstanceId = taskInstanceId;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public String getLogPath() {
return logPath;
}
......@@ -110,6 +129,22 @@ public class TaskExecuteAckCommand implements Serializable {
this.executePath = executePath;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
/**
* package request command
*
......@@ -117,7 +152,7 @@ public class TaskExecuteAckCommand implements Serializable {
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_ACK);
command.setType(CommandType.TASK_EXECUTE_RUNNING);
byte[] body = JSONUtils.toJsonByteArray(this);
command.setBody(body);
return command;
......@@ -125,22 +160,16 @@ public class TaskExecuteAckCommand implements Serializable {
@Override
public String toString() {
return "TaskExecuteAckCommand{"
return "TaskExecuteRunningCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", processInstanceId='" + processInstanceId + '\''
+ ", startTime=" + startTime
+ ", host='" + host + '\''
+ ", status=" + status
+ ", logPath='" + logPath + '\''
+ ", executePath='" + executePath + '\''
+ ", processInstanceId='" + processInstanceId + '\''
+ ", processId=" + processId + '\''
+ ", appIds='" + appIds + '\''
+ '}';
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
}
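
TaskExecuteRunningCommand is the renamed ack and now also carries processId and appIds. A minimal worker-side sketch of building and sending it, assuming a Netty channel to the master is at hand (the masterChannel parameter and the literal values are illustrative assumptions):

import java.util.Date;

import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;

import io.netty.channel.Channel;

/** Illustrative sketch of the worker-side running callback; not part of this commit. */
public class RunningCallbackSketch {

    void reportRunning(Channel masterChannel, int processInstanceId, int taskInstanceId) {
        TaskExecuteRunningCommand running = new TaskExecuteRunningCommand();
        running.setProcessInstanceId(processInstanceId);
        running.setTaskInstanceId(taskInstanceId);
        running.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
        running.setStartTime(new Date());
        running.setHost("192.168.0.2:1234"); // illustrative worker address
        running.setLogPath("/tmp/task.log"); // illustrative paths
        running.setExecutePath("/tmp/execute");
        masterChannel.writeAndFlush(running.convert2Command()); // CommandType.TASK_EXECUTE_RUNNING
    }
}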
......@@ -17,13 +17,19 @@
package org.apache.dolphinscheduler.service.process;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -124,11 +130,10 @@ import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import java.util.ArrayList;
import java.util.Arrays;
......@@ -143,17 +148,16 @@ import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_FATHER_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_PARENT_INSTANCE_ID;
import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
/**
* process relative dao that some mappers in this.
......@@ -164,6 +168,7 @@ public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[]{ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
......@@ -266,8 +271,8 @@ public class ProcessService {
/**
* handle Command (construct ProcessInstance from Command) , wrapped in transaction
*
* @param logger logger
* @param host host
* @param logger logger
* @param host host
* @param command found command
* @return process instance
*/
......@@ -368,7 +373,7 @@ public class ProcessService {
/**
* set process waiting thread
*
* @param command command
* @param command command
* @param processInstance processInstance
* @return process instance
*/
......@@ -581,8 +586,6 @@ public class ProcessService {
/**
* recursive delete all task instance by process instance id
*
* @param processInstanceId
*/
public void deleteWorkTaskInstanceByProcessInstanceId(int processInstanceId) {
List<TaskInstance> taskInstanceList = findValidTaskListByProcessId(processInstanceId);
......@@ -603,7 +606,7 @@ public class ProcessService {
* recursive query sub process definition id by parent id.
*
* @param parentCode parentCode
* @param ids ids
* @param ids ids
*/
public void recurseFindSubProcess(long parentCode, List<Long> ids) {
List<TaskDefinition> taskNodeList = this.getTaskNodeListByDefinition(parentCode);
......@@ -628,7 +631,7 @@ public class ProcessService {
* create recovery waiting thread command and delete origin command at the same time.
* if the recovery command is exists, only update the field update_time
*
* @param originCommand originCommand
* @param originCommand originCommand
* @param processInstance processInstance
*/
public void createRecoveryWaitingThreadCommand(Command originCommand, ProcessInstance processInstance) {
......@@ -684,7 +687,7 @@ public class ProcessService {
/**
* get schedule time from command
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return date
*/
......@@ -713,8 +716,8 @@ public class ProcessService {
* generate a new work process instance from command.
*
* @param processDefinition processDefinition
* @param command command
* @param cmdParam cmdParam map
* @param command command
* @param cmdParam cmdParam map
* @return process instance
*/
private ProcessInstance generateNewProcessInstance(ProcessDefinition processDefinition,
......@@ -799,7 +802,7 @@ public class ProcessService {
* use definition creator's tenant.
*
* @param tenantId tenantId
* @param userId userId
* @param userId userId
* @return tenant
*/
public Tenant getTenantForProcess(int tenantId, int userId) {
......@@ -837,7 +840,7 @@ public class ProcessService {
/**
* check command parameters is valid
*
* @param command command
* @param command command
* @param cmdParam cmdParam map
* @return whether command param is valid
*/
......@@ -857,7 +860,7 @@ public class ProcessService {
* construct process instance according to one command.
*
* @param command command
* @param host host
* @param host host
* @return process instance
*/
protected ProcessInstance constructProcessInstance(Command command, String host) {
......@@ -1036,7 +1039,7 @@ public class ProcessService {
* return complement data if the process start with complement data
*
* @param processInstance processInstance
* @param command command
* @param command command
* @return command type
*/
private CommandType getCommandTypeIfComplement(ProcessInstance processInstance, Command command) {
......@@ -1051,8 +1054,8 @@ public class ProcessService {
* initialize complement data parameters
*
* @param processDefinition processDefinition
* @param processInstance processInstance
* @param cmdParam cmdParam
* @param processInstance processInstance
* @param cmdParam cmdParam
*/
private void initComplementDataParam(ProcessDefinition processDefinition,
ProcessInstance processInstance,
......@@ -1125,7 +1128,7 @@ public class ProcessService {
* only the keys that are not in the sub process global params will be joined.
*
* @param parentGlobalParams parentGlobalParams
* @param subGlobalParams subGlobalParams
* @param subGlobalParams subGlobalParams
* @return global params join
*/
private String joinGlobalParams(String parentGlobalParams, String subGlobalParams) {
......@@ -1192,7 +1195,7 @@ public class ProcessService {
* submit sub process to command
*
* @param processInstance processInstance
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @return task instance
*/
@Transactional(rollbackFor = Exception.class)
......@@ -1223,7 +1226,7 @@ public class ProcessService {
* set map {parent instance id, task instance id, 0(child instance id)}
*
* @param parentInstance parentInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap setProcessInstanceMap(ProcessInstance parentInstance, TaskInstance parentTask) {
......@@ -1252,7 +1255,7 @@ public class ProcessService {
* find previous task work process map.
*
* @param parentProcessInstance parentProcessInstance
* @param parentTask parentTask
* @param parentTask parentTask
* @return process instance map
*/
private ProcessInstanceMap findPreviousTaskProcessMap(ProcessInstance parentProcessInstance,
......@@ -1278,7 +1281,7 @@ public class ProcessService {
* create sub work process command
*
* @param parentProcessInstance parentProcessInstance
* @param task task
* @param task task
*/
public void createSubWorkProcess(ProcessInstance parentProcessInstance, TaskInstance task) {
if (!task.isSubProcess()) {
......@@ -1412,7 +1415,7 @@ public class ProcessService {
* update sub process definition
*
* @param parentProcessInstance parentProcessInstance
* @param childDefinitionCode childDefinitionId
* @param childDefinitionCode childDefinitionId
*/
private void updateSubProcessDefinitionByParent(ProcessInstance parentProcessInstance, long childDefinitionCode) {
ProcessDefinition fatherDefinition = this.findProcessDefinition(parentProcessInstance.getProcessDefinitionCode(),
......@@ -1427,7 +1430,7 @@ public class ProcessService {
/**
* submit task to mysql
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return task instance
*/
......@@ -1463,7 +1466,7 @@ public class ProcessService {
* return stop if work process state is ready stop
* if none of the above is satisfied, return submit success
*
* @param taskInstance taskInstance
* @param taskInstance taskInstance
* @param processInstance processInstance
* @return process instance state
*/
......@@ -1476,6 +1479,7 @@ public class ProcessService {
state == ExecutionStatus.RUNNING_EXECUTION
|| state == ExecutionStatus.DELAY_EXECUTION
|| state == ExecutionStatus.KILL
|| state == ExecutionStatus.DISPATCH
) {
return state;
}
......@@ -1689,7 +1693,7 @@ public class ProcessService {
* get id list by task state
*
* @param instanceId instanceId
* @param state state
* @param state state
* @return task instance ids
*/
public List<Integer> findTaskIdByInstanceState(int instanceId, ExecutionStatus state) {
......@@ -1744,7 +1748,7 @@ public class ProcessService {
* find work process map by parent process id and parent task id.
*
* @param parentWorkProcessId parentWorkProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance map
*/
public ProcessInstanceMap findWorkProcessMapByParent(Integer parentWorkProcessId, Integer parentTaskId) {
......@@ -1766,7 +1770,7 @@ public class ProcessService {
* find sub process instance
*
* @param parentProcessId parentProcessId
* @param parentTaskId parentTaskId
* @param parentTaskId parentTaskId
* @return process instance
*/
public ProcessInstance findSubProcessInstance(Integer parentProcessId, Integer parentTaskId) {
......@@ -1795,29 +1799,6 @@ public class ProcessService {
return processInstance;
}
/**
* change task state
*
* @param state state
* @param startTime startTime
* @param host host
* @param executePath executePath
* @param logPath logPath
*/
public void changeTaskState(TaskInstance taskInstance,
ExecutionStatus state,
Date startTime,
String host,
String executePath,
String logPath) {
taskInstance.setState(state);
taskInstance.setStartTime(startTime);
taskInstance.setHost(host);
taskInstance.setExecutePath(executePath);
taskInstance.setLogPath(logPath);
saveTaskInstance(taskInstance);
}
/**
* update process instance
*
......@@ -1828,27 +1809,6 @@ public class ProcessService {
return processInstanceMapper.updateById(processInstance);
}
/**
* change task state
*
* @param state state
* @param endTime endTime
* @param varPool varPool
*/
public void changeTaskState(TaskInstance taskInstance, ExecutionStatus state,
Date endTime,
int processId,
String appIds,
String varPool) {
taskInstance.setPid(processId);
taskInstance.setAppLink(appIds);
taskInstance.setState(state);
taskInstance.setEndTime(endTime);
taskInstance.setVarPool(varPool);
changeOutParam(taskInstance);
saveTaskInstance(taskInstance);
}
/**
* for display in the task instance page
*/
......@@ -2006,7 +1966,7 @@ public class ProcessService {
* update process instance state by id
*
* @param processInstanceId processInstanceId
* @param executionStatus executionStatus
* @param executionStatus executionStatus
* @return update process result
*/
public int updateProcessInstanceState(Integer processInstanceId, ExecutionStatus executionStatus) {
......@@ -2042,7 +2002,7 @@ public class ProcessService {
/**
* find tenant code by resource name
*
* @param resName resource name
* @param resName resource name
* @param resourceType resource type
* @return tenant code
*/
......@@ -2080,7 +2040,7 @@ public class ProcessService {
* find last scheduler process instance in the date interval
*
* @param definitionCode definitionCode
* @param dateInterval dateInterval
* @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastSchedulerProcessInterval(Long definitionCode, DateInterval dateInterval) {
......@@ -2093,7 +2053,7 @@ public class ProcessService {
* find last manual process instance interval
*
* @param definitionCode process definition code
* @param dateInterval dateInterval
* @param dateInterval dateInterval
* @return process instance
*/
public ProcessInstance findLastManualProcessInterval(Long definitionCode, DateInterval dateInterval) {
......@@ -2106,8 +2066,8 @@ public class ProcessService {
* find last running process instance
*
* @param definitionCode process definition code
* @param startTime start time
* @param endTime end time
* @param startTime start time
* @param endTime end time
* @return process instance
*/
public ProcessInstance findLastRunningProcess(Long definitionCode, Date startTime, Date endTime) {
......@@ -2191,7 +2151,7 @@ public class ProcessService {
/**
* list unauthorized udf function
*
* @param userId user id
* @param userId user id
* @param needChecks data source id array
* @return unauthorized udf function list
*/
......@@ -2599,7 +2559,7 @@ public class ProcessService {
* add authorized resources
*
* @param ownResources own resources
* @param userId userId
* @param userId userId
*/
private void addAuthorizedResources(List<Resource> ownResources, int userId) {
List<Integer> relationResourceIds = resourceUserMapper.queryResourcesIdListByUserIdAndPerm(userId, 7);
......@@ -2742,12 +2702,7 @@ public class ProcessService {
/**
* the first time (when submitting the task) acquire the resource of the task group
*
* @param taskId task id
* @param taskName
* @param groupId
* @param processId
* @param priority
* @return
* @param taskId task id
*/
public boolean acquireTaskGroup(int taskId,
String taskName, int groupId,
......@@ -2788,9 +2743,6 @@ public class ProcessService {
/**
* try to get the task group resource (when another task releases the resource)
*
* @param taskGroupQueue
* @return
*/
public boolean robTaskGroupResouce(TaskGroupQueue taskGroupQueue) {
TaskGroup taskGroup = taskGroupMapper.selectById(taskGroupQueue.getGroupId());
......@@ -2877,11 +2829,11 @@ public class ProcessService {
/**
* insert into task group queue
*
* @param taskId task id
* @param taskName task name
* @param groupId group id
* @param taskId task id
* @param taskName task name
* @param groupId group id
* @param processId process id
* @param priority priority
* @param priority priority
* @return result and msg code
*/
public TaskGroupQueue insertIntoTaskGroupQueue(Integer taskId,
......
......@@ -207,6 +207,16 @@ public class TaskExecutionContext {
private ResourceParametersHelper resourceParametersHelper;
/**
* endTime
*/
private Date endTime;
/**
* sql TaskExecutionContext
*/
private SQLTaskExecutionContext sqlTaskExecutionContext;
/**
* resources full name and tenant code
*/
......@@ -538,12 +548,61 @@ public class TaskExecutionContext {
this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
}
public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
this.currentExecutionStatus = currentExecutionStatus;
}
public ExecutionStatus getCurrentExecutionStatus() {
return currentExecutionStatus;
}
public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
this.currentExecutionStatus = currentExecutionStatus;
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
@Override
public String toString() {
return "TaskExecutionContext{"
+ "taskInstanceId=" + taskInstanceId
+ ", taskName='" + taskName + '\''
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", taskJson='" + taskJson + '\''
+ ", processId=" + processId
+ ", processDefineCode=" + processDefineCode
+ ", processDefineVersion=" + processDefineVersion
+ ", appIds='" + appIds + '\''
+ ", processInstanceId=" + processInstanceId
+ ", scheduleTime=" + scheduleTime
+ ", globalParams='" + globalParams + '\''
+ ", executorId=" + executorId
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ ", tenantCode='" + tenantCode + '\''
+ ", queue='" + queue + '\''
+ ", projectCode=" + projectCode
+ ", taskParams='" + taskParams + '\''
+ ", envFile='" + envFile + '\''
+ ", dryRun='" + dryRun + '\''
+ ", definedParams=" + definedParams
+ ", taskAppId='" + taskAppId + '\''
+ ", taskTimeoutStrategy=" + taskTimeoutStrategy
+ ", taskTimeout=" + taskTimeout
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentConfig='" + environmentConfig + '\''
+ ", delayTime=" + delayTime
+ ", resources=" + resources
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+ '}';
}
}
......@@ -62,7 +62,9 @@ public enum ExecutionStatus {
FORCED_SUCCESS(13, "forced success"),
SERIAL_WAIT(14, "serial wait"),
READY_BLOCK(15, "ready block"),
BLOCK(16, "block");
BLOCK(16, "block"),
DISPATCH(17, "dispatch"),
;
ExecutionStatus(int code, String descp) {
this.code = code;
......
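The hunk above adds the DISPATCH state that the master sets once a task has been handed to a worker but before the worker's running callback arrives. Below is a minimal, self-contained sketch of where that state sits; the Status enum, the two sets and the helper methods are hypothetical stand-ins, not the scheduler's real ExecutionStatus or submit logic.

import java.util.EnumSet;
import java.util.Set;

public class DispatchStateSketch {

    // hypothetical stand-in for the real ExecutionStatus enum
    enum Status { SUBMITTED_SUCCESS, DISPATCH, RUNNING_EXECUTION, DELAY_EXECUTION, KILL, SUCCESS, FAILURE }

    // with DISPATCH added, these states still count as "in flight"
    private static final Set<Status> NOT_TERMINATED =
            EnumSet.of(Status.SUBMITTED_SUCCESS, Status.DISPATCH, Status.RUNNING_EXECUTION, Status.DELAY_EXECUTION);

    // live states a re-submit must not overwrite (mirrors the submitTask guard shown earlier)
    private static final Set<Status> KEEP_ON_SUBMIT =
            EnumSet.of(Status.RUNNING_EXECUTION, Status.DELAY_EXECUTION, Status.KILL, Status.DISPATCH);

    static boolean isFinished(Status s) {
        return !NOT_TERMINATED.contains(s);
    }

    static Status stateOnSubmit(Status current) {
        return KEEP_ON_SUBMIT.contains(current) ? current : Status.SUBMITTED_SUCCESS;
    }

    public static void main(String[] args) {
        Status state = Status.DISPATCH;            // master marked dispatch success
        System.out.println(isFinished(state));     // false: running callback not received yet
        System.out.println(stateOnSubmit(state));  // DISPATCH is preserved on re-submit
        state = Status.RUNNING_EXECUTION;          // worker's running callback arrived
        state = Status.SUCCESS;                    // worker's response callback arrived
        System.out.println(isFinished(state));     // true
    }
}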
......@@ -28,10 +28,10 @@ import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.config.NettyServerConfig;
import org.apache.dolphinscheduler.server.log.LoggerRequestProcessor;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.DBTaskResponseProcessor;
import org.apache.dolphinscheduler.server.worker.processor.HostUpdateProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResponseAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.runner.RetryReportTaskStatusThread;
......@@ -111,10 +111,10 @@ public class WorkerServer implements IStoppable {
private TaskKillProcessor taskKillProcessor;
@Autowired
private DBTaskAckProcessor dbTaskAckProcessor;
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private DBTaskResponseProcessor dbTaskResponseProcessor;
private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
@Autowired
private HostUpdateProcessor hostUpdateProcessor;
......@@ -143,8 +143,8 @@ public class WorkerServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, taskExecuteProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_ACK, dbTaskAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.DB_TASK_RESPONSE, dbTaskResponseProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
......
......@@ -30,28 +30,30 @@ public class ResponseCache {
private static final ResponseCache instance = new ResponseCache();
private ResponseCache(){}
private ResponseCache() {
}
public static ResponseCache get() {
return instance;
}
private Map<Integer,Command> ackCache = new ConcurrentHashMap<>();
private Map<Integer,Command> responseCache = new ConcurrentHashMap<>();
private Map<Integer, Command> runningCache = new ConcurrentHashMap<>();
private Map<Integer, Command> responseCache = new ConcurrentHashMap<>();
/**
* cache response
*
* @param taskInstanceId taskInstanceId
* @param command command
* @param event event RUNNING/RESULT
*/
public void cache(Integer taskInstanceId, Command command, Event event) {
switch (event) {
case ACK:
ackCache.put(taskInstanceId,command);
case RUNNING:
runningCache.put(taskInstanceId, command);
break;
case RESULT:
responseCache.put(taskInstanceId,command);
responseCache.put(taskInstanceId, command);
break;
default:
throw new IllegalArgumentException("invalid event type : " + event);
......@@ -59,15 +61,17 @@ public class ResponseCache {
}
/**
* remove ack cache
* remove running cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeAckCache(Integer taskInstanceId) {
ackCache.remove(taskInstanceId);
public void removeRunningCache(Integer taskInstanceId) {
runningCache.remove(taskInstanceId);
}
/**
* remove reponse cache
* remove response cache
*
* @param taskInstanceId taskInstanceId
*/
public void removeResponseCache(Integer taskInstanceId) {
......@@ -75,18 +79,20 @@ public class ResponseCache {
}
/**
* getAckCache
* get running cache
*
* @return the running command cache
*/
public Map<Integer,Command> getAckCache() {
return ackCache;
public Map<Integer, Command> getRunningCache() {
return runningCache;
}
/**
* getResponseCache
*
* @return getResponseCache
*/
public Map<Integer,Command> getResponseCache() {
public Map<Integer, Command> getResponseCache() {
return responseCache;
}
}
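ResponseCache now holds one RUNNING and one RESULT command per task instance until the master acknowledges each of them. The sketch below illustrates that cache-until-acked pattern under simplified assumptions; SketchCommand and the method names are hypothetical, not the real remote Command API.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RunningResponseCacheSketch {

    // hypothetical stand-in for the real remote Command
    record SketchCommand(int taskInstanceId, String payload) { }

    private final Map<Integer, SketchCommand> runningCache = new ConcurrentHashMap<>();
    private final Map<Integer, SketchCommand> responseCache = new ConcurrentHashMap<>();

    // cache before sending so an unacknowledged command can be re-sent later
    void cacheRunning(SketchCommand command)  { runningCache.put(command.taskInstanceId(), command); }
    void cacheResponse(SketchCommand command) { responseCache.put(command.taskInstanceId(), command); }

    // the master's ack removes the entry and stops further retries
    void onRunningAck(int taskInstanceId)  { runningCache.remove(taskInstanceId); }
    void onResponseAck(int taskInstanceId) { responseCache.remove(taskInstanceId); }

    Map<Integer, SketchCommand> pendingRunning()   { return runningCache; }
    Map<Integer, SketchCommand> pendingResponses() { return responseCache; }
}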
......@@ -19,16 +19,25 @@ package org.apache.dolphinscheduler.server.worker.processor;
import static org.apache.dolphinscheduler.common.Constants.SLEEP_TIME_MILLIS;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import java.util.Arrays;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import io.netty.channel.Channel;
......@@ -45,6 +54,12 @@ public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
private static final int[] RETRY_BACKOFF = {1, 2, 3, 5, 10, 20, 40, 100, 100, 100, 100, 200, 200, 200};
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@Autowired
private TaskExecuteResponseAckProcessor taskExecuteResponseAckProcessor;
/**
* remote channels
*/
......@@ -58,15 +73,15 @@ public class TaskCallbackService {
public TaskCallbackService() {
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_ACK, new DBTaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.DB_TASK_RESPONSE, new DBTaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}
/**
* add callback channel
*
* @param taskInstanceId taskInstanceId
* @param channel channel
* @param channel channel
*/
public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
REMOTE_CHANNELS.put(taskInstanceId, channel);
......@@ -128,26 +143,13 @@ public class TaskCallbackService {
REMOTE_CHANNELS.remove(taskInstanceId);
}
/**
* send ack
*
* @param taskInstanceId taskInstanceId
* @param command command
*/
public void sendAck(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command);
}
}
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param command command
* @param command command
*/
public void sendResult(int taskInstanceId, Command command) {
public void send(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
......@@ -161,6 +163,99 @@ public class TaskCallbackService {
}
});
}
}
/**
* build task execute running command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteRunningCommand
*/
private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
command.setLogPath(taskExecutionContext.getLogPath());
command.setHost(taskExecutionContext.getHost());
command.setStartTime(taskExecutionContext.getStartTime());
command.setExecutePath(taskExecutionContext.getExecutePath());
return command;
}
/**
* build task execute response command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteResponseCommand
*/
private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
command.setLogPath(taskExecutionContext.getLogPath());
command.setExecutePath(taskExecutionContext.getExecutePath());
command.setAppIds(taskExecutionContext.getAppIds());
command.setProcessId(taskExecutionContext.getProcessId());
command.setHost(taskExecutionContext.getHost());
command.setStartTime(taskExecutionContext.getStartTime());
command.setEndTime(taskExecutionContext.getEndTime());
command.setVarPool(taskExecutionContext.getVarPool());
command.setExecutePath(taskExecutionContext.getExecutePath());
return command;
}
/**
* build TaskKillResponseCommand
*
* @param taskExecutionContext taskExecutionContext
* @return build TaskKillResponseCommand
*/
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
return taskKillResponseCommand;
}
/**
* send task execute running command
* todo unified callback command
*/
public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
// add response cache
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RUNNING);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
/**
* send task execute delay command
* todo unified callback command
*/
public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
/**
* send task execute response command
* todo unified callback command
*/
public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
// add response cache
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), Event.RESULT);
send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
}
public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
TaskCallbackService.remove(taskExecutionContext.getTaskInstanceId());
}
}
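With the ack removed, TaskCallbackService sends an explicit running callback when the task starts and a response callback when it ends. The sketch below only shows that ordering; the Sender interface and the command strings are illustrative placeholders for the real Netty commands.

public class CallbackFlowSketch {

    // hypothetical transport; the real worker writes Netty commands to the master channel
    interface Sender { void send(int taskInstanceId, String command); }

    private final Sender sender;

    CallbackFlowSketch(Sender sender) { this.sender = sender; }

    void onTaskStarted(int taskInstanceId) {
        // 1. report RUNNING as soon as the task actually starts on the worker
        sender.send(taskInstanceId, "TASK_EXECUTE_RUNNING");
    }

    void onTaskFinished(int taskInstanceId, boolean success) {
        // 2. report the final result when the task ends, whether it succeeded or failed
        sender.send(taskInstanceId, success ? "TASK_EXECUTE_RESPONSE:SUCCESS" : "TASK_EXECUTE_RESPONSE:FAILURE");
    }

    public static void main(String[] args) {
        CallbackFlowSketch flow = new CallbackFlowSketch(
                (id, cmd) -> System.out.println("task " + id + " -> " + cmd));
        flow.onTaskStarted(1001);
        flow.onTaskFinished(1001, true);
    }
}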
......@@ -18,7 +18,6 @@
package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.FileUtils;
......@@ -30,12 +29,10 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.runner.TaskExecuteThread;
import org.apache.dolphinscheduler.server.worker.runner.WorkerManagerThread;
......@@ -128,6 +125,21 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
taskExecutionContext.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
if (Constants.DRY_RUN_FLAG_NO == taskExecutionContext.getDryRun()) {
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
logger.error("tenantCode: {} does not exist, taskInstanceId: {}",
taskExecutionContext.getTenantCode(), taskExecutionContext.getTaskInstanceId());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(new Date());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
logger.info("task instance local execute path : {}", execLocalPath);
......@@ -135,12 +147,13 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
try {
FileUtils.createWorkDirIfAbsent(execLocalPath);
if (CommonUtils.isSudoEnable() && workerConfig.isTenantAutoCreate()) {
OSUtils.createUserIfAbsent(taskExecutionContext.getTenantCode());
}
} catch (Throwable ex) {
logger.error("create execLocalPath: {}", execLocalPath, ex);
logger.error("create execLocalPath fail, path: {}, taskInstanceId: {}", execLocalPath, taskExecutionContext.getTaskInstanceId());
logger.error("create executeLocalPath fail", ex);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
}
......@@ -153,48 +166,19 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
logger.info("delay the execution of task instance {}, delay time: {} s", taskExecutionContext.getTaskInstanceId(), remainTime);
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.DELAY_EXECUTION);
taskExecutionContext.setStartTime(null);
} else {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecutionContext.setStartTime(new Date());
taskCallbackService.sendTaskExecuteDelayCommand(taskExecutionContext);
}
this.doAck(taskExecutionContext);
// submit task to manager
if (!workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager))) {
logger.info("submit task to manager error, queue is full, queue size is {}", workerManager.getDelayQueueSize());
boolean offer = workerManager.offer(new TaskExecuteThread(taskExecutionContext, taskCallbackService, alertClientService, taskPluginManager));
if (!offer) {
logger.error("submit task to manager error, queue is full, queue size is {}, taskInstanceId: {}",
workerManager.getDelayQueueSize(), taskExecutionContext.getTaskInstanceId());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
}
private void doAck(TaskExecutionContext taskExecutionContext) {
// tell master that task is in executing
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command(), Event.ACK);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}
/**
* build ack command
*
* @param taskExecutionContext taskExecutionContext
* @return TaskExecuteAckCommand
*/
private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setLogPath(LogUtils.getTaskLogPath(taskExecutionContext));
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
taskExecutionContext.setLogPath(ackCommand.getLogPath());
ackCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
return ackCommand;
}
/**
* get execute local path
*
......
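TaskExecuteProcessor now fails fast at dispatch time: a missing tenant OS user or an unusable execute path produces an immediate failure response, and delayed tasks are reported before being queued. The following sketch mirrors that guard order with hypothetical names and a simplified outcome enum.

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;

public class PreExecutionGuardSketch {

    enum Outcome { FAILED, DELAYED, READY }

    Outcome prepare(String tenantCode, Path execPath, long delaySeconds, Set<String> osUsers) {
        // 1. the tenant's OS user must exist, otherwise fail fast and report the failure
        if (!osUsers.contains(tenantCode)) {
            return Outcome.FAILED;
        }
        // 2. create the task's working directory; any error is also reported as a failure
        try {
            Files.createDirectories(execPath);
        } catch (Exception e) {
            return Outcome.FAILED;
        }
        // 3. delayed tasks are reported as delayed before queueing, others are ready to run
        return delaySeconds > 0 ? Outcome.DELAYED : Outcome.READY;
    }
}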
......@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
......@@ -34,31 +34,31 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* db task response processor
* task execute response ack, from master to worker
*/
@Component
public class DBTaskResponseProcessor implements NettyRequestProcessor {
public class TaskExecuteResponseAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(DBTaskResponseProcessor.class);
private final Logger logger = LoggerFactory.getLogger(TaskExecuteResponseAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.DB_TASK_RESPONSE == command.getType(),
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
DBTaskResponseCommand taskResponseCommand = JSONUtils.parseObject(
command.getBody(), DBTaskResponseCommand.class);
TaskExecuteResponseAckCommand taskExecuteResponseAckCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteResponseAckCommand.class);
if (taskResponseCommand == null) {
logger.error("dBTask Response command is null");
if (taskExecuteResponseAckCommand == null) {
logger.error("task execute response ack command is null");
return;
}
logger.info("dBTask Response command : {}", taskResponseCommand);
logger.info("task execute response ack command : {}", taskExecuteResponseAckCommand);
if (taskResponseCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeResponseCache(taskResponseCommand.getTaskInstanceId());
TaskCallbackService.remove(taskResponseCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskResponseCommand.getTaskInstanceId());
if (taskExecuteResponseAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeResponseCache(taskExecuteResponseAckCommand.getTaskInstanceId());
TaskCallbackService.remove(taskExecuteResponseAckCommand.getTaskInstanceId());
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteResponseAckCommand.getTaskInstanceId());
}
}
......
......@@ -21,7 +21,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.DBTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
......@@ -34,29 +34,29 @@ import com.google.common.base.Preconditions;
import io.netty.channel.Channel;
/**
* db task ack processor
* task execute running ack processor
*/
@Component
public class DBTaskAckProcessor implements NettyRequestProcessor {
public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
private final Logger logger = LoggerFactory.getLogger(DBTaskAckProcessor.class);
private final Logger logger = LoggerFactory.getLogger(TaskExecuteRunningAckProcessor.class);
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.DB_TASK_ACK == command.getType(),
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING_ACK == command.getType(),
String.format("invalid command type : %s", command.getType()));
DBTaskAckCommand taskAckCommand = JSONUtils.parseObject(
command.getBody(), DBTaskAckCommand.class);
TaskExecuteRunningAckCommand runningAckCommand = JSONUtils.parseObject(
command.getBody(), TaskExecuteRunningAckCommand.class);
if (taskAckCommand == null) {
logger.error("dBTask ACK request command is null");
if (runningAckCommand == null) {
logger.error("task execute running ack command is null");
return;
}
logger.info("dBTask ACK request command : {}", taskAckCommand);
logger.info("task execute running ack command : {}", runningAckCommand);
if (taskAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeAckCache(taskAckCommand.getTaskInstanceId());
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
ResponseCache.get().removeRunningCache(runningAckCommand.getTaskInstanceId());
}
}
......
......@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.server.worker.processor;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
......@@ -90,10 +91,17 @@ public class TaskKillProcessor implements NettyRequestProcessor {
taskCallbackService.addRemoteChannel(killCommand.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(killCommand, result);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskKillResponseCommand.getTaskInstanceId());
TaskCallbackService.remove(killCommand.getTaskInstanceId());
TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(killCommand.getTaskInstanceId());
if (taskExecutionContext == null) {
logger.error("taskRequest cache is null, taskInstanceId: {}", killCommand.getTaskInstanceId());
return;
}
taskExecutionContext.setCurrentExecutionStatus(result.getLeft() ? ExecutionStatus.SUCCESS : ExecutionStatus.FAILURE);
taskExecutionContext.setAppIds(String.join(TaskConstants.COMMA, result.getRight()));
taskCallbackService.sendTaskKillResponseCommand(taskExecutionContext);
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
logger.info("remove REMOTE_CHANNELS, task instance id:{}", killCommand.getTaskInstanceId());
}
......
......@@ -47,7 +47,7 @@ public class RetryReportTaskStatusThread implements Runnable {
private TaskCallbackService taskCallbackService;
public void start() {
Thread thread = new Thread(this,"RetryReportTaskStatusThread");
Thread thread = new Thread(this, "RetryReportTaskStatusThread");
thread.setDaemon(true);
thread.start();
}
......@@ -65,21 +65,21 @@ public class RetryReportTaskStatusThread implements Runnable {
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
if (!instance.getAckCache().isEmpty()) {
Map<Integer,Command> ackCache = instance.getAckCache();
for (Map.Entry<Integer, Command> entry : ackCache.entrySet()) {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command ackCommand = entry.getValue();
taskCallbackService.sendAck(taskInstanceId,ackCommand);
Command runningCommand = entry.getValue();
taskCallbackService.send(taskInstanceId, runningCommand);
}
}
if (!instance.getResponseCache().isEmpty()) {
Map<Integer,Command> responseCache = instance.getResponseCache();
Map<Integer, Command> responseCache = instance.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
taskCallbackService.sendResult(taskInstanceId,responseCommand);
taskCallbackService.send(taskInstanceId, responseCommand);
}
}
} catch (Exception e) {
......
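RetryReportTaskStatusThread periodically re-sends whatever is still sitting in the running and response caches. A hedged sketch of that loop follows; the five-second interval and the println stand in for the real configuration and Netty send.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RetryReportSketch implements Runnable {

    // hypothetical caches; the real thread reads ResponseCache's running/response maps
    private final Map<Integer, String> runningCache = new ConcurrentHashMap<>();
    private final Map<Integer, String> responseCache = new ConcurrentHashMap<>();
    private volatile boolean stopped;

    @Override
    public void run() {
        while (!stopped) {
            try {
                Thread.sleep(5_000L); // illustrative interval, not the real one
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            // re-send anything the master has not acknowledged yet
            runningCache.forEach(this::resend);
            responseCache.forEach(this::resend);
        }
    }

    private void resend(Integer taskInstanceId, String command) {
        System.out.println("re-sending " + command + " for task instance " + taskInstanceId);
    }
}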
......@@ -17,19 +17,15 @@
package org.apache.dolphinscheduler.server.worker.runner;
import com.github.rholder.retry.RetryException;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.Event;
import org.apache.dolphinscheduler.common.enums.WarningType;
import org.apache.dolphinscheduler.common.storage.StorageOperate;
import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
......@@ -37,17 +33,14 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheMana
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.model.TaskAlertInfo;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.utils.ProcessUtils;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.service.alert.AlertClientService;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.task.TaskPluginManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import java.io.File;
import java.io.IOException;
......@@ -57,11 +50,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.apache.dolphinscheduler.common.Constants.SINGLE_SLASH;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* task scheduler thread
......@@ -131,35 +124,26 @@ public class TaskExecuteThread implements Runnable, Delayed {
@Override
public void run() {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
if (Constants.DRY_RUN_FLAG_YES == taskExecutionContext.getDryRun()) {
responseCommand.setStatus(ExecutionStatus.SUCCESS.getCode());
responseCommand.setEndTime(new Date());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.SUCCESS);
taskExecutionContext.setStartTime(new Date());
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
return;
}
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// check if the OS user exists
if (!OSUtils.getUserList().contains(taskExecutionContext.getTenantCode())) {
String errorLog = String.format("tenantCode: %s does not exist", taskExecutionContext.getTenantCode());
logger.error(errorLog);
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
return;
}
if (taskExecutionContext.getStartTime() == null) {
taskExecutionContext.setStartTime(new Date());
}
if (taskExecutionContext.getCurrentExecutionStatus() != ExecutionStatus.RUNNING_EXECUTION) {
changeTaskExecutionStatusToRunning();
}
logger.info("the task begins to execute. task instance id: {}", taskExecutionContext.getTaskInstanceId());
// callback task execute running
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
taskCallbackService.sendTaskExecuteRunningCommand(taskExecutionContext);
// copy hdfs/minio file to local
downloadResource(taskExecutionContext.getExecutePath(), taskExecutionContext.getResources(), logger);
......@@ -197,29 +181,27 @@ public class TaskExecuteThread implements Runnable, Delayed {
// task handle
this.task.handle();
responseCommand.setStatus(this.task.getExitStatus().getCode());
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), responseCommand.getStatus());
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
}
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(this.task.getProcessId());
responseCommand.setAppIds(this.task.getAppIds());
responseCommand.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
taskExecutionContext.setVarPool(JSONUtils.toJsonString(this.task.getParameters().getVarPool()));
logger.info("task instance id : {},task final status : {}", taskExecutionContext.getTaskInstanceId(), this.task.getExitStatus());
} catch (Throwable e) {
logger.error("task scheduler failure", e);
kill();
responseCommand.setStatus(ExecutionStatus.FAILURE.getCode());
responseCommand.setEndTime(new Date());
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.FAILURE);
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
} finally {
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
clearTaskExecPath();
}
}
......@@ -312,7 +294,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
// query the tenant code of the resource according to the name of the resource
String resHdfsPath = storageOperate.getResourceFileName(tenantCode, fullName);
logger.info("get resource file from hdfs :{}", resHdfsPath);
storageOperate.download(tenantCode,resHdfsPath, execLocalPath + File.separator + fullName, false, true);
storageOperate.download(tenantCode, resHdfsPath, execLocalPath + File.separator + fullName, false, true);
} catch (Exception e) {
logger.error(e.getMessage(), e);
throw new ServiceException(e.getMessage());
......@@ -323,40 +305,6 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
/**
* send an ack to change the status of the task.
*/
private void changeTaskExecutionStatusToRunning() {
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.RUNNING_EXECUTION);
Command ackCommand = buildAckCommand().convert2Command();
try {
RetryerUtils.retryCall(() -> {
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
return Boolean.TRUE;
});
} catch (ExecutionException | RetryException e) {
logger.error(e.getMessage(), e);
}
}
/**
* build ack command.
*
* @return TaskExecuteAckCommand
*/
private TaskExecuteAckCommand buildAckCommand() {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
ackCommand.setStartTime(taskExecutionContext.getStartTime());
ackCommand.setLogPath(taskExecutionContext.getLogPath());
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setExecutePath(taskExecutionContext.getExecutePath());
return ackCommand;
}
/**
* get current TaskExecutionContext
*
......
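In TaskExecuteThread the final status, end time, process id and app ids are now carried on the TaskExecutionContext, and the response callback is sent from the finally block so the master always receives a result. The sketch below shows that report-in-finally shape with a hypothetical Reporter interface.

public class ReportInFinallySketch {

    // hypothetical callback; the real worker sends a TaskExecuteResponseCommand here
    interface Reporter { void report(String status); }

    void runTask(Runnable task, Reporter reporter) {
        String status = "FAILURE";
        try {
            task.run();
            status = "SUCCESS";
        } catch (Throwable t) {
            // keep FAILURE; the real worker also kills any child process here
        } finally {
            // the response always goes out, so the master is never left waiting
            reporter.report(status);
        }
    }

    public static void main(String[] args) {
        ReportInFinallySketch sketch = new ReportInFinallySketch();
        sketch.runTask(() -> { }, System.out::println);
        sketch.runTask(() -> { throw new IllegalStateException("boom"); }, System.out::println);
    }
}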
......@@ -108,7 +108,7 @@ public class WorkerManagerThread implements Runnable {
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId(), taskExecutionContext.getProcessInstanceId());
responseCommand.setStatus(ExecutionStatus.KILL.getCode());
ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
}
/**
......
......@@ -24,8 +24,8 @@ import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.remote.utils.ChannelUtils;
import org.apache.dolphinscheduler.remote.utils.JsonSerializer;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
......@@ -53,7 +53,7 @@ import org.slf4j.Logger;
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({SpringApplicationContext.class, TaskCallbackService.class, WorkerConfig.class, FileUtils.class,
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
JsonSerializer.class, JSONUtils.class, ThreadUtils.class, ExecutorService.class, ChannelUtils.class})
@Ignore
public class TaskExecuteProcessorTest {
......@@ -84,7 +84,7 @@ public class TaskExecuteProcessorTest {
workerConfig.setListenPort(1234);
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteAckCommand().convert2Command();
ackCommand = new TaskExecuteRunningCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand();
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
......@@ -95,7 +95,7 @@ public class TaskExecuteProcessorTest {
PowerMockito.when(ChannelUtils.toAddress(null)).thenReturn(null);
taskCallbackService = PowerMockito.mock(TaskCallbackService.class);
PowerMockito.doNothing().when(taskCallbackService).sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.doNothing().when(taskCallbackService).send(taskExecutionContext.getTaskInstanceId(), ackCommand);
PowerMockito.mockStatic(SpringApplicationContext.class);
PowerMockito.when(SpringApplicationContext.getBean(TaskCallbackService.class))
......@@ -125,10 +125,10 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
taskExecutionContext.getProcessDefineCode(),
taskExecutionContext.getProcessDefineVersion(),
taskExecutionContext.getProcessInstanceId(),
taskExecutionContext.getTaskInstanceId()))
.thenReturn(taskExecutionContext.getExecutePath());
PowerMockito.doNothing().when(FileUtils.class, "createWorkDirIfAbsent", taskExecutionContext.getExecutePath());
......