未验证 提交 3701a24d 编写于 作者: Z zhuxt2015 提交者: GitHub

[Improvement][Task Log] Task status log print description instead of code (#11009)

* use execution status instead of status code
上级 de6e58ec
......@@ -109,7 +109,7 @@ public class TaskDelayEventHandler implements TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS, taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}
......
......@@ -69,7 +69,7 @@ public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
}
public void sendAckToWorker(TaskEvent taskEvent) {
TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS,
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
......
......@@ -110,7 +110,7 @@ public class TaskResultEventHandler implements TaskEventHandler {
public void sendAckToWorker(TaskEvent taskEvent) {
// we didn't set the receiver address, since the ack doen's need to retry
TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS.getCode(),
TaskExecuteAckCommand taskExecuteAckMessage = new TaskExecuteAckCommand(ExecutionStatus.SUCCESS,
taskEvent.getTaskInstanceId(),
masterConfig.getMasterAddress(),
taskEvent.getWorkerAddress(),
......
......@@ -107,7 +107,7 @@ public class TaskRunningEventHandler implements TaskEventHandler {
private void sendAckToWorker(TaskEvent taskEvent) {
// If event handle success, send ack to worker to otherwise the worker will retry this event
TaskExecuteRunningAckMessage taskExecuteRunningAckMessage =
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS.getCode(), taskEvent.getTaskInstanceId());
new TaskExecuteRunningAckMessage(ExecutionStatus.SUCCESS, taskEvent.getTaskInstanceId());
taskEvent.getChannel().writeAndFlush(taskExecuteRunningAckMessage.convert2Command());
}
......
......@@ -136,7 +136,7 @@ public class StateEventResponseService {
private void writeResponse(StateEvent stateEvent, ExecutionStatus status) {
Channel channel = stateEvent.getChannel();
if (channel != null) {
StateEventResponseCommand command = new StateEventResponseCommand(status.getCode(), stateEvent.getKey());
StateEventResponseCommand command = new StateEventResponseCommand(status, stateEvent.getKey());
channel.writeAndFlush(command.convert2Command());
}
}
......
......@@ -109,7 +109,7 @@ public class TaskEvent {
TaskEvent event = new TaskEvent();
event.setProcessInstanceId(command.getProcessInstanceId());
event.setTaskInstanceId(command.getTaskInstanceId());
event.setState(ExecutionStatus.of(command.getStatus()));
event.setState(command.getStatus());
event.setStartTime(command.getStartTime());
event.setExecutePath(command.getExecutePath());
event.setLogPath(command.getLogPath());
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.TaskExecuteRunningCommand;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import org.apache.dolphinscheduler.server.master.processor.queue.TaskEventService;
......@@ -66,7 +67,7 @@ public class TaskAckProcessorTest {
taskExecuteRunningMessage = new TaskExecuteRunningCommand("127.0.0.1:5678",
" 127.0.0.1:1234",
System.currentTimeMillis());
taskExecuteRunningMessage.setStatus(1);
taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecuteRunningMessage.setExecutePath("/dolphinscheduler/worker");
taskExecuteRunningMessage.setHost("localhost");
taskExecuteRunningMessage.setLogPath("/temp/worker.log");
......
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.server.master.processor;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.CommandType;
import org.apache.dolphinscheduler.remote.command.TaskKillResponseCommand;
......@@ -50,7 +51,7 @@ public class TaskKillResponseProcessorTest {
new ArrayList<String>() {{ add("task_1"); }});
taskKillResponseCommand.setHost("localhost");
taskKillResponseCommand.setProcessId(1);
taskKillResponseCommand.setStatus(1);
taskKillResponseCommand.setStatus(ExecutionStatus.RUNNING_EXECUTION);
taskKillResponseCommand.setTaskInstanceId(1);
}
......
......@@ -78,7 +78,7 @@ public class TaskResponseServiceTest {
System.currentTimeMillis());
taskExecuteRunningMessage.setProcessId(1);
taskExecuteRunningMessage.setTaskInstanceId(22);
taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION.getCode());
taskExecuteRunningMessage.setStatus(ExecutionStatus.RUNNING_EXECUTION);
taskExecuteRunningMessage.setExecutePath("path");
taskExecuteRunningMessage.setLogPath("logPath");
taskExecuteRunningMessage.setHost("127.*.*.*");
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.io.Serializable;
......@@ -27,22 +28,22 @@ import java.io.Serializable;
public class StateEventResponseCommand implements Serializable {
private String key;
private int status;
private ExecutionStatus status;
public StateEventResponseCommand() {
super();
}
public StateEventResponseCommand(int status, String key) {
public StateEventResponseCommand(ExecutionStatus status, String key) {
this.status = status;
this.key = key;
}
public int getStatus() {
public ExecutionStatus getStatus() {
return status;
}
public void setStatus(int status) {
public void setStatus(ExecutionStatus status) {
this.status = status;
}
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -35,9 +36,9 @@ import lombok.ToString;
public class TaskExecuteAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
private ExecutionStatus status;
public TaskExecuteAckCommand(int status,
public TaskExecuteAckCommand(ExecutionStatus status,
int taskInstanceId,
String sourceServerAddress,
String messageReceiverAddress,
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.io.Serializable;
......@@ -28,13 +29,13 @@ import java.io.Serializable;
public class TaskExecuteRunningAckMessage implements Serializable {
private int taskInstanceId;
private int status;
private ExecutionStatus status;
public TaskExecuteRunningAckMessage() {
super();
}
public TaskExecuteRunningAckMessage(int status, int taskInstanceId) {
public TaskExecuteRunningAckMessage(ExecutionStatus status, int taskInstanceId) {
this.status = status;
this.taskInstanceId = taskInstanceId;
}
......@@ -47,11 +48,11 @@ public class TaskExecuteRunningAckMessage implements Serializable {
this.taskInstanceId = taskInstanceId;
}
public int getStatus() {
public ExecutionStatus getStatus() {
return status;
}
public void setStatus(int status) {
public void setStatus(ExecutionStatus status) {
this.status = status;
}
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.util.Date;
......@@ -58,7 +59,7 @@ public class TaskExecuteRunningCommand extends BaseCommand {
/**
* status
*/
private int status;
private ExecutionStatus status;
/**
* logPath
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import java.io.Serializable;
import java.util.List;
......@@ -40,7 +41,7 @@ public class TaskKillResponseCommand implements Serializable {
/**
* status
*/
private int status;
private ExecutionStatus status;
/**
......@@ -69,11 +70,11 @@ public class TaskKillResponseCommand implements Serializable {
this.host = host;
}
public int getStatus() {
public ExecutionStatus getStatus() {
return status;
}
public void setStatus(int status) {
public void setStatus(ExecutionStatus status) {
this.status = status;
}
......@@ -111,7 +112,7 @@ public class TaskKillResponseCommand implements Serializable {
return "TaskKillResponseCommand{"
+ "taskInstanceId=" + taskInstanceId
+ ", host='" + host + '\''
+ ", status=" + status
+ ", status=" + status.getDescp()
+ ", processId=" + processId
+ ", appIds=" + appIds
+ '}';
......
......@@ -18,6 +18,7 @@
package org.apache.dolphinscheduler.remote.command;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
import lombok.Data;
import lombok.EqualsAndHashCode;
......@@ -31,9 +32,9 @@ import lombok.ToString;
public class TaskRejectAckCommand extends BaseCommand {
private int taskInstanceId;
private int status;
private ExecutionStatus status;
public TaskRejectAckCommand(int status,
public TaskRejectAckCommand(ExecutionStatus status,
int taskInstanceId,
String messageSenderAddress,
String messageReceiverAddress,
......
......@@ -52,7 +52,7 @@ public class TaskExecuteRunningMessageSender implements MessageSender<TaskExecut
System.currentTimeMillis());
taskExecuteRunningMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskExecuteRunningMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskExecuteRunningMessage.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskExecuteRunningMessage.setLogPath(taskExecutionContext.getLogPath());
taskExecuteRunningMessage.setHost(taskExecutionContext.getHost());
taskExecuteRunningMessage.setStartTime(taskExecutionContext.getStartTime());
......
......@@ -62,17 +62,17 @@ public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
try {
LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
if (taskExecuteAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
if (taskExecuteAckMessage.getStatus() == ExecutionStatus.SUCCESS) {
messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
CommandType.TASK_EXECUTE_RESULT);
logger.debug("remove REMOTE_CHANNELS, task instance id:{}", taskExecuteAckMessage.getTaskInstanceId());
} else if (taskExecuteAckMessage.getStatus() == ExecutionStatus.FAILURE.getCode()) {
} else if (taskExecuteAckMessage.getStatus() == ExecutionStatus.FAILURE) {
// master handle worker response error, will still retry
logger.error("Receive task execute result ack message, the message status is not success, message: {}",
taskExecuteAckMessage);
taskExecuteAckMessage);
} else {
throw new IllegalArgumentException("Invalid task execute response ack status: "
+ taskExecuteAckMessage.getStatus());
+ taskExecuteAckMessage.getStatus());
}
} finally {
LoggerUtils.removeTaskInstanceIdMDC();
......
......@@ -61,7 +61,7 @@ public class TaskExecuteRunningAckProcessor implements NettyRequestProcessor {
LoggerUtils.setTaskInstanceIdMDC(runningAckCommand.getTaskInstanceId());
logger.info("task execute running ack command : {}", runningAckCommand);
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
if (runningAckCommand.getStatus() == ExecutionStatus.SUCCESS) {
messageRetryRunner.removeRetryMessage(runningAckCommand.getTaskInstanceId(),
CommandType.TASK_EXECUTE_RUNNING);
}
......
......@@ -122,7 +122,7 @@ public class TaskKillProcessor implements NettyRequestProcessor {
private void sendTaskKillResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus());
taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
taskKillResponseCommand.setHost(taskExecutionContext.getHost());
......
......@@ -55,7 +55,7 @@ public class TaskRejectAckProcessor implements NettyRequestProcessor {
}
try {
LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
if (taskRejectAckMessage.getStatus() == ExecutionStatus.SUCCESS.getCode()) {
if (taskRejectAckMessage.getStatus() == ExecutionStatus.SUCCESS) {
messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
CommandType.TASK_REJECT);
logger.debug("removeRecallCache: task instance id:{}", taskRejectAckMessage.getTaskInstanceId());
......
......@@ -207,10 +207,10 @@ public class TaskExecuteThread implements Runnable, Delayed {
// task result process
if (this.task.getNeedAlert()) {
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus().getCode());
sendAlert(this.task.getTaskAlertInfo(), this.task.getExitStatus());
}
taskExecutionContext.setCurrentExecutionStatus(ExecutionStatus.of(this.task.getExitStatus().getCode()));
taskExecutionContext.setCurrentExecutionStatus(this.task.getExitStatus());
taskExecutionContext.setEndTime(DateUtils.getCurrentDate());
taskExecutionContext.setProcessId(this.task.getProcessId());
taskExecutionContext.setAppIds(this.task.getAppIds());
......@@ -233,8 +233,8 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
}
private void sendAlert(TaskAlertInfo taskAlertInfo, int status) {
int strategy = status == ExecutionStatus.SUCCESS.getCode() ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
private void sendAlert(TaskAlertInfo taskAlertInfo, ExecutionStatus status) {
int strategy = status == ExecutionStatus.SUCCESS ? WarningType.SUCCESS.getCode() : WarningType.FAILURE.getCode();
alertClientService.sendAlert(taskAlertInfo.getAlertGroupId(), taskAlertInfo.getTitle(), taskAlertInfo.getContent(), strategy);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册