未验证 提交 c2585a11 编写于 作者: Z zhang 提交者: GitHub

[BUG] master task log can't output repair (#7029)

* update task_definition_log field typ

update task_definition_log  field task_params type,modify text to longtext to be consistent with the main table

* update t_ds_task_definition_log field task_params

update t_ds_task_definition_log field task_params

* [FIX:211128] master task log Can't output repair
Co-authored-by: Jegger_Z's avatarcrane <zhangbq3306@163.com>
上级 be3bd4c8
......@@ -31,6 +31,7 @@ import org.apache.dolphinscheduler.common.task.sqoop.sources.SourceMysqlParamete
import org.apache.dolphinscheduler.common.task.sqoop.targets.TargetMysqlParameter;
import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.LoggerUtils;
import org.apache.dolphinscheduler.common.utils.TaskParametersUtils;
import org.apache.dolphinscheduler.dao.entity.DataSource;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
......@@ -97,6 +98,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* task timeout process
*
* @return
*/
protected abstract boolean taskTimeout();
......@@ -105,6 +107,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
public void run() {
}
@Override
public boolean action(TaskAction taskAction) {
......@@ -216,11 +219,23 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
.create();
}
/**
* set master task running logger.
*/
public void setTaskExecutionLogger() {
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
}
/**
* set procedure task relation
*
* @param procedureTaskExecutionContext procedureTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
private void setProcedureTaskRelation(ProcedureTaskExecutionContext procedureTaskExecutionContext, TaskInstance taskInstance) {
ProcedureParameters procedureParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), ProcedureParameters.class);
......@@ -233,7 +248,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set datax task relation
*
* @param dataxTaskExecutionContext dataxTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
protected void setDataxTaskRelation(DataxTaskExecutionContext dataxTaskExecutionContext, TaskInstance taskInstance) {
DataxParameters dataxParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), DataxParameters.class);
......@@ -258,7 +273,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
* set sqoop task relation
*
* @param sqoopTaskExecutionContext sqoopTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
private void setSqoopTaskRelation(SqoopTaskExecutionContext sqoopTaskExecutionContext, TaskInstance taskInstance) {
SqoopParameters sqoopParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqoopParameters.class);
......@@ -285,11 +300,12 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
}
}
/**
* set SQL task relation
*
* @param sqlTaskExecutionContext sqlTaskExecutionContext
* @param taskInstance taskInstance
* @param taskInstance taskInstance
*/
private void setSQLTaskRelation(SQLTaskExecutionContext sqlTaskExecutionContext, TaskInstance taskInstance) {
SqlParameters sqlParameters = JSONUtils.parseObject(taskInstance.getTaskParams(), SqlParameters.class);
......@@ -301,7 +317,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
// whether udf type
boolean udfTypeFlag = Enums.getIfPresent(UdfType.class, Strings.nullToEmpty(sqlParameters.getType())).isPresent()
&& !StringUtils.isEmpty(sqlParameters.getUdfs());
&& !StringUtils.isEmpty(sqlParameters.getUdfs());
if (udfTypeFlag) {
String[] udfFunIds = sqlParameters.getUdfs().split(",");
......@@ -325,7 +341,7 @@ public abstract class BaseTaskProcessor implements ITaskProcessor {
/**
* whehter tenant is null
*
* @param tenant tenant
* @param tenant tenant
* @param taskInstance taskInstance
* @return result
*/
......
......@@ -57,10 +57,6 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
@Autowired
NettyExecutorManager nettyExecutorManager = SpringApplicationContext.getBean(NettyExecutorManager.class);
/**
* logger of MasterBaseTaskExecThread
*/
protected Logger logger = LoggerFactory.getLogger(getClass());
@Override
public boolean submit(TaskInstance task, ProcessInstance processInstance, int maxRetryTimes, int commitInterval) {
......@@ -70,6 +66,7 @@ public class CommonTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger();
dispatchTask(taskInstance, processInstance);
return true;
}
......
......@@ -81,12 +81,7 @@ public class ConditionTaskProcessor extends BaseTaskProcessor {
taskInstance.getTaskCode(), taskInstance.getTaskDefinitionVersion()
);
logger = LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
taskInstance.getFirstSubmitTime(),
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
setTaskExecutionLogger();
String threadLoggerInfoName = String.format(Constants.TASK_LOG_INFO_FORMAT, processService.formatTaskAppId(this.taskInstance));
Thread.currentThread().setName(threadLoggerInfoName);
initTaskParameters();
......
......@@ -93,6 +93,7 @@ public class DependentTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
setTaskExecutionLogger();
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
......
......@@ -59,7 +59,7 @@ public class SubTaskProcessor extends BaseTaskProcessor {
if (this.taskInstance == null) {
return false;
}
setTaskExecutionLogger();
return true;
}
......
......@@ -75,6 +75,7 @@ public class SwitchTaskProcessor extends BaseTaskProcessor {
processInstance.getProcessDefinitionVersion(),
taskInstance.getProcessInstanceId(),
taskInstance.getId()));
setTaskExecutionLogger();
taskInstance.setHost(NetUtils.getAddr(masterConfig.getListenPort()));
taskInstance.setState(ExecutionStatus.RUNNING_EXECUTION);
taskInstance.setStartTime(new Date());
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册