未验证 提交 b0f9cd72 编写于 作者: T Tboy 提交者: GitHub

Merge pull request #18 from apache/refactor-worker

Refactor worker
......@@ -21,6 +21,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.slf4j.Logger;
......@@ -68,7 +69,7 @@ public class LoggerService {
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
}
String host = taskInstance.getHost();
String host = Host.of(taskInstance.getHost()).getIp();
if(StringUtils.isEmpty(host)){
return new Result(Status.TASK_INSTANCE_NOT_FOUND.getCode(), Status.TASK_INSTANCE_NOT_FOUND.getMsg());
}
......@@ -94,7 +95,7 @@ public class LoggerService {
if (taskInstance == null){
throw new RuntimeException("task instance is null");
}
String host = taskInstance.getHost();
String host = Host.of(taskInstance.getHost()).getIp();
return logClient.getLogBytes(host, Constants.RPC_PORT, taskInstance.getLogPath());
}
}
......@@ -186,21 +186,11 @@ public class TaskInstance implements Serializable {
@TableField(exist = false)
private String dependentResult;
/**
* worker group id
*/
private int workerGroupId;
public void init(String host,Date startTime,String executePath){
this.host = host;
this.startTime = startTime;
this.executePath = executePath;
}
public ProcessInstance getProcessInstance() {
return processInstance;
}
......
......@@ -48,7 +48,6 @@ public class TaskExecutionContextBuilder {
taskExecutionContext.setLogPath(taskInstance.getLogPath());
taskExecutionContext.setExecutePath(taskInstance.getExecutePath());
taskExecutionContext.setTaskJson(taskInstance.getTaskJson());
taskExecutionContext.setHost(taskInstance.getHost());
taskExecutionContext.setWorkerGroup(Constants.DEFAULT_WORKER_GROUP);
return this;
}
......
......@@ -29,7 +29,7 @@ public class TaskExecutionContext implements Serializable{
/**
* task id
*/
private Integer taskInstanceId;
private int taskInstanceId;
/**
......@@ -51,7 +51,7 @@ public class TaskExecutionContext implements Serializable{
* host
*/
private String host;
/**
* task execute path
*/
......@@ -70,7 +70,7 @@ public class TaskExecutionContext implements Serializable{
/**
* processId
*/
private Integer processId;
private int processId;
/**
* appIds
......@@ -80,7 +80,7 @@ public class TaskExecutionContext implements Serializable{
/**
* process instance id
*/
private Integer processInstanceId;
private int processInstanceId;
/**
......@@ -97,13 +97,13 @@ public class TaskExecutionContext implements Serializable{
/**
* execute user id
*/
private Integer executorId;
private int executorId;
/**
* command type if complement
*/
private Integer cmdTypeIfComplement;
private int cmdTypeIfComplement;
/**
......@@ -120,12 +120,12 @@ public class TaskExecutionContext implements Serializable{
/**
* process define id
*/
private Integer processDefineId;
private int processDefineId;
/**
* project id
*/
private Integer projectId;
private int projectId;
/**
* taskParams
......@@ -173,22 +173,11 @@ public class TaskExecutionContext implements Serializable{
*/
private DataxTaskExecutionContext dataxTaskExecutionContext;
public String getWorkerGroup() {
return workerGroup;
}
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public Integer getTaskInstanceId() {
public int getTaskInstanceId() {
return taskInstanceId;
}
public void setTaskInstanceId(Integer taskInstanceId) {
public void setTaskInstanceId(int taskInstanceId) {
this.taskInstanceId = taskInstanceId;
}
......@@ -216,6 +205,14 @@ public class TaskExecutionContext implements Serializable{
this.taskType = taskType;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public String getExecutePath() {
return executePath;
}
......@@ -224,6 +221,14 @@ public class TaskExecutionContext implements Serializable{
this.executePath = executePath;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskJson() {
return taskJson;
}
......@@ -232,11 +237,27 @@ public class TaskExecutionContext implements Serializable{
this.taskJson = taskJson;
}
public Integer getProcessInstanceId() {
public int getProcessId() {
return processId;
}
public void setProcessId(int processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
}
public int getProcessInstanceId() {
return processInstanceId;
}
public void setProcessInstanceId(Integer processInstanceId) {
public void setProcessInstanceId(int processInstanceId) {
this.processInstanceId = processInstanceId;
}
......@@ -256,6 +277,22 @@ public class TaskExecutionContext implements Serializable{
this.globalParams = globalParams;
}
public int getExecutorId() {
return executorId;
}
public void setExecutorId(int executorId) {
this.executorId = executorId;
}
public int getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(int cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getTenantCode() {
return tenantCode;
}
......@@ -272,46 +309,22 @@ public class TaskExecutionContext implements Serializable{
this.queue = queue;
}
public Integer getProcessDefineId() {
public int getProcessDefineId() {
return processDefineId;
}
public void setProcessDefineId(Integer processDefineId) {
public void setProcessDefineId(int processDefineId) {
this.processDefineId = processDefineId;
}
public Integer getProjectId() {
public int getProjectId() {
return projectId;
}
public void setProjectId(Integer projectId) {
public void setProjectId(int projectId) {
this.projectId = projectId;
}
public Integer getExecutorId() {
return executorId;
}
public void setExecutorId(Integer executorId) {
this.executorId = executorId;
}
public Integer getCmdTypeIfComplement() {
return cmdTypeIfComplement;
}
public void setCmdTypeIfComplement(Integer cmdTypeIfComplement) {
this.cmdTypeIfComplement = cmdTypeIfComplement;
}
public String getLogPath() {
return logPath;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
}
public String getTaskParams() {
return taskParams;
}
......@@ -360,28 +373,12 @@ public class TaskExecutionContext implements Serializable{
this.taskTimeout = taskTimeout;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getProcessId() {
return processId;
}
public void setProcessId(Integer processId) {
this.processId = processId;
}
public String getAppIds() {
return appIds;
public String getWorkerGroup() {
return workerGroup;
}
public void setAppIds(String appIds) {
this.appIds = appIds;
public void setWorkerGroup(String workerGroup) {
this.workerGroup = workerGroup;
}
public SQLTaskExecutionContext getSqlTaskExecutionContext() {
......
......@@ -128,7 +128,7 @@ public class MasterServer implements IStoppable {
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//
......
......@@ -90,6 +90,7 @@ public class ExecutorDispatcher implements InitializingBean {
throw new ExecuteException(String.format("fail to execute : %s due to no worker ", context.getContext()));
}
context.setHost(host);
context.getContext().setHost(host.getAddress());
executorManager.beforeExecute(context);
try {
/**
......
......@@ -22,6 +22,7 @@ import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
......@@ -71,7 +72,7 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
*/
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_KILL_RESPONSE, new TaskKillResponseProcessor());
}
......@@ -130,8 +131,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
return success;
}
@Override
public void executeDirectly(ExecutionContext context) throws ExecuteException {
Command command = buildCommand(context);
Command command = buildKillCommand(context);
Host host = context.getHost();
doExecute(host,command);
}
......@@ -158,6 +160,28 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
return requestCommand.convert2Command();
}
/**
* build command
* @param context context
* @return command
*/
private Command buildKillCommand(ExecutionContext context) {
TaskKillRequestCommand requestCommand = new TaskKillRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
break;
case CLIENT:
break;
default:
throw new IllegalArgumentException("invalid executor type : " + executorType);
}
return requestCommand.convert2Command();
}
/**
* execute logic
* @param host host
......
......@@ -46,8 +46,7 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
logger.info("received command : {}", responseCommand);
logger.info("已经接受到了worker杀任务的回应");
logger.info("received task kill response command : {}", responseCommand);
}
......
......@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import com.alibaba.fastjson.JSONObject;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
......@@ -183,12 +184,9 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
alreadyKilled = true;
TaskExecutionContext taskExecutionContext = super.getTaskExecutionContext(taskInstance);
ExecutionContext executionContext = new ExecutionContext(taskExecutionContext, ExecutorType.WORKER);
Host host = new Host();
host.setIp(taskInstance.getHost());
host.setPort(12346);
Host host = Host.of(taskInstance.getHost());
executionContext.setHost(host);
nettyExecutorManager.executeDirectly(executionContext);
......
......@@ -78,10 +78,12 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
logger.info("received command : {}", taskRequestCommand);
String contextJson = taskRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
......@@ -141,7 +143,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
ackCommand.setHost(OSUtils.getHost());
ackCommand.setHost(taskExecutionContext.getHost());
ackCommand.setStartTime(new Date());
if(taskExecutionContext.getTaskType().equals(TaskType.SQL.name()) || taskExecutionContext.getTaskType().equals(TaskType.PROCEDURE.name())){
ackCommand.setExecutePath(null);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册