未验证 提交 d7df8999 编写于 作者: M Mr.An 提交者: GitHub

[Fix-11003]Task group queue is not updated to final state (#11004)

* fix after the task group is forced to start the task. Task group status has not changed
上级 a5b3144e
......@@ -17,6 +17,7 @@
package org.apache.dolphinscheduler.service.process;
import static java.util.stream.Collectors.toSet;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_END_DATE;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST;
import static org.apache.dolphinscheduler.common.Constants.CMDPARAM_COMPLEMENT_DATA_START_DATE;
......@@ -31,8 +32,6 @@ import static org.apache.dolphinscheduler.plugin.task.api.enums.DataType.VARCHAR
import static org.apache.dolphinscheduler.plugin.task.api.enums.Direct.IN;
import static org.apache.dolphinscheduler.plugin.task.api.utils.DataQualityConstants.TASK_INSTANCE_ID;
import static java.util.stream.Collectors.toSet;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.CommandType;
......@@ -322,14 +321,14 @@ public class ProcessServiceImpl implements ProcessService {
//when we get the running instance(or waiting instance) only get the priority instance(by id)
if (processDefinition.getExecutionType().typeIsSerialWait()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
}
} else if (processDefinition.getExecutionType().typeIsSerialDiscard()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isNotEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.STOP);
saveProcessInstance(processInstance);
......@@ -339,7 +338,7 @@ public class ProcessServiceImpl implements ProcessService {
saveProcessInstance(processInstance);
} else if (processDefinition.getExecutionType().typeIsSerialPriority()) {
List<ProcessInstance> runningProcessInstances = this.processInstanceMapper.queryByProcessDefineCodeAndProcessDefinitionVersionAndStatusAndNextId(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
processInstance.getProcessDefinitionVersion(), Constants.RUNNING_PROCESS_STATE, processInstance.getId());
if (CollectionUtils.isEmpty(runningProcessInstances)) {
processInstance.setState(ExecutionStatus.SUBMITTED_SUCCESS);
saveProcessInstance(processInstance);
......@@ -356,13 +355,13 @@ public class ProcessServiceImpl implements ProcessService {
// determine whether the process is normal
if (update > 0) {
StateEventChangeCommand stateEventChangeCommand = new StateEventChangeCommand(
info.getId(), 0, info.getState(), info.getId(), 0
info.getId(), 0, info.getState(), info.getId(), 0
);
try {
Host host = new Host(info.getHost());
stateEventCallbackService.sendResult(host, stateEventChangeCommand.convert2Command());
} catch (Exception e) {
logger.error("sendResultError",e );
logger.error("sendResultError", e);
}
}
}
......@@ -798,10 +797,10 @@ public class ProcessServiceImpl implements ProcessService {
}
String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime(), timezoneId);
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
getCommandTypeIfComplement(processInstance, command),
processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
// set process instance priority
......@@ -952,10 +951,10 @@ public class ProcessServiceImpl implements ProcessService {
// Recalculate global parameters after rerun.
String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime(), timezoneId);
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
commandTypeIfComplement,
processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
processInstance.setProcessDefinition(processDefinition);
}
......@@ -1146,9 +1145,9 @@ public class ProcessServiceImpl implements ProcessService {
String timezoneId = cmdParam.get(Constants.SCHEDULE_TIMEZONE);
String globalParams = curingGlobalParamsService.curingGlobalParams(processInstance.getId(),
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId);
processDefinition.getGlobalParamMap(),
processDefinition.getGlobalParamList(),
CommandType.COMPLEMENT_DATA, processInstance.getScheduleTime(), timezoneId);
processInstance.setGlobalParams(globalParams);
}
......@@ -1310,9 +1309,9 @@ public class ProcessServiceImpl implements ProcessService {
TaskInstance task = submitTaskInstanceToDB(taskInstance, processInstance);
if (task == null) {
logger.error("Save taskInstance to db error, task name:{}, process id:{} state: {} ",
taskInstance.getName(),
taskInstance.getProcessInstance().getId(),
processInstance.getState());
taskInstance.getName(),
taskInstance.getProcessInstance().getId(),
processInstance.getState());
return null;
}
......@@ -1559,9 +1558,9 @@ public class ProcessServiceImpl implements ProcessService {
ExecutionStatus processInstanceState = processInstance.getState();
if (processInstanceState.typeIsFinished() || processInstanceState == ExecutionStatus.READY_STOP) {
logger.warn("processInstance: {} state was: {}, skip submit this task, taskCode: {}",
processInstance.getId(),
processInstanceState,
taskInstance.getTaskCode());
processInstance.getId(),
processInstanceState,
taskInstance.getTaskCode());
return null;
}
if (processInstanceState == ExecutionStatus.READY_PAUSE) {
......@@ -2989,7 +2988,7 @@ public class ProcessServiceImpl implements ProcessService {
return null;
}
try {
while (taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
while (thisTaskGroupQueue.getForceStart() == Flag.NO.getCode() && taskGroupMapper.releaseTaskGroupResource(taskGroup.getId(), taskGroup.getUseSize()
, thisTaskGroupQueue.getId(), TaskGroupQueueStatus.ACQUIRE_SUCCESS.getCode()) != 1) {
thisTaskGroupQueue = this.taskGroupQueueMapper.queryByTaskId(taskInstance.getId());
if (thisTaskGroupQueue.getStatus() == TaskGroupQueueStatus.RELEASE) {
......@@ -3089,6 +3088,7 @@ public class ProcessServiceImpl implements ProcessService {
throw new ServiceException("delete command fail, id:" + commandId);
}
}
/**
* find k8s config yaml by clusterName
*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册