提交 09173e8d 编写于 作者: T Technoboy-

refactor ExecutionContext

上级 b0f9cd72
......@@ -17,6 +17,11 @@
package org.apache.dolphinscheduler.server.entity;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import java.io.Serializable;
import java.util.Date;
import java.util.Map;
......@@ -397,6 +402,18 @@ public class TaskExecutionContext implements Serializable{
this.dataxTaskExecutionContext = dataxTaskExecutionContext;
}
public Command toCommand(){
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
return requestCommand.convert2Command();
}
public Command toKillCommand(){
TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(this));
return requestCommand.convert2Command();
}
@Override
public String toString() {
return "TaskExecutionContext{" +
......
......@@ -20,7 +20,6 @@ package org.apache.dolphinscheduler.server.master.dispatch;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
......@@ -45,9 +44,6 @@ public class ExecutorDispatcher implements InitializingBean {
@Autowired
private NettyExecutorManager nettyExecutorManager;
@Autowired
private MasterConfig masterConfig;
/**
* round robin host manager
*/
......@@ -87,10 +83,9 @@ public class ExecutorDispatcher implements InitializingBean {
*/
Host host = hostManager.select(context);
if (StringUtils.isEmpty(host.getAddress())) {
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getCommand()));
}
context.setHost(host);
context.getContext().setHost(host.getAddress());
executorManager.beforeExecute(context);
try {
/**
......
......@@ -17,8 +17,8 @@
package org.apache.dolphinscheduler.server.master.dispatch.context;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
/**
......@@ -32,30 +32,47 @@ public class ExecutionContext {
private Host host;
/**
* context
* command
*/
private final TaskExecutionContext context;
private final Command command;
/**
* executor type : worker or client
*/
private final ExecutorType executorType;
public ExecutionContext(TaskExecutionContext context, ExecutorType executorType) {
this.context = context;
/**
* worker group
*/
private String workerGroup;
public ExecutionContext(Command command, ExecutorType executorType) {
this.command = command;
this.executorType = executorType;
}
public String getWorkerGroup(){
return context.getWorkerGroup();
public ExecutionContext(Command command, ExecutorType executorType, String workerGroup) {
this.command = command;
this.executorType = executorType;
this.workerGroup = workerGroup;
}
public Command getCommand() {
return command;
}
public ExecutorType getExecutorType() {
return executorType;
}
public TaskExecutionContext getContext() {
return context;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public String getWorkerGroup(){
return this.workerGroup;
}
public Host getHost() {
......
......@@ -21,12 +21,8 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
......@@ -98,7 +94,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
/**
* build command accord executeContext
*/
Command command = buildCommand(context);
Command command = context.getCommand();
/**
* execute task host
......@@ -111,14 +107,14 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
success = true;
context.setHost(host);
} catch (ExecuteException ex) {
logger.error(String.format("execute context : %s error", context.getContext()), ex);
logger.error(String.format("execute command : %s error", command), ex);
try {
failNodeSet.add(host.getAddress());
Set<String> tmpAllIps = new HashSet<>(allNodes);
Collection<String> remained = CollectionUtils.subtract(tmpAllIps, failNodeSet);
if (remained != null && remained.size() > 0) {
host = Host.of(remained.iterator().next());
logger.error("retry execute context : {} host : {}", context.getContext(), host);
logger.error("retry execute command : {} host : {}", command, host);
} else {
throw new ExecuteException("fail after try all nodes");
}
......@@ -133,53 +129,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
@Override
public void executeDirectly(ExecutionContext context) throws ExecuteException {
Command command = buildKillCommand(context);
Host host = context.getHost();
doExecute(host,command);
}
/**
* build command
* @param context context
* @return command
*/
private Command buildCommand(ExecutionContext context) {
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executor type : " + executorType);
}
return requestCommand.convert2Command();
}
/**
* build command
* @param context context
* @return command
*/
private Command buildKillCommand(ExecutionContext context) {
TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executor type : " + executorType);
}
return requestCommand.convert2Command();
doExecute(host, context.getCommand());
}
/**
......
......@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.Callable;
import static org.apache.dolphinscheduler.common.Constants.DOLPHINSCHEDULER_TASKS_QUEUE;
/**
* master task exec base class
......@@ -131,7 +130,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
*/
private Boolean dispatch(TaskInstance taskInstance){
TaskExecutionContext context = getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(context, ExecutorType.WORKER);
ExecutionContext executionContext = new ExecutionContext(context.toCommand(), ExecutorType.WORKER, context.getWorkerGroup());
try {
return dispatcher.dispatch(executionContext);
} catch (ExecuteException e) {
......@@ -227,8 +226,8 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
}
}
if(submitDB && !submitTask){
// dispatcht task
submitTask = dispatchtTask(task);
// dispatch task
submitTask = dispatchTask(task);
}
if(submitDB && submitTask){
return task;
......@@ -254,7 +253,7 @@ public class MasterBaseTaskExecThread implements Callable<Boolean> {
* @param taskInstance taskInstance
* @return whether submit task success
*/
public Boolean dispatchtTask(TaskInstance taskInstance) {
public Boolean dispatchTask(TaskInstance taskInstance) {
try{
if(taskInstance.isSubProcess()){
......
......@@ -26,7 +26,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
......@@ -184,7 +183,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
alreadyKilled = true;
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext.toKillCommand(), ExecutorType.WORKER, taskExecutionContext.getWorkerGroup());
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册