提交 db31deb5 编写于 作者: W Wenjun Ruan

[Bug] [Master] Worker failover will cause task cannot be failover (#10631)

* fix worker failover may lose event

(cherry picked from commit 66624c5c)
上级 c488a9f8
......@@ -20,10 +20,12 @@ package org.apache.dolphinscheduler.common.enums;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import io.netty.channel.Channel;
import lombok.Data;
/**
* state event
*/
@Data
public class StateEvent {
/**
......@@ -45,79 +47,4 @@ public class StateEvent {
private Channel channel;
public ExecutionStatus getExecutionStatus() {
return executionStatus;
}
public void setExecutionStatus(ExecutionStatus executionStatus) {
this.executionStatus = executionStatus;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public long getTaskCode() {
return taskCode;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public String getContext() {
return context;
}
public void setContext(String context) {
this.context = context;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public void setTaskCode(long taskCode) {
this.taskCode = taskCode;
}
public Channel getChannel() {
return channel;
}
public void setChannel(Channel channel) {
this.channel = channel;
}
@Override
public String toString() {
return "State Event :"
+ "key: " + key
+ " type: " + type.toString()
+ " executeStatus: " + executionStatus
+ " task instance id: " + taskInstanceId
+ " process instance id: " + processInstanceId
+ " context: " + context
;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public void setType(StateEventType type) {
this.type = type;
}
public StateEventType getType() {
return this.type;
}
}
......@@ -423,24 +423,27 @@ public class OSUtils {
}
/**
* check memory and cpu usage
* Check memory and cpu usage is overload the given thredshod.
*
* @param maxCpuloadAvg maxCpuloadAvg
* @param maxCpuLoadAvg maxCpuLoadAvg
* @param reservedMemory reservedMemory
* @return check memory and cpu usage
* @return True, if the cpu or memory exceed the given thredshod.
*/
public static Boolean checkResource(double maxCpuloadAvg, double reservedMemory) {
public static Boolean isOverload(double maxCpuLoadAvg, double reservedMemory) {
// system load average
double loadAverage = loadAverage();
// system available physical memory
double availablePhysicalMemorySize = availablePhysicalMemorySize();
if (loadAverage > maxCpuloadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn("current cpu load average {} is too high or available memory {}G is too low, under max.cpuload.avg={} and reserved.memory={}G",
loadAverage, availablePhysicalMemorySize, maxCpuloadAvg, reservedMemory);
return false;
} else {
if (loadAverage > maxCpuLoadAvg || availablePhysicalMemorySize < reservedMemory) {
logger.warn(
"Current cpu load average {} is too high or available memory {}G is too low, under max.cpuLoad.avg={} and reserved.memory={}G",
loadAverage,
availablePhysicalMemorySize,
maxCpuLoadAvg,
reservedMemory);
return true;
}
return false;
}
}
......@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.server.master.cache;
import lombok.NonNull;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import java.util.Collection;
......@@ -55,7 +57,7 @@ public interface ProcessInstanceExecCacheManager {
* @param processInstanceId processInstanceId
* @param workflowExecuteThread if it is null, will not be cached
*/
void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread);
void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread);
/**
* get all WorkflowExecuteThread from cache
......
......@@ -30,6 +30,8 @@ import org.springframework.stereotype.Component;
import com.google.common.collect.ImmutableList;
import lombok.NonNull;
/**
* cache of process instance id and WorkflowExecuteThread
*/
......@@ -59,10 +61,7 @@ public class ProcessInstanceExecCacheManagerImpl implements ProcessInstanceExecC
}
@Override
public void cache(int processInstanceId, WorkflowExecuteRunnable workflowExecuteThread) {
if (workflowExecuteThread == null) {
return;
}
public void cache(int processInstanceId, @NonNull WorkflowExecuteRunnable workflowExecuteThread) {
processInstanceExecMaps.put(processInstanceId, workflowExecuteThread);
}
......
......@@ -131,9 +131,8 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
for (TaskPriority dispatchFailedTask : failedDispatchTasks) {
taskPriorityQueue.put(dispatchFailedTask);
}
// If there are tasks in a cycle that cannot find the worker group,
// sleep for 1 second
if (taskPriorityQueue.size() <= failedDispatchTasks.size()) {
// If the all task dispatch failed, will sleep for 1s to avoid the master cpu higher.
if (fetchTaskNum == failedDispatchTasks.size()) {
TimeUnit.MILLISECONDS.sleep(Constants.SLEEP_TIME_MILLIS);
}
}
......@@ -218,8 +217,7 @@ public class TaskPriorityQueueConsumer extends BaseDaemonThread {
}
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
return requestCommand.convert2Command();
}
......
......@@ -17,11 +17,13 @@
package org.apache.dolphinscheduler.server.master.metrics;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
public final class ProcessInstanceMetrics {
......@@ -29,15 +31,25 @@ public final class ProcessInstanceMetrics {
throw new UnsupportedOperationException("Utility class");
}
private static final Timer COMMAND_QUERY_TIMETER =
Timer.builder("ds.workflow.command.query.duration")
.description("Command query duration")
.register(Metrics.globalRegistry);
private static final Timer PROCESS_INSTANCE_GENERATE_TIMER =
Timer.builder("ds.workflow.instance.generate.duration")
.description("Process instance generated duration")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_SUBMIT_COUNTER =
Counter.builder("dolphinscheduler_process_instance_submit_count")
.description("Process instance submit total count")
.register(Metrics.globalRegistry);
Counter.builder("ds.workflow.instance.submit.count")
.description("Process instance submit total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_TIMEOUT_COUNTER =
Counter.builder("dolphinscheduler_process_instance_timeout_count")
.description("Process instance timeout total count")
.register(Metrics.globalRegistry);
Counter.builder("ds.workflow.instance.timeout.count")
.description("Process instance timeout total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FINISH_COUNTER =
Counter.builder("dolphinscheduler_process_instance_finish_count")
......@@ -55,19 +67,27 @@ public final class ProcessInstanceMetrics {
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_STOP_COUNTER =
Counter.builder("dolphinscheduler_process_instance_stop_count")
.description("Process instance stop total count")
.register(Metrics.globalRegistry);
Counter.builder("ds.workflow.instance.stop.count")
.description("Process instance stop total count")
.register(Metrics.globalRegistry);
private static final Counter PROCESS_INSTANCE_FAILOVER_COUNTER =
Counter.builder("dolphinscheduler_process_instance_failover_count")
.description("Process instance failover total count")
.register(Metrics.globalRegistry);
Counter.builder("ds.workflow.instance.failover.count")
.description("Process instance failover total count")
.register(Metrics.globalRegistry);
public static void recordCommandQueryTime(long milliseconds) {
COMMAND_QUERY_TIMETER.record(milliseconds, TimeUnit.MILLISECONDS);
}
public static void recordProcessInstanceGenerateTime(long milliseconds) {
PROCESS_INSTANCE_GENERATE_TIMER.record(milliseconds, TimeUnit.MILLISECONDS);
}
public static synchronized void registerProcessInstanceRunningGauge(Supplier<Number> function) {
Gauge.builder("dolphinscheduler_process_instance_running_gauge", function)
.description("The current running process instance count")
.register(Metrics.globalRegistry);
Gauge.builder("ds.workflow.instance.running", function)
.description("The current running process instance count")
.register(Metrics.globalRegistry);
}
public static void incProcessInstanceSubmit() {
......@@ -97,5 +117,4 @@ public final class ProcessInstanceMetrics {
public static void incProcessInstanceFailover() {
PROCESS_INSTANCE_FAILOVER_COUNTER.increment();
}
}
......@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheM
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.server.master.metrics.MasterServerMetrics;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.registry.ServerNodeManager;
import org.apache.dolphinscheduler.service.alert.ProcessAlertManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -61,31 +62,19 @@ public class MasterSchedulerService extends BaseDaemonThread {
*/
private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);
/**
* dolphinscheduler database interface
*/
@Autowired
private ProcessService processService;
/**
* master config
*/
@Autowired
private MasterConfig masterConfig;
/**
* alert manager
*/
@Autowired
private ProcessAlertManager processAlertManager;
/**
* netty remoting client
*/
private NettyRemotingClient nettyRemotingClient;
@Autowired
NettyExecutorManager nettyExecutorManager;
private NettyExecutorManager nettyExecutorManager;
/**
* master prepare exec service
......@@ -104,6 +93,8 @@ public class MasterSchedulerService extends BaseDaemonThread {
@Autowired
private StateWheelExecuteThread stateWheelExecuteThread;
private String masterAddress;
protected MasterSchedulerService() {
super("MasterCommandLoopThread");
}
......@@ -115,6 +106,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
this.masterPrepareExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor("MasterPreExecThread", masterConfig.getPreExecThreads());
NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
this.masterAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
@Override
......@@ -138,13 +130,13 @@ public class MasterSchedulerService extends BaseDaemonThread {
public void run() {
while (Stopper.isRunning()) {
try {
boolean runCheckFlag = OSUtils.checkResource(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (!runCheckFlag) {
boolean isOverload = OSUtils.isOverload(masterConfig.getMaxCpuLoadAvg(), masterConfig.getReservedMemory());
if (isOverload) {
MasterServerMetrics.incMasterOverload();
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
continue;
}
scheduleProcess();
scheduleWorkflow();
} catch (InterruptedException interruptedException) {
logger.warn("Master schedule service interrupted, close the loop", interruptedException);
Thread.currentThread().interrupt();
......@@ -156,13 +148,12 @@ public class MasterSchedulerService extends BaseDaemonThread {
}
/**
* 1. get command by slot
* 2. donot handle command if slot is empty
* Query command from database by slot, and transform to workflow instance, then submit to workflowExecuteThreadPool.
*/
private void scheduleProcess() throws InterruptedException {
private void scheduleWorkflow() throws InterruptedException {
List<Command> commands = findCommands();
if (CollectionUtils.isEmpty(commands)) {
//indicate that no command ,sleep for 1s
// indicate that no command ,sleep for 1s
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
return;
}
......@@ -177,7 +168,7 @@ public class MasterSchedulerService extends BaseDaemonThread {
try {
LoggerUtils.setWorkflowInstanceIdMDC(processInstance.getId());
logger.info("Master schedule service starting workflow instance");
WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
final WorkflowExecuteRunnable workflowExecuteRunnable = new WorkflowExecuteRunnable(
processInstance
, processService
, nettyExecutorManager
......@@ -189,9 +180,14 @@ public class MasterSchedulerService extends BaseDaemonThread {
if (processInstance.getTimeout() > 0) {
stateWheelExecuteThread.addProcess4TimeoutCheck(processInstance);
}
workflowExecuteThreadPool.startWorkflow(workflowExecuteRunnable);
ProcessInstanceMetrics.incProcessInstanceSubmit();
workflowExecuteThreadPool.submit(workflowExecuteRunnable);
logger.info("Master schedule service started workflow instance");
} catch (Exception ex) {
processInstanceExecCacheManager.removeByProcessInstanceId(processInstance.getId());
stateWheelExecuteThread.removeProcess4TimeoutCheck(processInstance.getId());
logger.info("Master submit workflow to thread pool failed, will remove workflow runnable from cache manager", ex);
} finally {
LoggerUtils.removeWorkflowInstanceIdMDC();
}
......@@ -199,21 +195,21 @@ public class MasterSchedulerService extends BaseDaemonThread {
}
private List<ProcessInstance> command2ProcessInstance(List<Command> commands) throws InterruptedException {
long commandTransformStartTime = System.currentTimeMillis();
logger.info("Master schedule service transforming command to ProcessInstance, commandSize: {}", commands.size());
List<ProcessInstance> processInstances = Collections.synchronizedList(new ArrayList<>(commands.size()));
CountDownLatch latch = new CountDownLatch(commands.size());
for (final Command command : commands) {
masterPrepareExecService.execute(() -> {
try {
// todo: this check is not safe, the slot may change after command transform.
// slot check again
SlotCheckState slotCheckState = slotCheck(command);
if (slotCheckState.equals(SlotCheckState.CHANGE) || slotCheckState.equals(SlotCheckState.INJECT)) {
logger.info("Master handle command {} skip, slot check state: {}", command.getId(), slotCheckState);
return;
}
ProcessInstance processInstance = processService.handleCommand(logger,
getLocalAddress(),
command);
ProcessInstance processInstance = processService.handleCommand(masterAddress, command);
if (processInstance != null) {
processInstances.add(processInstance);
logger.info("Master handle command {} end, create process instance {}", command.getId(), processInstance.getId());
......@@ -231,24 +227,26 @@ public class MasterSchedulerService extends BaseDaemonThread {
latch.await();
logger.info("Master schedule service transformed command to ProcessInstance, commandSize: {}, processInstanceSize: {}",
commands.size(), processInstances.size());
ProcessInstanceMetrics.recordProcessInstanceGenerateTime(System.currentTimeMillis() - commandTransformStartTime);
return processInstances;
}
private List<Command> findCommands() {
long scheduleStartTime = System.currentTimeMillis();
int thisMasterSlot = ServerNodeManager.getSlot();
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount <= 0) {
logger.warn("Master count: {} is invalid, the current slot: {}", masterCount, thisMasterSlot);
return Collections.emptyList();
}
int pageNumber = 0;
int pageSize = masterConfig.getFetchCommandNum();
List<Command> result = new ArrayList<>();
if (Stopper.isRunning()) {
int thisMasterSlot = ServerNodeManager.getSlot();
int masterCount = ServerNodeManager.getMasterSize();
if (masterCount > 0) {
result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
}
final List<Command> result = processService.findCommandPageBySlot(pageSize, pageNumber, masterCount, thisMasterSlot);
if (CollectionUtils.isNotEmpty(result)) {
logger.info("Master schedule service loop command success, command size: {}, current slot: {}, total slot size: {}",
result.size(), thisMasterSlot, masterCount);
}
ProcessInstanceMetrics.recordCommandQueryTime(System.currentTimeMillis() - scheduleStartTime);
return result;
}
......@@ -266,7 +264,4 @@ public class MasterSchedulerService extends BaseDaemonThread {
return state;
}
private String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
}
}
......@@ -59,24 +59,24 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
private static final Logger logger = LoggerFactory.getLogger(StateWheelExecuteThread.class);
/**
* process timeout check list
* ProcessInstance timeout check list, element is the processInstanceId.
*/
private ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<Integer> processInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
* task time out check list
*/
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceTimeoutCheckList = new ConcurrentLinkedQueue<>();
/**
* task retry check list
*/
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceRetryCheckList = new ConcurrentLinkedQueue<>();
/**
* task state check list
*/
private ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
private final ConcurrentLinkedQueue<TaskInstanceKey> taskInstanceStateCheckList = new ConcurrentLinkedQueue<>();
@Autowired
private MasterConfig masterConfig;
......@@ -116,8 +116,8 @@ public class StateWheelExecuteThread extends BaseDaemonThread {
logger.info("Success add workflow instance into timeout check list");
}
public void removeProcess4TimeoutCheck(ProcessInstance processInstance) {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstance.getId());
public void removeProcess4TimeoutCheck(int processInstanceId) {
boolean removeFlag = processInstanceTimeoutCheckList.remove(processInstanceId);
if (removeFlag) {
logger.info("Success remove workflow instance from timeout check list");
} else {
......
......@@ -426,6 +426,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
logger.warn("The task instance is already complete, stateEvent: {}", stateEvent);
return true;
}
taskFinished(task);
......@@ -452,11 +453,9 @@ public class WorkflowExecuteRunnable implements Runnable {
}
private void taskFinished(TaskInstance taskInstance) {
logger.info("work flow {} task id:{} code:{} state:{} ",
processInstance.getId(),
taskInstance.getId(),
taskInstance.getTaskCode(),
taskInstance.getState());
logger.info("TaskInstance finished task code:{} state:{} ",
taskInstance.getTaskCode(),
taskInstance.getState());
activeTaskProcessorMaps.remove(taskInstance.getTaskCode());
stateWheelExecuteThread.removeTask4TimeoutCheck(processInstance, taskInstance);
......@@ -472,12 +471,13 @@ public class WorkflowExecuteRunnable implements Runnable {
}
} else if (taskInstance.taskCanRetry() && processInstance.getState() != ExecutionStatus.READY_STOP) {
// retry task
logger.info("Retry taskInstance taskInstance state: {}", taskInstance.getState());
retryTaskInstance(taskInstance);
} else if (taskInstance.getState().typeIsFailure()) {
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
// There are child nodes and the failure policy is: CONTINUE
if (DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)
&& processInstance.getFailureStrategy() == FailureStrategy.CONTINUE) {
if (processInstance.getFailureStrategy() == FailureStrategy.CONTINUE
&& DagHelper.haveAllNodeAfterNode(Long.toString(taskInstance.getTaskCode()), dag)) {
submitPostNode(Long.toString(taskInstance.getTaskCode()));
} else {
errorTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
......@@ -486,6 +486,7 @@ public class WorkflowExecuteRunnable implements Runnable {
}
}
} else if (taskInstance.getState().typeIsFinished()) {
// todo: when the task instance type is pause, then it should not in completeTaskMap
completeTaskMap.put(taskInstance.getTaskCode(), taskInstance.getId());
}
......@@ -528,7 +529,7 @@ public class WorkflowExecuteRunnable implements Runnable {
}
TaskInstance newTaskInstance = cloneRetryTaskInstance(taskInstance);
if (newTaskInstance == null) {
logger.error("retry fail, new taskInstancce is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
logger.error("retry fail, new taskInstance is null, task code:{}, task id:{}", taskInstance.getTaskCode(), taskInstance.getId());
return;
}
waitToRetryTaskInstanceMap.put(newTaskInstance.getTaskCode(), newTaskInstance);
......@@ -625,7 +626,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* check if task instance exist by task code
*/
public boolean checkTaskInstanceByCode(long taskCode) {
if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
if (taskInstanceMap.isEmpty()) {
return false;
}
for (TaskInstance taskInstance : taskInstanceMap.values()) {
......@@ -640,7 +641,7 @@ public class WorkflowExecuteRunnable implements Runnable {
* check if task instance exist by id
*/
public boolean checkTaskInstanceById(int taskInstanceId) {
if (taskInstanceMap == null || taskInstanceMap.size() == 0) {
if (taskInstanceMap.isEmpty()) {
return false;
}
return taskInstanceMap.containsKey(taskInstanceId);
......@@ -688,7 +689,7 @@ public class WorkflowExecuteRunnable implements Runnable {
if (stateEvent.getExecutionStatus() == ExecutionStatus.STOP) {
// serial wait execution type needs to wake up the waiting process
if (processDefinition.getExecutionType().typeIsSerialWait()){
if (processDefinition.getExecutionType().typeIsSerialWait() || processDefinition.getExecutionType().typeIsSerialPriority()) {
endProcess();
return true;
}
......@@ -1277,6 +1278,10 @@ public class WorkflowExecuteRunnable implements Runnable {
}
}
public Collection<TaskInstance> getAllTaskInstances() {
return taskInstanceMap.values();
}
private void setVarPoolValue(Map<String, Property> allProperty, Map<String, TaskInstance> allTaskInstance, TaskInstance preTaskInstance, Property thisProperty) {
//for this taskInstance all the param in this part is IN.
thisProperty.setDirect(Direct.IN);
......
......@@ -52,7 +52,7 @@ import com.google.common.base.Strings;
import lombok.NonNull;
/**
* Used to execute {@link WorkflowExecuteRunnable}, when
* Used to execute {@link WorkflowExecuteRunnable}.
*/
@Component
public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
......@@ -100,14 +100,6 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
logger.info("Submit state event success, stateEvent: {}", stateEvent);
}
/**
* Start the given workflow.
*/
public void startWorkflow(WorkflowExecuteRunnable workflowExecuteThread) {
ProcessInstanceMetrics.incProcessInstanceSubmit();
submit(workflowExecuteThread);
}
/**
* Handle the events belong to the given workflow.
*/
......@@ -139,7 +131,7 @@ public class WorkflowExecuteThreadPool extends ThreadPoolTaskExecutor {
try {
LoggerUtils.setWorkflowInstanceIdMDC(workflowExecuteThread.getProcessInstance().getId());
if (workflowExecuteThread.workFlowFinish()) {
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance());
stateWheelExecuteThread.removeProcess4TimeoutCheck(workflowExecuteThread.getProcessInstance().getId());
processInstanceExecCacheManager.removeByProcessInstanceId(processInstanceId);
notifyProcessChanged(workflowExecuteThread.getProcessInstance());
logger.info("Workflow instance is finished.");
......
......@@ -89,6 +89,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConst
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_CONNECTOR_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TARGET_DATASOURCE_ID;
import lombok.NonNull;
public abstract class BaseTaskProcessor implements ITaskProcessor {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
......@@ -107,22 +109,19 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected int commitInterval;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
protected ProcessService processService;
protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
protected MasterConfig masterConfig;
protected TaskPluginManager taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
protected TaskPluginManager taskPluginManager;
protected String threadLoggerInfoName;
@Override
public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
if (processService == null) {
processService = SpringApplicationContext.getBean(ProcessService.class);
}
if (masterConfig == null) {
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
}
public void init(@NonNull TaskInstance taskInstance, @NonNull ProcessInstance processInstance) {
processService = SpringApplicationContext.getBean(ProcessService.class);
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
taskPluginManager = SpringApplicationContext.getBean(TaskPluginManager.class);
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getTaskCommitRetryTimes();
......@@ -227,7 +226,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
@Override
public String getType() {
return null;
throw new UnsupportedOperationException("This abstract class doesn's has type");
}
@Override
......
......@@ -20,64 +20,22 @@ package org.apache.dolphinscheduler.server.master.runner.task;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import java.util.Objects;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NonNull;
/**
* task instance key, processInstanceId
* Used to identify a task instance.
*/
@Data
@AllArgsConstructor
public class TaskInstanceKey {
private int processInstanceId;
private long taskCode;
private int taskVersion;
private final int processInstanceId;
private final long taskCode;
private final int taskVersion;
public TaskInstanceKey(int processInstanceId, long taskCode, int taskVersion) {
this.processInstanceId = processInstanceId;
this.taskCode = taskCode;
this.taskVersion = taskVersion;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public long getTaskCode() {
return taskCode;
}
public int getTaskVersion() {
return taskVersion;
}
public static TaskInstanceKey getTaskInstanceKey(ProcessInstance processInstance, TaskInstance taskInstance) {
if (processInstance == null || taskInstance == null) {
return null;
}
public static TaskInstanceKey getTaskInstanceKey(@NonNull ProcessInstance processInstance, @NonNull TaskInstance taskInstance) {
return new TaskInstanceKey(processInstance.getId(), taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion());
}
@Override
public String toString() {
return "TaskKey{"
+ "processInstanceId=" + processInstanceId
+ ", taskCode=" + taskCode
+ ", taskVersion=" + taskVersion
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TaskInstanceKey taskInstanceKey = (TaskInstanceKey) o;
return processInstanceId == taskInstanceKey.processInstanceId && taskCode == taskInstanceKey.taskCode && taskVersion == taskInstanceKey.taskVersion;
}
@Override
public int hashCode() {
return Objects.hash(processInstanceId, taskCode, taskVersion);
}
}
......@@ -17,19 +17,19 @@
package org.apache.dolphinscheduler.server.master.service;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.NodeType;
import org.apache.dolphinscheduler.common.enums.StateEvent;
import org.apache.dolphinscheduler.common.enums.StateEventType;
import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.metrics.ProcessInstanceMetrics;
import org.apache.dolphinscheduler.server.master.metrics.TaskMetrics;
......@@ -49,6 +49,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -56,6 +57,7 @@ import org.springframework.stereotype.Component;
import io.micrometer.core.annotation.Counted;
import io.micrometer.core.annotation.Timed;
import lombok.NonNull;
/**
* failover service
......@@ -67,15 +69,20 @@ public class FailoverService {
private final MasterConfig masterConfig;
private final ProcessService processService;
private final WorkflowExecuteThreadPool workflowExecuteThreadPool;
public FailoverService(RegistryClient registryClient,
MasterConfig masterConfig,
ProcessService processService,
WorkflowExecuteThreadPool workflowExecuteThreadPool) {
this.registryClient = checkNotNull(registryClient);
this.masterConfig = checkNotNull(masterConfig);
this.processService = checkNotNull(processService);
this.workflowExecuteThreadPool = checkNotNull(workflowExecuteThreadPool);
private final ProcessInstanceExecCacheManager cacheManager;
private final String localAddress;
public FailoverService(@NonNull RegistryClient registryClient,
@NonNull MasterConfig masterConfig,
@NonNull ProcessService processService,
@NonNull WorkflowExecuteThreadPool workflowExecuteThreadPool,
@NonNull ProcessInstanceExecCacheManager cacheManager) {
this.registryClient = registryClient;
this.masterConfig = masterConfig;
this.processService = processService;
this.workflowExecuteThreadPool = workflowExecuteThreadPool;
this.cacheManager = cacheManager;
this.localAddress = NetUtils.getAddr(masterConfig.getListenPort());
}
/**
......@@ -88,7 +95,7 @@ public class FailoverService {
if (CollectionUtils.isEmpty(hosts)) {
return;
}
LOGGER.info("Master failover service {} begin to failover hosts:{}", getLocalAddress(), hosts);
LOGGER.info("Master failover service {} begin to failover hosts:{}", localAddress, hosts);
for (String host : hosts) {
failoverMasterWithLock(host);
......@@ -174,11 +181,10 @@ public class FailoverService {
}
/**
* failover worker tasks
* Do the worker failover. Will find the SUBMITTED_SUCCESS/DISPATCH/RUNNING_EXECUTION/DELAY_EXECUTION/READY_PAUSE/READY_STOP tasks belong the given worker,
* and failover these tasks.
* <p>
* 1. kill yarn job if there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. failover all tasks when workerHost is null
* Note: When we do worker failover, the master will only failover the processInstance belongs to the current master.
*
* @param workerHost worker host
*/
......@@ -188,29 +194,40 @@ public class FailoverService {
}
long startTime = System.currentTimeMillis();
List<TaskInstance> needFailoverTaskInstanceList = processService.queryNeedFailoverTaskInstances(workerHost);
Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
// we query the task instance from cache, so that we can directly update the cache
final List<TaskInstance> needFailoverTaskInstanceList = cacheManager.getAll()
.stream()
.flatMap(workflowExecuteRunnable -> workflowExecuteRunnable.getAllTaskInstances().stream())
.filter(taskInstance ->
workerHost.equals(taskInstance.getHost()) && ExecutionStatus.isNeedFailoverWorkflowInstanceState(taskInstance.getState()))
.collect(Collectors.toList());
final Map<Integer, ProcessInstance> processInstanceCacheMap = new HashMap<>();
LOGGER.info("start worker[{}] failover, task list size:{}", workerHost, needFailoverTaskInstanceList.size());
List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
final List<Server> workerServers = registryClient.getServerList(NodeType.WORKER);
for (TaskInstance taskInstance : needFailoverTaskInstanceList) {
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) {
processInstance = processService.findProcessInstanceDetailById(taskInstance.getProcessInstanceId());
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskInstance.getProcessInstanceId(), taskInstance.getId());
try {
ProcessInstance processInstance = processInstanceCacheMap.get(taskInstance.getProcessInstanceId());
if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
processInstance = cacheManager.getByProcessInstanceId(taskInstance.getProcessInstanceId()).getProcessInstance();
if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}
// only failover the task owned myself if worker down.
if (!StringUtils.equalsIgnoreCase(processInstance.getHost(), localAddress)) {
continue;
}
processInstanceCacheMap.put(processInstance.getId(), processInstance);
}
// only failover the task owned myself if worker down.
if (!processInstance.getHost().equalsIgnoreCase(getLocalAddress())) {
continue;
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance, workerServers);
} finally {
LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
}
LOGGER.info("failover task instance id: {}, process instance id: {}", taskInstance.getId(), taskInstance.getProcessInstanceId());
failoverTaskInstance(processInstance, taskInstance, workerServers);
}
LOGGER.info("end worker[{}] failover, useTime:{}ms", workerHost, System.currentTimeMillis() - startTime);
}
......@@ -221,17 +238,14 @@ public class FailoverService {
* 1. kill yarn job if run on worker and there are yarn jobs in tasks.
* 2. change task state from running to need failover.
* 3. try to notify local master
*
* @param processInstance
* @param taskInstance
* @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
* @param servers if failover master, servers container master servers and worker servers; if failover worker, servers contain worker servers.
*/
private void failoverTaskInstance(ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
if (processInstance == null) {
LOGGER.error("failover task instance error, processInstance {} of taskInstance {} is null",
taskInstance.getProcessInstanceId(), taskInstance.getId());
return;
}
private void failoverTaskInstance(@NonNull ProcessInstance processInstance, TaskInstance taskInstance, List<Server> servers) {
if (!checkTaskInstanceNeedFailover(servers, taskInstance)) {
LOGGER.info("The taskInstance doesn't need to failover");
return;
}
TaskMetrics.incTaskFailover();
......@@ -240,6 +254,7 @@ public class FailoverService {
taskInstance.setProcessInstance(processInstance);
if (!isMasterTask) {
LOGGER.info("The failover taskInstance is not master task");
TaskExecutionContext taskExecutionContext = TaskExecutionContextBuilder.get()
.buildTaskInstanceRelatedInfo(taskInstance)
.buildProcessInstanceRelatedInfo(processInstance)
......@@ -249,6 +264,8 @@ public class FailoverService {
// only kill yarn job if exists , the local thread has exited
ProcessUtils.killYarnJob(taskExecutionContext);
}
} else {
LOGGER.info("The failover taskInstance is a master task");
}
taskInstance.setState(ExecutionStatus.NEED_FAULT_TOLERANCE);
......@@ -278,7 +295,7 @@ public class FailoverService {
while (iterator.hasNext()) {
String host = iterator.next();
if (registryClient.checkNodeExists(host, NodeType.MASTER)) {
if (!getLocalAddress().equals(host)) {
if (!localAddress.equals(host)) {
iterator.remove();
}
}
......@@ -390,11 +407,8 @@ public class FailoverService {
return serverStartupTime;
}
/**
* get local address
*/
String getLocalAddress() {
return NetUtils.getAddr(masterConfig.getListenPort());
public String getLocalAddress() {
return localAddress;
}
}
......@@ -92,7 +92,7 @@ master:
pre-exec-threads: 10
# master execute thread number to limit process instances in parallel
exec-threads: 100
# master dispatch task number per batch
# master dispatch task number per batch, if all the tasks dispatch failed in a batch, will sleep 1s.
dispatch-task-number: 3
# master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
host-selector: lower_weight
......
......@@ -56,7 +56,7 @@ public class ProcessInstanceExecCacheManagerImplTest {
Assert.assertTrue(processInstanceExecCacheManager.contains(1));
}
@Test
@Test(expected = NullPointerException.class)
public void testCacheNull() {
processInstanceExecCacheManager.cache(2, null);
WorkflowExecuteRunnable workflowExecuteThread = processInstanceExecCacheManager.getByProcessInstanceId(2);
......
......@@ -49,8 +49,7 @@ public class ExecutionContextTestUtils {
.buildProcessDefinitionRelatedInfo(processDefinition)
.create();
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(context));
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand(context);
Command command = requestCommand.convert2Command();
ExecutionContext executionContext = new ExecutionContext(command, ExecutorType.WORKER);
......
......@@ -97,7 +97,7 @@ public class NettyExecutorManagerTest {
private Command toCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(JSONUtils.toJsonString(taskExecutionContext));
requestCommand.setTaskExecutionContext(taskExecutionContext);
return requestCommand.convert2Command();
}
}
......@@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.service;
import static org.apache.dolphinscheduler.common.Constants.COMMON_TASK_TYPE;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_DEPENDENT;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SWITCH;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doNothing;
......@@ -31,7 +31,9 @@ import org.apache.dolphinscheduler.common.model.Server;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteThreadPool;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -45,7 +47,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
......@@ -63,7 +64,6 @@ import com.google.common.collect.Lists;
@PrepareForTest({RegistryClient.class})
@PowerMockIgnore({"javax.management.*"})
public class FailoverServiceTest {
@InjectMocks
private FailoverService failoverService;
@Mock
......@@ -78,6 +78,9 @@ public class FailoverServiceTest {
@Mock
private WorkflowExecuteThreadPool workflowExecuteThreadPool;
@Mock
private ProcessInstanceExecCacheManager cacheManager;
private static int masterPort = 5678;
private static int workerPort = 1234;
......@@ -95,6 +98,7 @@ public class FailoverServiceTest {
springApplicationContext.setApplicationContext(applicationContext);
given(masterConfig.getListenPort()).willReturn(masterPort);
failoverService = new FailoverService(registryClient, masterConfig, processService, workflowExecuteThreadPool, cacheManager);
testMasterHost = failoverService.getLocalAddress();
String ip = testMasterHost.split(":")[0];
......@@ -182,7 +186,16 @@ public class FailoverServiceTest {
@Test
public void failoverWorkTest() {
workerTaskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
WorkflowExecuteRunnable workflowExecuteRunnable = Mockito.mock(WorkflowExecuteRunnable.class);
Mockito.when(workflowExecuteRunnable.getAllTaskInstances()).thenReturn(Lists.newArrayList(workerTaskInstance));
Mockito.when(workflowExecuteRunnable.getProcessInstance()).thenReturn(processInstance);
Mockito.when(cacheManager.getAll()).thenReturn(Lists.newArrayList(workflowExecuteRunnable));
Mockito.when(cacheManager.getByProcessInstanceId(Mockito.anyInt())).thenReturn(workflowExecuteRunnable);
failoverService.failoverServerWhenDown(testWorkerHost, NodeType.WORKER);
Assert.assertEquals(workerTaskInstance.getState(), ExecutionStatus.NEED_FAULT_TOLERANCE);
Assert.assertEquals(ExecutionStatus.NEED_FAULT_TOLERANCE, workerTaskInstance.getState());
}
}
......@@ -20,10 +20,6 @@
package org.apache.dolphinscheduler.meter;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
......
......@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.remote.command;
import java.io.Serializable;
......@@ -25,6 +26,8 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class Command implements Serializable {
private static final long serialVersionUID = -1L;
private static final AtomicLong REQUEST_ID = new AtomicLong(1);
public static final byte MAGIC = (byte) 0xbabe;
......
......@@ -18,39 +18,23 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import java.io.Serializable;
/**
* execute task request command
*/
public class TaskExecuteRequestCommand implements Serializable {
/**
* task execution context
*/
private String taskExecutionContext;
public String getTaskExecutionContext() {
return taskExecutionContext;
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
public void setTaskExecutionContext(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskExecuteRequestCommand implements Serializable {
public TaskExecuteRequestCommand() {
}
private static final long serialVersionUID = -1L;
public TaskExecuteRequestCommand(String taskExecutionContext) {
this.taskExecutionContext = taskExecutionContext;
}
private TaskExecutionContext taskExecutionContext;
/**
* package request command
*
* @return command
*/
public Command convert2Command() {
Command command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
......@@ -59,10 +43,4 @@ public class TaskExecuteRequestCommand implements Serializable {
return command;
}
@Override
public String toString() {
return "TaskExecuteRequestCommand{"
+ "taskExecutionContext='" + taskExecutionContext + '\''
+ '}';
}
}
......@@ -44,6 +44,8 @@ import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.NonNull;
/**
* mainly used to get the start command line of a process.
*/
......@@ -182,7 +184,10 @@ public class ProcessUtils {
* @param taskExecutionContext taskExecutionContext
* @return yarn application ids
*/
public static List<String> killYarnJob(TaskExecutionContext taskExecutionContext) {
public static List<String> killYarnJob(@NonNull TaskExecutionContext taskExecutionContext) {
if (taskExecutionContext.getLogPath() == null) {
return Collections.emptyList();
}
try {
Thread.sleep(Constants.SLEEP_TIME_MILLIS);
String log;
......
......@@ -52,16 +52,16 @@ import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.plugin.task.api.model.DateInterval;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
import org.slf4j.Logger;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.springframework.transaction.annotation.Transactional;
public interface ProcessService {
@Transactional
ProcessInstance handleCommand(Logger logger, String host, Command command);
ProcessInstance handleCommand(String host, Command command);
void moveToErrorCommand(Command command, String message);
......@@ -161,8 +161,6 @@ public interface ProcessService {
void changeOutParam(TaskInstance taskInstance);
List<String> convertIntListToString(List<Integer> intList);
Schedule querySchedule(int id);
List<Schedule> queryReleaseSchedulerListByProcessDefinitionCode(long processDefinitionCode);
......
......@@ -17,7 +17,6 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_EMPTY_SUB_PROCESS;
......@@ -31,6 +30,8 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -170,13 +171,6 @@ public class ProcessServiceImpl implements ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int[] stateArray = new int[] {ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()};
@Autowired
private UserMapper userMapper;
......@@ -281,7 +275,7 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
@Transactional
public ProcessInstance handleCommand(Logger logger, String host, Command command) {
public ProcessInstance handleCommand(String host, Command command) {
ProcessInstance processInstance = constructProcessInstance(command, host);
// cannot construct process instance, return null
if (processInstance == null) {
......@@ -1947,8 +1941,7 @@ public class ProcessServiceImpl implements ProcessService {
* @param intList intList
* @return string list
*/
@Override
public List<String> convertIntListToString(List<Integer> intList) {
private List<String> convertIntListToString(List<Integer> intList) {
if (intList == null) {
return new ArrayList<>();
}
......@@ -2013,12 +2006,12 @@ public class ProcessServiceImpl implements ProcessService {
*/
@Override
public List<ProcessInstance> queryNeedFailoverProcessInstances(String host) {
return processInstanceMapper.queryByHostAndStatus(host, stateArray);
return processInstanceMapper.queryByHostAndStatus(host, ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
@Override
public List<String> queryNeedFailoverProcessInstanceHost() {
return processInstanceMapper.queryNeedFailoverProcessInstanceHost(stateArray);
return processInstanceMapper.queryNeedFailoverProcessInstanceHost(ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
......@@ -2055,7 +2048,7 @@ public class ProcessServiceImpl implements ProcessService {
@Override
public List<TaskInstance> queryNeedFailoverTaskInstances(String host) {
return taskInstanceMapper.queryByHostAndStatus(host,
stateArray);
ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
......@@ -2189,7 +2182,7 @@ public class ProcessServiceImpl implements ProcessService {
return processInstanceMapper.queryLastRunningProcess(definitionCode,
startTime,
endTime,
stateArray);
ExecutionStatus.getNeedFailoverWorkflowInstanceState());
}
/**
......
......@@ -17,7 +17,12 @@
package org.apache.dolphinscheduler.service.process;
import com.fasterxml.jackson.databind.JsonNode;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.mockito.ArgumentMatchers.any;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.Flag;
......@@ -74,6 +79,14 @@ import org.apache.dolphinscheduler.plugin.task.api.model.ResourceInfo;
import org.apache.dolphinscheduler.service.exceptions.ServiceException;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtilsTest;
import org.apache.dolphinscheduler.spi.params.base.FormType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
......@@ -87,17 +100,7 @@ import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_RECOVER_PROCESS_ID_STRING;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_START_PARAMS;
import static org.apache.dolphinscheduler.common.Constants.CMD_PARAM_SUB_PROCESS_DEFINE_CODE;
import static org.mockito.ArgumentMatchers.any;
import com.fasterxml.jackson.databind.JsonNode;
/**
* process service test
......@@ -284,9 +287,12 @@ public class ProcessServiceTest {
Command command = new Command();
command.setProcessDefinitionCode(222);
command.setCommandType(CommandType.REPEAT_RUNNING);
command.setCommandParam("{\"" + CMD_PARAM_RECOVER_PROCESS_ID_STRING + "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE + "\":\"222\"}");
Assert.assertNull(processService.handleCommand(logger, host, command));
command.setCommandParam("{\""
+ CMD_PARAM_RECOVER_PROCESS_ID_STRING
+ "\":\"111\",\""
+ CMD_PARAM_SUB_PROCESS_DEFINE_CODE
+ "\":\"222\"}");
Assert.assertNull(processService.handleCommand(host, command));
int definitionVersion = 1;
long definitionCode = 123;
......@@ -321,7 +327,7 @@ public class ProcessServiceTest {
Mockito.when(processDefineLogMapper.queryByDefinitionCodeAndVersion(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion())).thenReturn(new ProcessDefinitionLog(processDefinition));
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
Assert.assertNotNull(processService.handleCommand(logger, host, command1));
Assert.assertNotNull(processService.handleCommand(host, command1));
Command command2 = new Command();
command2.setId(2);
......@@ -331,7 +337,7 @@ public class ProcessServiceTest {
command2.setCommandType(CommandType.RECOVER_SUSPENDED_PROCESS);
command2.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(2)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command2));
Assert.assertNotNull(processService.handleCommand(host, command2));
Command command3 = new Command();
command3.setId(3);
......@@ -341,7 +347,7 @@ public class ProcessServiceTest {
command3.setCommandParam("{\"WaitingThreadInstanceId\":222}");
command3.setCommandType(CommandType.START_FAILURE_TASK_PROCESS);
Mockito.when(commandMapper.deleteById(3)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command3));
Assert.assertNotNull(processService.handleCommand(host, command3));
Command command4 = new Command();
command4.setId(4);
......@@ -351,7 +357,7 @@ public class ProcessServiceTest {
command4.setCommandType(CommandType.REPEAT_RUNNING);
command4.setProcessInstanceId(processInstanceId);
Mockito.when(commandMapper.deleteById(4)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command4));
Assert.assertNotNull(processService.handleCommand(host, command4));
Command command5 = new Command();
command5.setId(5);
......@@ -390,7 +396,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(223)).thenReturn(processInstance2);
Mockito.when(processDefineMapper.queryByCode(11L)).thenReturn(processDefinition1);
Mockito.when(commandMapper.deleteById(1)).thenReturn(1);
Assert.assertNotNull(processService.handleCommand(logger, host, command1));
Assert.assertNotNull(processService.handleCommand(host, command1));
Command command6 = new Command();
command6.setId(6);
......@@ -401,7 +407,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(11L, 1, Constants.RUNNING_PROCESS_STATE, 223)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateNextProcessIdById(223, 222)).thenReturn(true);
Mockito.when(commandMapper.deleteById(6)).thenReturn(1);
ProcessInstance processInstance6 = processService.handleCommand(logger, host, command6);
ProcessInstance processInstance6 = processService.handleCommand(host, command6);
Assert.assertTrue(processInstance6 != null);
processDefinition1.setExecutionType(ProcessExecutionTypeEnum.SERIAL_DISCARD);
......@@ -444,7 +450,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(12L, 1, Constants.RUNNING_PROCESS_STATE, 0)).thenReturn(lists);
Mockito.when(processInstanceMapper.updateById(processInstance)).thenReturn(1);
Mockito.when(commandMapper.deleteById(9)).thenReturn(1);
ProcessInstance processInstance10 = processService.handleCommand(logger, host, command9);
ProcessInstance processInstance10 = processService.handleCommand(host, command9);
Assert.assertTrue(processInstance10 == null);
}
......@@ -485,7 +491,7 @@ public class ProcessServiceTest {
Mockito.when(processInstanceMapper.queryDetailById(222)).thenReturn(processInstance);
// will throw exception when command id is 0 and delete fail
processService.handleCommand(logger, host, command1);
processService.handleCommand(host, command1);
}
@Test
......
......@@ -22,13 +22,25 @@ import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper;
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* to master/worker task transport
*/
public class TaskExecutionContext {
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TaskExecutionContext implements Serializable {
private static final long serialVersionUID = -1L;
/**
* task id
......@@ -236,373 +248,4 @@ public class TaskExecutionContext {
private DataQualityTaskExecutionContext dataQualityTaskExecutionContext;
public String getTaskLogName() {
return taskLogName;
}
public void setTaskLogName(String taskLogName) {
this.taskLogName = taskLogName;
}
public Map<String, String> getResources() {
return resources;
}
public void setResources(Map<String, String> resources) {
this.resources = resources;
}
public Map<String, Property> getParamsMap() {
return paramsMap;
}
public void setParamsMap(Map<String, Property> paramsMap) {
this.paramsMap = paramsMap;
}
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
public String getTaskName() {
return taskName;
}
public void setTaskName(String taskName) {
this.taskName = taskName;
}
public Date getFirstSubmitTime() {
return firstSubmitTime;
}
public void setFirstSubmitTime(Date firstSubmitTime) {
this.firstSubmitTime = firstSubmitTime;
}
public Date getStartTime() {
return startTime;
}
public void setStartTime(Date startTime) {
this.startTime = startTime;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getExecutePath() {
return executePath;
}
public void setExecutePath(String executePath) {
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskJson() {
return taskJson;
}
public void setTaskJson(String taskJson) {
this.taskJson = taskJson;
}
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
public Date getScheduleTime() {
return scheduleTime;
}
public void setScheduleTime(Date scheduleTime) {
this.scheduleTime = scheduleTime;
}
public String getGlobalParams() {
return globalParams;
}
public void setGlobalParams(String globalParams) {
this.globalParams = globalParams;
}
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public int getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getTenantCode() {
return tenantCode;
}
public void setTenantCode(String tenantCode) {
this.tenantCode = tenantCode;
}
public String getQueue() {
return queue;
}
public void setQueue(String queue) {
this.queue = queue;
}
public int getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId;
}
public int getProjectId() {
return projectId;
}
public void setProjectId(int projectId) {
this.projectId = projectId;
}
public String getTaskParams() {
return taskParams;
}
public void setTaskParams(String taskParams) {
this.taskParams = taskParams;
}
public String getEnvFile() {
return envFile;
}
public void setEnvFile(String envFile) {
this.envFile = envFile;
}
public String getEnvironmentConfig() {
return environmentConfig;
}
public void setEnvironmentConfig(String config) {
this.environmentConfig = config;
}
public Map<String, String> getDefinedParams() {
return definedParams;
}
public void setDefinedParams(Map<String, String> definedParams) {
this.definedParams = definedParams;
}
public String getTaskAppId() {
return taskAppId;
}
public void setTaskAppId(String taskAppId) {
this.taskAppId = taskAppId;
}
public TaskTimeoutStrategy getTaskTimeoutStrategy() {
return taskTimeoutStrategy;
}
public void setTaskTimeoutStrategy(TaskTimeoutStrategy taskTimeoutStrategy) {
this.taskTimeoutStrategy = taskTimeoutStrategy;
}
public int getTaskTimeout() {
return taskTimeout;
}
public void setTaskTimeout(int taskTimeout) {
this.taskTimeout = taskTimeout;
}
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public int getDelayTime() {
return delayTime;
}
public void setDelayTime(int delayTime) {
this.delayTime = delayTime;
}
public ResourceParametersHelper getResourceParametersHelper() {
return resourceParametersHelper;
}
public void setResourceParametersHelper(ResourceParametersHelper resourceParametersHelper) {
this.resourceParametersHelper = resourceParametersHelper;
}
public String getVarPool() {
return varPool;
}
public void setVarPool(String varPool) {
this.varPool = varPool;
}
public int getDryRun() {
return dryRun;
}
public void setDryRun(int dryRun) {
this.dryRun = dryRun;
}
public Long getProcessDefineCode() {
return processDefineCode;
}
public void setProcessDefineCode(Long processDefineCode) {
this.processDefineCode = processDefineCode;
}
public int getProcessDefineVersion() {
return processDefineVersion;
}
public void setProcessDefineVersion(int processDefineVersion) {
this.processDefineVersion = processDefineVersion;
}
public long getProjectCode() {
return projectCode;
}
public void setProjectCode(long projectCode) {
this.projectCode = projectCode;
}
public DataQualityTaskExecutionContext getDataQualityTaskExecutionContext() {
return dataQualityTaskExecutionContext;
}
public void setDataQualityTaskExecutionContext(DataQualityTaskExecutionContext dataQualityTaskExecutionContext) {
this.dataQualityTaskExecutionContext = dataQualityTaskExecutionContext;
}
public void setCurrentExecutionStatus(ExecutionStatus currentExecutionStatus) {
this.currentExecutionStatus = currentExecutionStatus;
}
public ExecutionStatus getCurrentExecutionStatus() {
return currentExecutionStatus;
}
public Date getEndTime() {
return endTime;
}
public void setEndTime(Date endTime) {
this.endTime = endTime;
}
@Override
public String toString() {
return "TaskExecutionContext{"
+ "taskInstanceId=" + taskInstanceId
+ ", taskName='" + taskName + '\''
+ ", currentExecutionStatus=" + currentExecutionStatus
+ ", firstSubmitTime=" + firstSubmitTime
+ ", startTime=" + startTime
+ ", taskType='" + taskType + '\''
+ ", host='" + host + '\''
+ ", executePath='" + executePath + '\''
+ ", logPath='" + logPath + '\''
+ ", taskJson='" + taskJson + '\''
+ ", processId=" + processId
+ ", processDefineCode=" + processDefineCode
+ ", processDefineVersion=" + processDefineVersion
+ ", appIds='" + appIds + '\''
+ ", processInstanceId=" + processInstanceId
+ ", scheduleTime=" + scheduleTime
+ ", globalParams='" + globalParams + '\''
+ ", executorId=" + executorId
+ ", cmdTypeIfComplement=" + cmdTypeIfComplement
+ ", tenantCode='" + tenantCode + '\''
+ ", queue='" + queue + '\''
+ ", projectCode=" + projectCode
+ ", taskParams='" + taskParams + '\''
+ ", envFile='" + envFile + '\''
+ ", dryRun='" + dryRun + '\''
+ ", definedParams=" + definedParams
+ ", taskAppId='" + taskAppId + '\''
+ ", taskTimeoutStrategy=" + taskTimeoutStrategy
+ ", taskTimeout=" + taskTimeout
+ ", workerGroup='" + workerGroup + '\''
+ ", environmentConfig='" + environmentConfig + '\''
+ ", delayTime=" + delayTime
+ ", resources=" + resources
+ ", sqlTaskExecutionContext=" + sqlTaskExecutionContext
+ ", dataQualityTaskExecutionContext=" + dataQualityTaskExecutionContext
+ '}';
}
}
......@@ -78,6 +78,15 @@ public enum ExecutionStatus {
private static HashMap<Integer, ExecutionStatus> EXECUTION_STATUS_MAP = new HashMap<>();
private static final int[] NEED_FAILOVER_STATES = new int[] {
ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
ExecutionStatus.DISPATCH.ordinal(),
ExecutionStatus.RUNNING_EXECUTION.ordinal(),
ExecutionStatus.DELAY_EXECUTION.ordinal(),
ExecutionStatus.READY_PAUSE.ordinal(),
ExecutionStatus.READY_STOP.ordinal()
};
static {
for (ExecutionStatus executionStatus : ExecutionStatus.values()) {
EXECUTION_STATUS_MAP.put(executionStatus.code, executionStatus);
......@@ -180,4 +189,18 @@ public enum ExecutionStatus {
}
throw new IllegalArgumentException("invalid status : " + status);
}
public static boolean isNeedFailoverWorkflowInstanceState(ExecutionStatus executionStatus) {
return
ExecutionStatus.SUBMITTED_SUCCESS == executionStatus
|| ExecutionStatus.DISPATCH == executionStatus
|| ExecutionStatus.RUNNING_EXECUTION == executionStatus
|| ExecutionStatus.DELAY_EXECUTION == executionStatus
|| ExecutionStatus.READY_PAUSE == executionStatus
|| ExecutionStatus.READY_STOP == executionStatus;
}
public static int[] getNeedFailoverWorkflowInstanceState() {
return NEED_FAILOVER_STATES;
}
}
......@@ -22,22 +22,20 @@ import java.util.function.Supplier;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.Metrics;
import lombok.experimental.UtilityClass;
public final class WorkerServerMetrics {
public WorkerServerMetrics() {
throw new UnsupportedOperationException("Utility class");
}
@UtilityClass
public class WorkerServerMetrics {
private static final Counter WORKER_OVERLOAD_COUNTER =
Counter.builder("dolphinscheduler_worker_overload_count")
.description("worker load count")
.register(Metrics.globalRegistry);
Counter.builder("ds.worker.overload.count")
.description("overloaded workers count")
.register(Metrics.globalRegistry);
private static final Counter WORKER_SUBMIT_QUEUE_IS_FULL_COUNTER =
Counter.builder("dolphinscheduler_worker_submit_queue_is_full_count")
.description("worker task submit queue is full count")
.register(Metrics.globalRegistry);
Counter.builder("ds.worker.full.submit.queue.count")
.description("full worker submit queues count")
.register(Metrics.globalRegistry);
public static void incWorkerOverloadCount() {
WORKER_OVERLOAD_COUNTER.increment();
......@@ -48,9 +46,10 @@ public final class WorkerServerMetrics {
}
public static void registerWorkerRunningTaskGauge(Supplier<Number> supplier) {
Gauge.builder("dolphinscheduler_worker_running_task_gauge", supplier)
.description("worker running task gauge")
.register(Metrics.globalRegistry);
Gauge.builder("ds.task.running", supplier)
.description("number of running tasks on workers")
.register(Metrics.globalRegistry);
}
}
......@@ -33,6 +33,7 @@ import org.apache.dolphinscheduler.remote.processor.NettyRemoteChannel;
import org.apache.dolphinscheduler.server.worker.cache.ResponseCache;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
......@@ -91,9 +92,6 @@ public class TaskCallbackService {
* change remote channel
*/
public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
if (REMOTE_CHANNELS.containsKey(taskInstanceId)) {
REMOTE_CHANNELS.remove(taskInstanceId);
}
REMOTE_CHANNELS.put(taskInstanceId, channel);
}
......@@ -103,19 +101,19 @@ public class TaskCallbackService {
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
private NettyRemoteChannel getRemoteChannel(int taskInstanceId) {
private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
Channel newChannel;
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if (nettyRemoteChannel != null) {
if (nettyRemoteChannel.isActive()) {
return nettyRemoteChannel;
return Optional.of(nettyRemoteChannel);
}
newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if (newChannel != null) {
return getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId);
return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
}
}
return null;
return Optional.empty();
}
public long pause(int ntries) {
......@@ -150,15 +148,13 @@ public class TaskCallbackService {
* @param command command
*/
public void send(int taskInstanceId, Command command) {
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel != null) {
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener() {
Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if (nettyRemoteChannel.isPresent()) {
nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
// remove(taskInstanceId);
return;
public void operationComplete(ChannelFuture future) {
if (!future.isSuccess()) {
logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command);
}
}
});
......
......@@ -109,8 +109,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
}
logger.info("task execute request command : {}", taskRequestCommand);
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONUtils.parseObject(contextJson, TaskExecutionContext.class);
TaskExecutionContext taskExecutionContext = taskRequestCommand.getTaskExecutionContext();
if (taskExecutionContext == null) {
logger.error("task execution context is null");
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.worker.runner;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.remote.command.Command;
......@@ -34,7 +35,7 @@ import org.springframework.stereotype.Component;
* Retry Report Task Status Thread
*/
@Component
public class RetryReportTaskStatusThread implements Runnable {
public class RetryReportTaskStatusThread extends BaseDaemonThread {
private final Logger logger = LoggerFactory.getLogger(RetryReportTaskStatusThread.class);
......@@ -46,11 +47,14 @@ public class RetryReportTaskStatusThread implements Runnable {
@Autowired
private TaskCallbackService taskCallbackService;
public void start() {
protected RetryReportTaskStatusThread() {
super("RetryReportTaskStatusThread");
}
@Override
public synchronized void start() {
logger.info("Retry report task status thread starting");
Thread thread = new Thread(this, "RetryReportTaskStatusThread");
thread.setDaemon(true);
thread.start();
super.start();
logger.info("Retry report task status thread started");
}
......@@ -59,7 +63,7 @@ public class RetryReportTaskStatusThread implements Runnable {
*/
@Override
public void run() {
ResponseCache instance = ResponseCache.get();
final ResponseCache instance = ResponseCache.get();
while (Stopper.isRunning()) {
......@@ -67,25 +71,45 @@ public class RetryReportTaskStatusThread implements Runnable {
ThreadUtils.sleep(RETRY_REPORT_TASK_STATUS_INTERVAL);
try {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command runningCommand = entry.getValue();
taskCallbackService.send(taskInstanceId, runningCommand);
}
// todo: Only retry the send failed command
retryRunningCommand(instance);
retryResponseCommand(instance);
} catch (Exception e) {
logger.warn("Retry report task status error", e);
}
}
}
private void retryRunningCommand(ResponseCache instance) {
if (!instance.getRunningCache().isEmpty()) {
Map<Integer, Command> runningCache = instance.getRunningCache();
for (Map.Entry<Integer, Command> entry : runningCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command runningCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, runningCommand);
} catch (Exception ex) {
logger.error("Retry send running command to master error, taskInstanceId: {}, command: {}",
taskInstanceId,
runningCommand);
}
}
}
}
if (!instance.getResponseCache().isEmpty()) {
Map<Integer, Command> responseCache = instance.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
taskCallbackService.send(taskInstanceId, responseCommand);
}
private void retryResponseCommand(ResponseCache instance) {
if (!instance.getResponseCache().isEmpty()) {
Map<Integer, Command> responseCache = instance.getResponseCache();
for (Map.Entry<Integer, Command> entry : responseCache.entrySet()) {
Integer taskInstanceId = entry.getKey();
Command responseCommand = entry.getValue();
try {
taskCallbackService.send(taskInstanceId, responseCommand);
} catch (Exception ex) {
logger.error("Retry send response command to master error, taskInstanceId: {}, command: {}",
taskInstanceId,
responseCommand);
}
} catch (Exception e) {
logger.warn("Retry report task status error", e);
}
}
}
......
......@@ -131,9 +131,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
taskExecutionContext.setEndTime(new Date());
TaskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskCallbackService.sendTaskExecuteResponseCommand(taskExecutionContext);
logger.info("[WorkflowInstance-{}][TaskInstance-{}] Task dry run success",
taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
return;
}
try {
LoggerUtils.setWorkflowAndTaskInstanceIDMDC(taskExecutionContext.getProcessInstanceId(), taskExecutionContext.getTaskInstanceId());
logger.info("script path : {}", taskExecutionContext.getExecutePath());
......
......@@ -88,7 +88,7 @@ public class TaskExecuteProcessorTest {
command = new Command();
command.setType(CommandType.TASK_EXECUTE_REQUEST);
ackCommand = new TaskExecuteRunningCommand().convert2Command();
taskRequestCommand = new TaskExecuteRequestCommand();
taskRequestCommand = new TaskExecuteRequestCommand(taskExecutionContext);
alertClientService = PowerMockito.mock(AlertClientService.class);
workerExecService = PowerMockito.mock(ExecutorService.class);
PowerMockito.when(workerExecService.submit(Mockito.any(TaskExecuteThread.class)))
......@@ -127,8 +127,6 @@ public class TaskExecuteProcessorTest {
PowerMockito.mockStatic(JSONUtils.class);
PowerMockito.when(JSONUtils.parseObject(command.getBody(), TaskExecuteRequestCommand.class))
.thenReturn(taskRequestCommand);
PowerMockito.when(JSONUtils.parseObject(taskRequestCommand.getTaskExecutionContext(), TaskExecutionContext.class))
.thenReturn(taskExecutionContext);
PowerMockito.mockStatic(FileUtils.class);
PowerMockito.when(FileUtils.getProcessExecDir(taskExecutionContext.getProjectCode(),
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册