未验证 提交 46a5d17b 编写于 作者: W wind 提交者: GitHub

[cherry-pick-2.0.3][Improvement][TaskLog] Unified task log #7831 (#7926)

* [cherry-pick][Improvement][TaskLog] Unified task log #7831

* [cherry-pick][Improvement][TaskLog] Unified task log #7831

* fix thread name
Co-authored-by: Ncaishunfeng <534328519@qq.com>
上级 8dc529d3
......@@ -359,7 +359,7 @@ public class WorkflowExecuteThread implements Runnable {
taskFinished(task);
} else if (activeTaskProcessorMaps.containsKey(stateEvent.getTaskInstanceId())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(stateEvent.getTaskInstanceId());
iTaskProcessor.run();
iTaskProcessor.action(TaskAction.RUN);
if (iTaskProcessor.taskState().typeIsFinished()) {
task = processService.findTaskInstanceById(stateEvent.getTaskInstanceId());
......@@ -630,11 +630,12 @@ public class WorkflowExecuteThread implements Runnable {
&& taskProcessor.getType().equalsIgnoreCase(Constants.COMMON_TASK_TYPE)) {
notifyProcessHostUpdate(taskInstance);
}
boolean submit = taskProcessor.submit(taskInstance, processInstance, masterConfig.getMasterTaskCommitRetryTimes(), masterConfig.getMasterTaskCommitInterval());
taskProcessor.init(taskInstance, processInstance);
boolean submit = taskProcessor.action(TaskAction.SUBMIT);
if (submit) {
this.taskInstanceHashMap.put(taskInstance.getId(), taskInstance.getTaskCode(), taskInstance);
activeTaskProcessorMaps.put(taskInstance.getId(), taskProcessor);
taskProcessor.run();
taskProcessor.action(TaskAction.RUN);
addTimeoutCheck(taskInstance);
addRetryCheck(taskInstance);
TaskDefinition taskDefinition = processService.findTaskDefinition(
......
......@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
......@@ -39,6 +40,7 @@ import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.UdfFunc;
import org.apache.dolphinscheduler.server.builder.TaskExecutionContextBuilder;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.queue.entity.TaskExecutionContext;
......@@ -68,7 +70,7 @@ import com.google.common.base.Strings;
public abstract class BaseTaskProcessor implements ITaskProcessor {
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME, getClass()));
protected final Logger logger = LoggerFactory.getLogger(String.format(TaskConstants.TASK_LOG_LOGGER_NAME_FORMAT, getClass()));
protected boolean killed = false;
......@@ -78,10 +80,32 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
protected TaskInstance taskInstance = null;
protected ProcessInstance processInstance;
protected ProcessInstance processInstance = null;
protected int maxRetryTimes;
protected int commitInterval;
protected ProcessService processService = SpringApplicationContext.getBean(ProcessService.class);
protected MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
protected String threadLoggerInfoName;
@Override
public void init(TaskInstance taskInstance, ProcessInstance processInstance) {
if (processService == null) {
processService = SpringApplicationContext.getBean(ProcessService.class);
}
if (masterConfig == null) {
masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
}
this.taskInstance = taskInstance;
this.processInstance = processInstance;
this.maxRetryTimes = masterConfig.getMasterTaskCommitRetryTimes();
this.commitInterval = masterConfig.getMasterTaskCommitInterval();
}
/**
* persist task
*
......@@ -91,21 +115,16 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* pause task, common tasks donot need this.
*
* @return
*/
protected abstract boolean pauseTask();
/**
* kill task, all tasks need to realize this function
*
* @return
*/
protected abstract boolean killTask();
/**
* task timeout process
* @return
*/
protected abstract boolean taskTimeout();
......@@ -119,12 +138,22 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return persistTask(taskAction);
}
@Override
public void run() {
}
/*
* submit task
*/
protected abstract boolean submitTask();
/**
* run task
*/
protected abstract boolean runTask();
@Override
public boolean action(TaskAction taskAction) {
String threadName = Thread.currentThread().getName();
if (StringUtils.isNotEmpty(threadLoggerInfoName)) {
Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName));
}
switch (taskAction) {
case STOP:
......@@ -133,13 +162,27 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return pause();
case TIMEOUT:
return timeout();
case SUBMIT:
return submit();
case RUN:
return run();
default:
logger.error("unknown task action: {}", taskAction.toString());
}
// reset thread name
Thread.currentThread().setName(threadName);
return false;
}
protected boolean submit() {
return submitTask();
}
protected boolean run() {
return runTask();
}
protected boolean timeout() {
if (timeout) {
return true;
......@@ -148,9 +191,6 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return timeout;
}
/**
* @return
*/
protected boolean pause() {
if (paused) {
return true;
......@@ -172,6 +212,18 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
return null;
}
/**
* set master task running logger.
*/
public void setTaskExecutionLogger() {
threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId());
Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT, threadLoggerInfoName));
}
/**
* get TaskExecutionContext
*
......@@ -321,7 +373,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
// whether udf type
boolean udfTypeFlag = Enums.getIfPresent(UdfType.class, Strings.nullToEmpty(sqlParameters.getType())).isPresent()
&& !StringUtils.isEmpty(sqlParameters.getUdfs());
&& !StringUtils.isEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
......
......@@ -24,7 +24,6 @@ import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.TaskKillRequestCommand;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.master.dispatch.context.ExecutionContext;
import org.apache.dolphinscheduler.server.master.dispatch.enums.ExecutorType;
import org.apache.dolphinscheduler.server.master.dispatch.exceptions.ExecuteException;
......@@ -39,8 +38,6 @@ import org.apache.commons.lang.StringUtils;
import java.util.Date;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
/**
......@@ -51,25 +48,17 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
private TaskPriorityQueue taskUpdateQueue;
@Autowired
MasterConfig masterConfig;
@Autowired
NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
/**
* logger of MasterBaseTaskExecThread
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(task, maxRetryTimes, commitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger();
return dispatchTask(taskInstance, processInstance);
}
......@@ -79,7 +68,8 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
return true;
}
@Override
......
......@@ -26,15 +26,10 @@ import org.apache.dolphinscheduler.common.model.DependentItem;
import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.spi.task.TaskConstants;
import java.util.ArrayList;
import java.util.Date;
......@@ -42,23 +37,16 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* condition task processor
*/
public class ConditionTaskProcessor extends BaseTaskProcessor {
protected static final Logger logger = LoggerFactory.getLogger(TaskConstants.TASK_LOG_LOGGER_NAME);
/**
* dependent parameters
*/
private DependentParameters dependentParameters;
ProcessInstance processInstance;
/**
* condition result
*/
......@@ -69,14 +57,11 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
*/
private Map<Long, ExecutionStatus> completeTaskList = new ConcurrentHashMap<>();
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
private TaskDefinition taskDefinition;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
......@@ -85,12 +70,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
String threadLoggerInfoName = LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId());
Thread.currentThread().setName(String.format(TaskConstants.TASK_LOGGER_THREAD_NAME_FORMAT,threadLoggerInfoName));
setTaskExecutionLogger();
initTaskParameters();
logger.info("dependent task start");
return true;
......@@ -102,13 +82,14 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
if (conditionResult.equals(DependResult.WAITING)) {
setConditionResult();
endTask();
} else {
endTask();
}
return true;
}
@Override
......
......@@ -27,13 +27,9 @@ import org.apache.dolphinscheduler.common.model.DependentTaskModel;
import org.apache.dolphinscheduler.common.task.dependent.DependentParameters;
import org.apache.dolphinscheduler.common.utils.DependentUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.DependentExecute;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.ArrayList;
import java.util.Date;
......@@ -69,18 +65,13 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
DependResult result;
ProcessInstance processInstance;
TaskDefinition taskDefinition;
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
boolean allDependentItemFinished;
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = task;
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
......@@ -88,6 +79,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
......@@ -106,7 +98,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
if (!allDependentItemFinished) {
allDependentItemFinished = allDependentTaskFinish();
}
......@@ -114,6 +106,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
getTaskDependResult();
endTask();
}
return true;
}
@Override
......
......@@ -28,14 +28,12 @@ public interface ITaskProcessor {
boolean persist(TaskAction taskAction);
void run();
void init(TaskInstance taskInstance, ProcessInstance processInstance);
boolean action(TaskAction taskAction);
String getType();
boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval);
ExecutionStatus taskState();
}
......@@ -22,9 +22,9 @@ import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.remote.command.StateEventChangeCommand;
import org.apache.dolphinscheduler.remote.processor.StateEventCallbackService;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import java.util.Date;
......@@ -36,8 +36,6 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class SubTaskProcessor extends BaseTaskProcessor {
private ProcessInstance processInstance;
private ProcessInstance subProcessInstance = null;
private TaskDefinition taskDefinition;
......@@ -49,17 +47,22 @@ public class SubTaskProcessor extends BaseTaskProcessor {
private StateEventCallbackService stateEventCallbackService = SpringApplicationContext.getBean(StateEventCallbackService.class);
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
public boolean submitTask() {
taskDefinition = processService.findTaskDefinition(
task.getTaskCode(), task.getTaskDefinitionVersion()
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
this.taskInstance = processService.submitTask(task, masterTaskCommitRetryTimes, masterTaskCommitInterval);
this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger();
taskInstance.setLogPath(LogUtils.getTaskLogPath(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
return true;
}
......@@ -69,7 +72,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
try {
this.runLock.lock();
if (setSubWorkFlow()) {
......@@ -83,6 +86,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
} finally {
this.runLock.unlock();
}
return true;
}
@Override
......
......@@ -25,13 +25,10 @@ import org.apache.dolphinscheduler.common.task.switchtask.SwitchParameters;
import org.apache.dolphinscheduler.common.task.switchtask.SwitchResultVo;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.NetUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
import org.apache.dolphinscheduler.server.utils.LogUtils;
import org.apache.dolphinscheduler.server.utils.SwitchTaskUtils;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
......@@ -48,27 +45,22 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
protected final String rgex = "['\"]*\\$\\{(.*?)\\}['\"]*";
private TaskInstance taskInstance;
private ProcessInstance processInstance;
TaskDefinition taskDefinition;
MasterConfig masterConfig = SpringApplicationContext.getBean(MasterConfig.class);
/**
* switch result
*/
private DependResult conditionResult;
@Override
public boolean submit(TaskInstance taskInstance, ProcessInstance processInstance, int masterTaskCommitRetryTimes, int masterTaskCommitInterval) {
this.processInstance = processInstance;
this.taskInstance = processService.submitTask(taskInstance, masterTaskCommitRetryTimes, masterTaskCommitInterval);
public boolean submitTask() {
this.taskInstance = processService.submitTask(taskInstance, maxRetryTimes, commitInterval);
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger();
taskDefinition = processService.findTaskDefinition(
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
......@@ -84,7 +76,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
}
@Override
public void run() {
public boolean runTask() {
try {
if (!this.taskState().typeIsFinished() && setSwitchResult()) {
endTaskState();
......@@ -95,6 +87,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
this.taskInstance.getId(),
e);
}
return true;
}
@Override
......
......@@ -23,5 +23,7 @@ package org.apache.dolphinscheduler.server.master.runner.task;
public enum TaskAction {
PAUSE,
STOP,
TIMEOUT
TIMEOUT,
SUBMIT,
RUN
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册