未验证 提交 71c11bd5 编写于 作者: T Tboy 提交者: GitHub

refactor kill logic (#2060)

上级 4d64ef09
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ EXECUTE_TASK_REQUEST, /** * execute task ack */ EXECUTE_TASK_ACK, /** * execute task response */ EXECUTE_TASK_RESPONSE, /** * kill task */ KILL_TASK_REQUEST, /** * kill task response */ KILL_TASK_RESPONSE, /** * ping */ PING, /** * pong */ PONG;}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; public enum CommandType { /** * roll view log request */ ROLL_VIEW_LOG_REQUEST, /** * roll view log response */ ROLL_VIEW_LOG_RESPONSE, /** * view whole log request */ VIEW_WHOLE_LOG_REQUEST, /** * view whole log response */ VIEW_WHOLE_LOG_RESPONSE, /** * get log bytes request */ GET_LOG_BYTES_REQUEST, /** * get log bytes response */ GET_LOG_BYTES_RESPONSE, WORKER_REQUEST, MASTER_RESPONSE, /** * execute task request */ TASK_EXECUTE_REQUEST, /** * execute task ack */ TASK_EXECUTE_ACK, /** * execute task response */ TASK_EXECUTE_RESPONSE, /** * kill task */ TASK_KILL_REQUEST, /** * kill task response */ TASK_KILL_RESPONSE, /** * ping */ PING, /** * pong */ PONG;}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class ExecuteTaskAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task request command */public class TaskExecuteAckCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * startTime */ private Date startTime; /** * host */ private String host; /** * status */ private int status; /** * logPath */ private String logPath; /** * executePath */ private String executePath; public Date getStartTime() { return startTime; } public void setStartTime(Date startTime) { this.startTime = startTime; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getLogPath() { return logPath; } public void setLogPath(String logPath) { this.logPath = logPath; } public String getExecutePath() { return executePath; } public void setExecutePath(String executePath) { this.executePath = executePath; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_ACK); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteAckCommand{" + "taskInstanceId=" + taskInstanceId + ", startTime=" + startTime + ", host='" + host + '\'' + ", status=" + status + ", logPath='" + logPath + '\'' + ", executePath='" + executePath + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class ExecuteTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public ExecuteTaskRequestCommand() { } public ExecuteTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * execute task request command */public class TaskExecuteRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskExecuteRequestCommand() { } public TaskExecuteRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class ExecuteTaskResponseCommand implements Serializable { public ExecuteTaskResponseCommand() { } public ExecuteTaskResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.EXECUTE_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "ExecuteTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date; /** * execute task response command */public class TaskExecuteResponseCommand implements Serializable { public TaskExecuteResponseCommand() { } public TaskExecuteResponseCommand(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } /** * task instance id */ private int taskInstanceId; /** * status */ private int status; /** * end time */ private Date endTime; /** * processId */ private int processId; /** * appIds */ private String appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public Date getEndTime() { return endTime; } public void setEndTime(Date endTime) { this.endTime = endTime; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public String getAppIds() { return appIds; } public void setAppIds(String appIds) { this.appIds = appIds; } /** * package response command * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_EXECUTE_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskExecuteResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", status=" + status + ", endTime=" + endTime + ", processId=" + processId + ", appIds='" + appIds + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */public class KillTaskRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public KillTaskRequestCommand() { } public KillTaskRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable; /** * kill task request command */public class TaskKillRequestCommand implements Serializable { /** * task execution context */ private String taskExecutionContext; public String getTaskExecutionContext() { return taskExecutionContext; } public void setTaskExecutionContext(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } public TaskKillRequestCommand() { } public TaskKillRequestCommand(String taskExecutionContext) { this.taskExecutionContext = taskExecutionContext; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_REQUEST); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillRequestCommand{" + "taskExecutionContext='" + taskExecutionContext + '\'' + '}'; }}
\ No newline at end of file
......
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date;import java.util.List; /** * kill task response command */public class KillTaskResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.KILL_TASK_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "KillTaskResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; }}
\ No newline at end of file
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */package org.apache.dolphinscheduler.remote.command; import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer; import java.io.Serializable;import java.util.Date;import java.util.List; /** * kill task response command */public class TaskKillResponseCommand implements Serializable { /** * taskInstanceId */ private int taskInstanceId; /** * host */ private String host; /** * status */ private int status; /** * processId */ private int processId; /** * other resource manager appId , for example : YARN etc */ protected List<String> appIds; public int getTaskInstanceId() { return taskInstanceId; } public void setTaskInstanceId(int taskInstanceId) { this.taskInstanceId = taskInstanceId; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public int getProcessId() { return processId; } public void setProcessId(int processId) { this.processId = processId; } public List<String> getAppIds() { return appIds; } public void setAppIds(List<String> appIds) { this.appIds = appIds; } /** * package request command * * @return command */ public Command convert2Command(){ Command command = new Command(); command.setType(CommandType.TASK_KILL_RESPONSE); byte[] body = FastJsonSerializer.serialize(this); command.setBody(body); return command; } @Override public String toString() { return "TaskKillResponseCommand{" + "taskInstanceId=" + taskInstanceId + ", host='" + host + '\'' + ", status=" + status + ", processId=" + processId + ", appIds=" + appIds + '}'; }}
\ No newline at end of file
......
......@@ -126,9 +126,9 @@ public class MasterServer implements IStoppable {
NettyServerConfig serverConfig = new NettyServerConfig();
serverConfig.setListenPort(45678);
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
this.nettyRemotingServer.start();
//
......
......@@ -18,8 +18,8 @@
package org.apache.dolphinscheduler.server.master.cache;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
/**
......@@ -47,14 +47,14 @@ public interface TaskInstanceCacheManager {
*
* @param taskAckCommand taskAckCommand
*/
void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand);
void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand);
/**
* cache taskInstance
*
* @param executeTaskResponseCommand executeTaskResponseCommand
* @param taskExecuteResponseCommand taskExecuteResponseCommand
*/
void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand);
void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand);
/**
* remove taskInstance by taskInstanceId
......
......@@ -18,8 +18,8 @@ package org.apache.dolphinscheduler.server.master.cache.impl;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.service.process.ProcessService;
......@@ -86,7 +86,7 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
* @param taskAckCommand taskAckCommand
*/
@Override
public void cacheTaskInstance(ExecuteTaskAckCommand taskAckCommand) {
public void cacheTaskInstance(TaskExecuteAckCommand taskAckCommand) {
TaskInstance taskInstance = new TaskInstance();
taskInstance.setState(ExecutionStatus.of(taskAckCommand.getStatus()));
taskInstance.setStartTime(taskAckCommand.getStartTime());
......@@ -99,13 +99,13 @@ public class TaskInstanceCacheManagerImpl implements TaskInstanceCacheManager {
/**
* cache taskInstance
*
* @param executeTaskResponseCommand executeTaskResponseCommand
* @param taskExecuteResponseCommand taskExecuteResponseCommand
*/
@Override
public void cacheTaskInstance(ExecuteTaskResponseCommand executeTaskResponseCommand) {
TaskInstance taskInstance = getByTaskInstanceId(executeTaskResponseCommand.getTaskInstanceId());
taskInstance.setState(ExecutionStatus.of(executeTaskResponseCommand.getStatus()));
taskInstance.setEndTime(executeTaskResponseCommand.getEndTime());
public void cacheTaskInstance(TaskExecuteResponseCommand taskExecuteResponseCommand) {
TaskInstance taskInstance = getByTaskInstanceId(taskExecuteResponseCommand.getTaskInstanceId());
taskInstance.setState(ExecutionStatus.of(taskExecuteResponseCommand.getStatus()));
taskInstance.setEndTime(taskExecuteResponseCommand.getEndTime());
}
/**
......
......@@ -41,6 +41,14 @@ public interface ExecutorManager<T> {
*/
T execute(ExecutionContext context) throws ExecuteException;
/**
* execute task directly without retry
* @param context context
* @return T
* @throws ExecuteException
*/
void executeDirectly(ExecutionContext context) throws ExecuteException;
/**
* after execute
* @param context context
......
......@@ -21,7 +21,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
......@@ -30,6 +30,7 @@ import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionConte
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskAckProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.apache.dolphinscheduler.server.master.processor.TaskResponseProcessor;
import org.apache.dolphinscheduler.server.registry.ZookeeperNodeManager;
import org.slf4j.Logger;
......@@ -68,8 +69,9 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
* register EXECUTE_TASK_RESPONSE command type TaskResponseProcessor
* register EXECUTE_TASK_ACK command type TaskAckProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.EXECUTE_TASK_ACK, new TaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskResponseProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_ACK, new TaskAckProcessor());
this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE, new TaskKillResponseProcessor());
}
......@@ -128,13 +130,19 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
return success;
}
public void executeDirectly(ExecutionContext context) throws ExecuteException {
Command command = buildCommand(context);
Host host = context.getHost();
doExecute(host,command);
}
/**
* build command
* @param context context
* @return command
*/
private Command buildCommand(ExecutionContext context) {
ExecuteTaskRequestCommand requestCommand = new ExecuteTaskRequestCommand();
TaskExecuteRequestCommand requestCommand = new TaskExecuteRequestCommand();
ExecutorType executorType = context.getExecutorType();
switch (executorType){
case WORKER:
......@@ -203,4 +211,8 @@ public class NettyExecutorManager extends AbstractExecutorManager<Boolean>{
}
return nodes;
}
public NettyRemotingClient getNettyRemotingClient() {
return nettyRemotingClient;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.master.dispatch.executor;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
import org.apache.dolphinscheduler.server.master.processor.TaskKillResponseProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* netty executor manager
*/
@Service
public class NettyKillManager extends AbstractExecutorManager<Boolean>{
private final Logger logger = LoggerFactory.getLogger(NettyKillManager.class);
/**
* netty remote client
*/
private final NettyRemotingClient nettyRemotingClient;
public NettyKillManager(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
/**
* register KILL_TASK_RESPONSE command type TaskKillResponseProcessor
*/
this.nettyRemotingClient.registerProcessor(CommandType.KILL_TASK_RESPONSE, new TaskKillResponseProcessor());
}
/**
* execute logic
*
* @param context context
* @return result
* @throws ExecuteException
*/
@Override
public Boolean execute(ExecutionContext context) throws ExecuteException {
Host host = context.getHost();
Command command = buildCommand(context);
try {
doExecute(host, command);
return true;
}catch (ExecuteException ex) {
logger.error(String.format("execute context : %s error", context.getContext()), ex);
return false;
}
}
private Command buildCommand(ExecutionContext context) {
KillTaskRequestCommand requestCommand = new KillTaskRequestCommand();
TaskExecutionContext taskExecutionContext = context.getContext();
requestCommand.setTaskExecutionContext(FastJsonSerializer.serializeToString(taskExecutionContext));
return requestCommand.convert2Command();
}
/**
* execute logic
* @param host host
* @param command command
* @throws ExecuteException
*/
private void doExecute(final Host host, final Command command) throws ExecuteException {
/**
* retry count,default retry 3
*/
int retryCount = 3;
boolean success = false;
do {
try {
nettyRemotingClient.send(host, command);
success = true;
} catch (Exception ex) {
logger.error(String.format("send command : %s to %s error", command, host), ex);
retryCount--;
try {
Thread.sleep(100);
} catch (InterruptedException ignore) {}
}
} while (retryCount >= 0 && !success);
if (!success) {
throw new ExecuteException(String.format("send command : %s to %s error", command, host));
}
}
}
......@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
......@@ -57,12 +57,12 @@ public class TaskAckProcessor implements NettyRequestProcessor {
/**
* task ack process
* @param channel channel channel
* @param command command ExecuteTaskAckCommand
* @param command command TaskExecuteAckCommand
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskAckCommand.class);
Preconditions.checkArgument(CommandType.TASK_EXECUTE_ACK == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskExecuteAckCommand taskAckCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteAckCommand.class);
logger.info("taskAckCommand : {}", taskAckCommand);
taskInstanceCacheManager.cacheTaskInstance(taskAckCommand);
......
......@@ -18,19 +18,12 @@
package org.apache.dolphinscheduler.server.master.processor;
import io.netty.channel.Channel;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -50,9 +43,9 @@ public class TaskKillResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_KILL_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
KillTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskResponseCommand.class);
TaskKillResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillResponseCommand.class);
logger.info("received command : {}", responseCommand);
logger.info("已经接受到了worker杀任务的回应");
}
......
......@@ -22,7 +22,7 @@ import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
......@@ -63,9 +63,9 @@ public class TaskResponseProcessor implements NettyRequestProcessor {
*/
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
Preconditions.checkArgument(CommandType.TASK_EXECUTE_RESPONSE == command.getType(), String.format("invalid command type : %s", command.getType()));
ExecuteTaskResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), ExecuteTaskResponseCommand.class);
TaskExecuteResponseCommand responseCommand = FastJsonSerializer.deserialize(command.getBody(), TaskExecuteResponseCommand.class);
logger.info("received command : {}", responseCommand);
taskInstanceCacheManager.cacheTaskInstance(responseCommand);
......
......@@ -32,7 +32,7 @@ import org.apache.dolphinscheduler.server.master.cache.TaskInstanceCacheManager;
import org.apache.dolphinscheduler.server.master.cache.impl.TaskInstanceCacheManagerImpl;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyKillManager;
import org.apache.dolphinscheduler.server.master.dispatch.executor.NettyExecutorManager;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -57,7 +57,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
private TaskInstanceCacheManager taskInstanceCacheManager;
private NettyKillManager nettyKillManager;
private NettyExecutorManager nettyExecutorManager;
/**
* constructor of MasterTaskExecThread
......@@ -67,7 +67,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
public MasterTaskExecThread(TaskInstance taskInstance, ProcessInstance processInstance){
super(taskInstance, processInstance);
this.taskInstanceCacheManager = SpringApplicationContext.getBean(TaskInstanceCacheManagerImpl.class);
this.nettyKillManager = SpringApplicationContext.getBean(NettyKillManager.class);
this.nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
}
/**
......@@ -191,7 +191,7 @@ public class MasterTaskExecThread extends MasterBaseTaskExecThread {
host.setPort(12346);
executionContext.setHost(host);
nettyKillManager.execute(executionContext);
nettyExecutorManager.executeDirectly(executionContext);
logger.info("master add kill task :{} id:{} to kill queue",
taskInstance.getName(), taskInstance.getId() );
......
......@@ -127,8 +127,8 @@ public class WorkerServer implements IStoppable {
//init remoting server
NettyServerConfig serverConfig = new NettyServerConfig();
this.nettyRemotingServer = new NettyRemotingServer(serverConfig);
this.nettyRemotingServer.registerProcessor(CommandType.EXECUTE_TASK_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.KILL_TASK_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_REQUEST, new TaskExecuteProcessor());
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, new TaskKillProcessor());
this.nettyRemotingServer.start();
this.workerRegistry = new WorkerRegistry(zookeeperRegistryCenter, serverConfig.getListenPort(), workerConfig.getWorkerHeartbeatInterval(), workerConfig.getWorkerGroup());
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dolphinscheduler.server.worker.processor;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
* taks callback service
*/
public class KillTaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(KillTaskCallbackService.class);
/**
* remote channels
*/
private static final ConcurrentHashMap<Integer, NettyRemoteChannel> REMOTE_CHANNELS = new ConcurrentHashMap<>();
/**
* netty remoting client
*/
private final NettyRemotingClient nettyRemotingClient;
public KillTaskCallbackService(){
final NettyClientConfig clientConfig = new NettyClientConfig();
this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
}
/**
* add callback channel
* @param taskInstanceId taskInstanceId
* @param channel channel
*/
public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel){
REMOTE_CHANNELS.put(taskInstanceId, channel);
}
/**
* get callback channel
* @param taskInstanceId taskInstanceId
* @return callback channel
*/
public NettyRemoteChannel getRemoteChannel(int taskInstanceId){
NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
if(nettyRemoteChannel.isActive()){
return nettyRemoteChannel;
}
Channel newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
if(newChannel != null){
NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, nettyRemoteChannel.getOpaque());
addRemoteChannel(taskInstanceId, remoteChannel);
return remoteChannel;
}
return null;
}
/**
* remove callback channels
* @param taskInstanceId taskInstanceId
*/
public void remove(int taskInstanceId){
REMOTE_CHANNELS.remove(taskInstanceId);
}
/**
* send result
*
* @param taskInstanceId taskInstanceId
* @param killTaskResponseCommand killTaskResponseCommand
*/
public void sendKillResult(int taskInstanceId, KillTaskResponseCommand killTaskResponseCommand){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
nettyRemoteChannel.writeAndFlush(killTaskResponseCommand.convert2Command()).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()){
remove(taskInstanceId);
return;
}
}
});
}
}
}
......@@ -22,17 +22,18 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import org.apache.dolphinscheduler.remote.NettyRemotingClient;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.config.NettyClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
/**
* taks callback service
*/
@Service
public class TaskCallbackService {
private final Logger logger = LoggerFactory.getLogger(TaskCallbackService.class);
......@@ -92,14 +93,14 @@ public class TaskCallbackService {
/**
* send ack
* @param taskInstanceId taskInstanceId
* @param ackCommand ackCommand
* @param command command
*/
public void sendAck(int taskInstanceId, ExecuteTaskAckCommand ackCommand){
public void sendAck(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
nettyRemoteChannel.writeAndFlush(ackCommand.convert2Command());
nettyRemoteChannel.writeAndFlush(command);
}
}
......@@ -107,14 +108,14 @@ public class TaskCallbackService {
* send result
*
* @param taskInstanceId taskInstanceId
* @param responseCommand responseCommand
* @param command command
*/
public void sendResult(int taskInstanceId, ExecuteTaskResponseCommand responseCommand){
public void sendResult(int taskInstanceId, Command command){
NettyRemoteChannel nettyRemoteChannel = getRemoteChannel(taskInstanceId);
if(nettyRemoteChannel == null){
//TODO
} else{
nettyRemoteChannel.writeAndFlush(responseCommand.convert2Command()).addListener(new ChannelFutureListener(){
nettyRemoteChannel.writeAndFlush(command).addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture future) throws Exception {
......
......@@ -31,8 +31,8 @@ import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskAckCommand;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRequestCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
......@@ -69,18 +69,18 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private final TaskCallbackService taskCallbackService;
public TaskExecuteProcessor(){
this.taskCallbackService = new TaskCallbackService();
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.workerExecService = ThreadUtils.newDaemonFixedThreadExecutor("Worker-Execute-Thread", workerConfig.getWorkerExecThreads());
}
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.EXECUTE_TASK_REQUEST == command.getType(),
Preconditions.checkArgument(CommandType.TASK_EXECUTE_REQUEST == command.getType(),
String.format("invalid command type : %s", command.getType()));
logger.info("received command : {}", command);
ExecuteTaskRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), ExecuteTaskRequestCommand.class);
TaskExecuteRequestCommand taskRequestCommand = FastJsonSerializer.deserialize(
command.getBody(), TaskExecuteRequestCommand.class);
String contextJson = taskRequestCommand.getTaskExecutionContext();
......@@ -105,8 +105,8 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
private void doAck(TaskExecutionContext taskExecutionContext){
// tell master that task is in executing
ExecuteTaskAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand);
TaskExecuteAckCommand ackCommand = buildAckCommand(taskExecutionContext);
taskCallbackService.sendAck(taskExecutionContext.getTaskInstanceId(), ackCommand.convert2Command());
}
/**
......@@ -134,10 +134,10 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
/**
* build ack command
* @param taskExecutionContext taskExecutionContext
* @return ExecuteTaskAckCommand
* @return TaskExecuteAckCommand
*/
private ExecuteTaskAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
ExecuteTaskAckCommand ackCommand = new ExecuteTaskAckCommand();
private TaskExecuteAckCommand buildAckCommand(TaskExecutionContext taskExecutionContext) {
TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand();
ackCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
ackCommand.setStatus(ExecutionStatus.RUNNING_EXEUTION.getCode());
ackCommand.setLogPath(getTaskLogPath(taskExecutionContext));
......
......@@ -27,8 +27,8 @@ import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.KillTaskRequestCommand;
import org.apache.dolphinscheduler.remote.command.KillTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
import org.apache.dolphinscheduler.remote.processor.NettyRequestProcessor;
import org.apache.dolphinscheduler.remote.utils.FastJsonSerializer;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
......@@ -58,7 +58,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
/**
* task callback service
*/
private final KillTaskCallbackService killTaskCallbackService;
private final TaskCallbackService taskCallbackService;
/**
* taskExecutionContextCacheManager
......@@ -72,7 +72,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
public TaskKillProcessor(){
this.killTaskCallbackService = new KillTaskCallbackService();
this.taskCallbackService = SpringApplicationContext.getBean(TaskCallbackService.class);
this.workerConfig = SpringApplicationContext.getBean(WorkerConfig.class);
this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
}
......@@ -115,41 +115,41 @@ public class TaskKillProcessor implements NettyRequestProcessor {
@Override
public void process(Channel channel, Command command) {
Preconditions.checkArgument(CommandType.KILL_TASK_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
KillTaskRequestCommand killTaskRequestCommand = FastJsonSerializer.deserialize(command.getBody(), KillTaskRequestCommand.class);
logger.info("received command : {}", killTaskRequestCommand);
Preconditions.checkArgument(CommandType.TASK_KILL_REQUEST == command.getType(), String.format("invalid command type : %s", command.getType()));
TaskKillRequestCommand taskKillRequestCommand = FastJsonSerializer.deserialize(command.getBody(), TaskKillRequestCommand.class);
logger.info("received command : {}", taskKillRequestCommand);
String contextJson = killTaskRequestCommand.getTaskExecutionContext();
String contextJson = taskKillRequestCommand.getTaskExecutionContext();
TaskExecutionContext taskExecutionContext = JSONObject.parseObject(contextJson, TaskExecutionContext.class);
Boolean killStatus = doKill(taskExecutionContext);
killTaskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
taskCallbackService.addRemoteChannel(taskExecutionContext.getTaskInstanceId(),
new NettyRemoteChannel(channel, command.getOpaque()));
KillTaskResponseCommand killTaskResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
killTaskCallbackService.sendKillResult(killTaskResponseCommand.getTaskInstanceId(),killTaskResponseCommand);
TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext,killStatus);
taskCallbackService.sendResult(taskKillResponseCommand.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
}
/**
* build KillTaskResponseCommand
* build TaskKillResponseCommand
*
* @param taskExecutionContext taskExecutionContext
* @param killStatus killStatus
* @return build KillTaskResponseCommand
* @return build TaskKillResponseCommand
*/
private KillTaskResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext,
Boolean killStatus) {
KillTaskResponseCommand killTaskResponseCommand = new KillTaskResponseCommand();
killTaskResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
killTaskResponseCommand.setHost(taskExecutionContext.getHost());
killTaskResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
killTaskResponseCommand.setProcessId(taskExecutionContext.getProcessId());
killTaskResponseCommand.setAppIds(appIds);
return killTaskResponseCommand;
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
taskKillResponseCommand.setStatus(killStatus ? ExecutionStatus.SUCCESS.getCode() : ExecutionStatus.FAILURE.getCode());
taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
taskKillResponseCommand.setAppIds(appIds);
return taskKillResponseCommand;
}
/**
......
......@@ -24,7 +24,7 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.TaskTimeoutParameter;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.remote.command.ExecuteTaskResponseCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.processor.TaskCallbackService;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
......@@ -75,7 +75,7 @@ public class TaskExecuteThread implements Runnable {
@Override
public void run() {
ExecuteTaskResponseCommand responseCommand = new ExecuteTaskResponseCommand(taskExecutionContext.getTaskInstanceId());
TaskExecuteResponseCommand responseCommand = new TaskExecuteResponseCommand(taskExecutionContext.getTaskInstanceId());
try {
logger.info("script path : {}", taskExecutionContext.getExecutePath());
// task node
......@@ -134,7 +134,7 @@ public class TaskExecuteThread implements Runnable {
responseCommand.setProcessId(task.getProcessId());
responseCommand.setAppIds(task.getAppIds());
} finally {
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册