Unverified commit acf9c84c authored by JinYong Li, committed by GitHub

[Feature][workflow list edit] add task update with upstream (#7829)

* add task save and bind workflow

* add task update with upstream
Parent d2794beb
@@ -152,6 +152,36 @@ public class TaskDefinitionController extends BaseController {
return returnDataList(result);
}
/**
* update task definition and upstream
*
* @param loginUser login user
* @param projectCode project code
* @param code task definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, separated by commas
* @return update result code
*/
@ApiOperation(value = "updateWithUpstream", notes = "UPDATE_TASK_DEFINITION_NOTES")
@ApiImplicitParams({
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = true, type = "Long"),
@ApiImplicitParam(name = "code", value = "TASK_DEFINITION_CODE", required = true, dataType = "Long", example = "1"),
@ApiImplicitParam(name = "taskDefinitionJsonObj", value = "TASK_DEFINITION_JSON", required = true, type = "String"),
@ApiImplicitParam(name = "upstreamCodes", value = "UPSTREAM_CODES", required = false, type = "String")
})
@PutMapping(value = "/{code}/with-upstream")
@ResponseStatus(HttpStatus.OK)
@ApiException(UPDATE_TASK_DEFINITION_ERROR)
@AccessLogAnnotation(ignoreRequestArgs = "loginUser")
public Result updateTaskWithUpstream(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "code") long code,
@RequestParam(value = "taskDefinitionJsonObj", required = true) String taskDefinitionJsonObj,
@RequestParam(value = "upstreamCodes", required = false) String upstreamCodes) {
Map<String, Object> result = taskDefinitionService.updateTaskWithUpstream(loginUser, projectCode, code, taskDefinitionJsonObj, upstreamCodes);
return returnDataList(result);
}
/**
* query task definition version paging list info
*
@@ -300,7 +330,7 @@ public class TaskDefinitionController extends BaseController {
@ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE", required = false, type = "Long"),
@ApiImplicitParam(name = "searchWorkflowName", value = "SEARCH_WORKFLOW_NAME", required = false, type = "String"),
@ApiImplicitParam(name = "searchTaskName", value = "SEARCH_TASK_NAME", required = false, type = "String"),
@ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = true, dataType = "TaskType", example = "SHELL"),
@ApiImplicitParam(name = "taskType", value = "TASK_TYPE", required = false, dataType = "TaskType", example = "SHELL"),
@ApiImplicitParam(name = "pageNo", value = "PAGE_NO", required = true, dataType = "Int", example = "1"),
@ApiImplicitParam(name = "pageSize", value = "PAGE_SIZE", required = true, dataType = "Int", example = "10")
})
@@ -312,7 +342,7 @@ public class TaskDefinitionController extends BaseController {
@ApiParam(name = "projectCode", value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "searchWorkflowName", required = false) String searchWorkflowName,
@RequestParam(value = "searchTaskName", required = false) String searchTaskName,
@RequestParam(value = "taskType", required = true) TaskType taskType,
@RequestParam(value = "taskType", required = false) TaskType taskType,
@RequestParam("pageNo") Integer pageNo,
@RequestParam("pageSize") Integer pageSize) {
Result result = checkPageParams(pageNo, pageSize);
......
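For reference, here is a minimal client-side sketch of calling the new PUT .../{code}/with-upstream endpoint. The host, the controller's class-level base path (/dolphinscheduler/projects/{projectCode}/task-definition) and the "token" auth header are assumptions based on the usual DolphinScheduler API layout and do not appear in this diff; all codes and the JSON body below are placeholders.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Minimal sketch of a client call to the new endpoint; host, base path and auth header are assumptions.
public class UpdateTaskWithUpstreamClient {
    public static void main(String[] args) throws Exception {
        long projectCode = 1234567890L;   // placeholder project code
        long taskCode = 9876543210L;      // placeholder task definition code
        String taskDefinitionJsonObj = "{\"code\":9876543210,\"name\":\"shell-demo\",\"taskType\":\"SHELL\"}";
        String upstreamCodes = "1111111111,2222222222";   // comma-separated upstream task codes

        // the endpoint takes form-encoded request parameters, not a JSON body
        String form = "taskDefinitionJsonObj=" + URLEncoder.encode(taskDefinitionJsonObj, StandardCharsets.UTF_8)
                + "&upstreamCodes=" + URLEncoder.encode(upstreamCodes, StandardCharsets.UTF_8);

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:12345/dolphinscheduler/projects/" + projectCode
                        + "/task-definition/" + taskCode + "/with-upstream"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .header("token", "<session-token>")   // assumed auth header
                .method("PUT", HttpRequest.BodyPublishers.ofString(form))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}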
@@ -258,7 +258,7 @@ public enum Status {
DATA_IS_NULL(50018, "data {0} is null", "数据[{0}]不能为空"),
PROCESS_NODE_HAS_CYCLE(50019, "process node has cycle", "流程节点间存在循环依赖"),
PROCESS_NODE_S_PARAMETER_INVALID(50020, "process node {0} parameter invalid", "流程节点[{0}]参数无效"),
PROCESS_DEFINE_STATE_ONLINE(50021, "process definition {0} is already on line", "工作流定义[{0}]已上线"),
PROCESS_DEFINE_STATE_ONLINE(50021, "process definition [{0}] is already on line", "工作流定义[{0}]已上线"),
DELETE_PROCESS_DEFINE_BY_CODE_ERROR(50022, "delete process definition by code error", "删除工作流定义错误"),
SCHEDULE_CRON_STATE_ONLINE(50023, "the status of schedule {0} is already on line", "调度配置[{0}]已上线"),
DELETE_SCHEDULE_CRON_BY_ID_ERROR(50024, "delete schedule by id error", "删除调度配置错误"),
......
@@ -91,6 +91,22 @@ public interface TaskDefinitionService {
long taskCode,
String taskDefinitionJsonObj);
/**
* update task definition and upstream
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, separated by commas
* @return update result code
*/
Map<String, Object> updateTaskWithUpstream(User loginUser,
long projectCode,
long taskCode,
String taskDefinitionJsonObj,
String upstreamCodes);
/**
* update task definition
*
......
@@ -26,6 +26,7 @@ import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.ConditionType;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.TaskType;
@@ -173,10 +174,14 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
return result;
}
ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(processDefinitionCode);
if (processDefinition == null) {
if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefinitionCode);
return result;
}
if (processDefinition.getReleaseState() == ReleaseState.ONLINE) {
putMsg(result, Status.PROCESS_DEFINE_STATE_ONLINE, processDefinitionCode);
return result;
}
TaskDefinitionLog taskDefinition = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition == null) {
logger.error("taskDefinitionJsonObj is not valid json");
@@ -216,14 +221,17 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
processTaskRelationLog.setPreTaskVersion(upstreamTask.getVersion());
processTaskRelationLog.setPostTaskCode(taskCode);
processTaskRelationLog.setPostTaskVersion(Constants.VERSION_FIRST);
processTaskRelationLog.setConditionType(ConditionType.NONE);
processTaskRelationLog.setConditionParams("{}");
processTaskRelationLogList.add(processTaskRelationLog);
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode);
if (!processTaskRelationList.isEmpty()) {
processTaskRelationLogList.addAll(processTaskRelationList.stream().map(ProcessTaskRelationLog::new).collect(Collectors.toList()));
}
int insertResult = processService.saveTaskRelation(loginUser, projectCode, processDefinition.getCode(), processDefinition.getVersion(),
processTaskRelationLogList, null);
if (insertResult == Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, processDefinition);
} else {
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
@@ -233,6 +241,8 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.CREATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.CREATE_TASK_DEFINITION_ERROR);
}
putMsg(result, Status.SUCCESS);
result.put(Constants.DATA_LIST, taskDefinition);
return result;
}
@@ -286,7 +296,7 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
if (taskDefinition.getFlag() == Flag.YES) {
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
}
@@ -332,36 +342,81 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
@Transactional(rollbackFor = RuntimeException.class)
@Override
public Map<String, Object> updateTaskDefinition(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj) {
Map<String, Object> result = new HashMap<>();
int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
if (version <= 0) {
return result;
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!processTaskRelationList.isEmpty()) {
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
int delete = 0;
int deleteLog = 0;
Date now = new Date();
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
}
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLogList.add(processTaskRelationLog);
}
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
if ((insertRelation & insertRelationLog) == 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
}
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS);
return result;
}
private int updateTask(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, Map<String, Object> result) {
Project project = projectMapper.queryByCode(projectCode);
//check user access for project
Map<String, Object> result = projectService.checkProjectAndAuth(loginUser, project, projectCode);
result.putAll(projectService.checkProjectAndAuth(loginUser, project, projectCode));
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
return Constants.EXIT_CODE_FAILURE;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(taskCode);
if (taskDefinition == null) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
return Constants.EXIT_CODE_FAILURE;
}
if (processService.isTaskOnline(taskCode) && taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.NOT_SUPPORT_UPDATE_TASK_DEFINITION);
return result;
return Constants.EXIT_CODE_FAILURE;
}
TaskDefinitionLog taskDefinitionToUpdate = JSONUtils.parseObject(taskDefinitionJsonObj, TaskDefinitionLog.class);
if (taskDefinition.equals(taskDefinitionToUpdate)) {
return taskDefinition.getVersion();
}
if (taskDefinitionToUpdate == null) {
logger.error("taskDefinitionJson is not valid json");
putMsg(result, Status.DATA_IS_NOT_VALID, taskDefinitionJsonObj);
return result;
return Constants.EXIT_CODE_FAILURE;
}
if (!CheckUtils.checkTaskDefinitionParameters(taskDefinitionToUpdate)) {
logger.error("task definition {} parameter invalid", taskDefinitionToUpdate.getName());
putMsg(result, Status.PROCESS_NODE_S_PARAMETER_INVALID, taskDefinitionToUpdate.getName());
return result;
return Constants.EXIT_CODE_FAILURE;
}
Integer version = taskDefinitionLogMapper.queryMaxVersionForDefinition(taskCode);
if (version == null || version == 0) {
putMsg(result, Status.DATA_IS_NOT_VALID, taskCode);
return result;
return Constants.EXIT_CODE_FAILURE;
}
Date now = new Date();
taskDefinitionToUpdate.setCode(taskCode);
@@ -381,43 +436,101 @@ public class TaskDefinitionServiceImpl extends BaseServiceImpl implements TaskDe
putMsg(result, Status.UPDATE_TASK_DEFINITION_ERROR);
throw new ServiceException(Status.UPDATE_TASK_DEFINITION_ERROR);
}
return version;
}
/**
* update task definition and upstream
*
* @param loginUser login user
* @param projectCode project code
* @param taskCode task definition code
* @param taskDefinitionJsonObj task definition json object
* @param upstreamCodes upstream task codes, separated by commas
* @return update result code
*/
@Override
public Map<String, Object> updateTaskWithUpstream(User loginUser, long projectCode, long taskCode, String taskDefinitionJsonObj, String upstreamCodes) {
Map<String, Object> result = new HashMap<>();
int version = updateTask(loginUser, projectCode, taskCode, taskDefinitionJsonObj, result);
if (version <= 0) {
return result;
}
Map<Long, TaskDefinition> queryUpStreamTaskCodeMap;
if (StringUtils.isNotBlank(upstreamCodes)) {
Set<Long> upstreamTaskCodes = Arrays.stream(upstreamCodes.split(Constants.COMMA)).map(Long::parseLong).collect(Collectors.toSet());
List<TaskDefinition> upstreamTaskDefinitionList = taskDefinitionMapper.queryByCodeList(upstreamTaskCodes);
queryUpStreamTaskCodeMap = upstreamTaskDefinitionList.stream().collect(Collectors.toMap(TaskDefinition::getCode, taskDefinition -> taskDefinition));
// upstreamTaskCodes - queryUpStreamTaskCodeMap.keySet
upstreamTaskCodes.removeAll(queryUpStreamTaskCodeMap.keySet());
if (!upstreamTaskCodes.isEmpty()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, StringUtils.join(upstreamTaskCodes, Constants.COMMA));
return result;
}
} else {
queryUpStreamTaskCodeMap = new HashMap<>();
}
List<ProcessTaskRelation> processTaskRelationList = processTaskRelationMapper.queryByTaskCode(taskCode);
if (!queryUpStreamTaskCodeMap.isEmpty() && processTaskRelationList.isEmpty()) {
putMsg(result, Status.PROCESS_TASK_RELATION_NOT_EXIST, StringUtils.join(queryUpStreamTaskCodeMap.keySet(), Constants.COMMA));
throw new ServiceException(Status.PROCESS_TASK_RELATION_NOT_EXIST);
}
if (!processTaskRelationList.isEmpty()) {
List<ProcessTaskRelationLog> processTaskRelationLogList = new ArrayList<>();
List<ProcessTaskRelationLog> relationLogs = new ArrayList<>();
Date now = new Date();
int delete = 0;
int deleteLog = 0;
for (ProcessTaskRelation processTaskRelation : processTaskRelationList) {
ProcessTaskRelationLog processTaskRelationLog = new ProcessTaskRelationLog(processTaskRelation);
delete += processTaskRelationMapper.deleteRelation(processTaskRelationLog);
deleteLog += processTaskRelationLogMapper.deleteRelation(processTaskRelationLog);
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
if (processTaskRelationLog.getPreTaskCode() == taskCode) {
processTaskRelationLog.setPreTaskVersion(version);
}
if (processTaskRelationLog.getPostTaskCode() == taskCode) {
processTaskRelationLog.setPostTaskVersion(version);
TaskDefinition definition = queryUpStreamTaskCodeMap.remove(processTaskRelationLog.getPreTaskCode());
if (definition == null) {
processTaskRelationLog.setPreTaskCode(0L);
processTaskRelationLog.setPreTaskVersion(0);
}
}
processTaskRelationLog.setOperator(loginUser.getId());
processTaskRelationLog.setOperateTime(now);
processTaskRelationLog.setUpdateTime(now);
processTaskRelationLogList.add(processTaskRelationLog);
relationLogs.add(processTaskRelationLog);
}
if ((delete & deleteLog) == 0) {
throw new ServiceException(Status.DELETE_TASK_PROCESS_RELATION_ERROR);
} else {
int insertRelation = processTaskRelationMapper.batchInsert(processTaskRelationLogList);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(processTaskRelationLogList);
if ((insertRelation & insertRelationLog) == 0) {
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
if (!queryUpStreamTaskCodeMap.isEmpty()) {
ProcessTaskRelationLog taskRelationLogDeepCopy = JSONUtils.parseObject(JSONUtils.toJsonString(relationLogs.get(0)), ProcessTaskRelationLog.class);
assert taskRelationLogDeepCopy != null;
for (TaskDefinition upstreamTask : queryUpStreamTaskCodeMap.values()) {
taskRelationLogDeepCopy.setPreTaskCode(upstreamTask.getCode());
taskRelationLogDeepCopy.setPreTaskVersion(upstreamTask.getVersion());
relationLogs.add(taskRelationLogDeepCopy);
}
}
Map<Long, ProcessTaskRelationLog> taskRelationLogMap =
relationLogs.stream().collect(Collectors.toMap(ProcessTaskRelation::getPreTaskCode, processTaskRelationLog -> processTaskRelationLog));
if (taskRelationLogMap.containsKey(0L) && taskRelationLogMap.size() >= 3) {
taskRelationLogMap.remove(0L);
}
int insertRelation = processTaskRelationMapper.batchInsert(relationLogs);
int insertRelationLog = processTaskRelationLogMapper.batchInsert(relationLogs);
if ((insertRelation & insertRelationLog) == 0) {
putMsg(result, Status.CREATE_PROCESS_TASK_RELATION_ERROR);
throw new ServiceException(Status.CREATE_PROCESS_TASK_RELATION_ERROR);
}
}
result.put(Constants.DATA_LIST, taskCode);
putMsg(result, Status.SUCCESS, update);
putMsg(result, Status.SUCCESS);
return result;
}
/**
* update task definition
* Switch task definition
*
* @param loginUser login user
* @param projectCode project code
......
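The heart of updateTaskWithUpstream is a merge between the workflow's existing relation rows and the newly requested upstream set; the same pass also bumps preTaskVersion/postTaskVersion to the new task version and deletes and re-inserts the rows. Below is a condensed, self-contained sketch of just the merge step, using a hypothetical Edge record in place of ProcessTaskRelationLog and omitting versioning and persistence.

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Condensed sketch of the upstream merge; Edge is a stand-in for ProcessTaskRelationLog.
// Existing edges into the task keep their upstream if it is still requested; otherwise the
// upstream reference is cleared (pre = 0, i.e. the task becomes a start node for that row).
// Requested upstreams with no existing edge get a new edge appended. The real implementation
// additionally de-duplicates by preTaskCode and may drop the pre = 0 placeholder.
public class UpstreamMerge {

    record Edge(long pre, long post) {}

    static List<Edge> merge(long taskCode, List<Edge> existing, Set<Long> requestedUpstreams) {
        Set<Long> remaining = new HashSet<>(requestedUpstreams);
        List<Edge> merged = new ArrayList<>();
        for (Edge e : existing) {
            if (e.post() == taskCode && !remaining.remove(e.pre())) {
                e = new Edge(0L, taskCode);      // upstream dropped from the request
            }
            merged.add(e);
        }
        for (long pre : remaining) {
            merged.add(new Edge(pre, taskCode)); // newly requested upstream
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Edge> existing = List.of(new Edge(100L, 200L), new Edge(200L, 300L));
        // replace upstream 100 with 150 for task 200; the downstream edge 200 -> 300 is untouched
        System.out.println(merge(200L, existing, Set.of(150L)));
    }
}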
@@ -459,24 +459,25 @@ public class TaskDefinition {
}
TaskDefinition that = (TaskDefinition) o;
return failRetryTimes == that.failRetryTimes
&& failRetryInterval == that.failRetryInterval
&& timeout == that.timeout
&& delayTime == that.delayTime
&& Objects.equals(name, that.name)
&& Objects.equals(description, that.description)
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
&& timeoutNotifyStrategy == that.timeoutNotifyStrategy
&& Objects.equals(resourceIds, that.resourceIds)
&& environmentCode == that.environmentCode
&& taskGroupId == that.taskGroupId
&& taskGroupPriority == that.taskGroupPriority;
&& failRetryInterval == that.failRetryInterval
&& timeout == that.timeout
&& delayTime == that.delayTime
&& Objects.equals(name, that.name)
&& Objects.equals(description, that.description)
&& Objects.equals(taskType, that.taskType)
&& Objects.equals(taskParams, that.taskParams)
&& flag == that.flag
&& taskPriority == that.taskPriority
&& Objects.equals(workerGroup, that.workerGroup)
&& timeoutFlag == that.timeoutFlag
&& timeoutNotifyStrategy == that.timeoutNotifyStrategy
&& (Objects.equals(resourceIds, that.resourceIds)
|| (StringUtils.EMPTY.equals(resourceIds) && that.resourceIds == null)
|| (StringUtils.EMPTY.equals(that.resourceIds) && resourceIds == null))
&& environmentCode == that.environmentCode
&& taskGroupId == that.taskGroupId
&& taskGroupPriority == that.taskGroupPriority;
}
@Override
public String toString() {
return "TaskDefinition{"
......
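The new resourceIds clause in TaskDefinition.equals treats an empty string and null as the same value. An illustrative stand-alone equivalent of that check (the entity itself keeps it inline):

import java.util.Objects;

// Illustrative helper equivalent to the new resourceIds clause in TaskDefinition.equals:
// an empty string and null compare as equal.
public class NullOrEmptyEquals {

    static boolean resourceIdsEqual(String a, String b) {
        return Objects.equals(a, b)
                || ("".equals(a) && b == null)
                || ("".equals(b) && a == null);
    }

    public static void main(String[] args) {
        System.out.println(resourceIdsEqual("", null));   // true
        System.out.println(resourceIdsEqual(null, ""));   // true
        System.out.println(resourceIdsEqual("1,2", ""));  // false
    }
}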
@@ -44,6 +44,11 @@ public class TaskMainInfo {
*/
private int taskVersion;
/**
* task type
*/
private String taskType;
/**
* create time
*/
@@ -115,6 +120,14 @@ public class TaskMainInfo {
this.taskVersion = taskVersion;
}
public String getTaskType() {
return taskType;
}
public void setTaskType(String taskType) {
this.taskType = taskType;
}
public Date getTaskCreateTime() {
return taskCreateTime;
}
......
@@ -87,7 +87,7 @@
</foreach>
</insert>
<select id="queryDefineListPaging" resultType="org.apache.dolphinscheduler.dao.entity.TaskMainInfo">
select td.name task_name,td.code task_code,td.version task_version,td.create_time task_create_time,td.update_time task_update_time,
select td.name task_name,td.code task_code,td.version task_version,td.task_type,td.create_time task_create_time,td.update_time task_update_time,
pd.code process_definition_code,pd.version process_definition_version,pd.name process_definition_name,pd.release_state process_release_state,
pt.pre_task_code upstream_task_code,up.name upstream_task_name
from t_ds_task_definition td
......
@@ -20,3 +20,7 @@ ALTER TABLE `t_ds_process_task_relation` ADD INDEX `idx_project_code_process_def
ALTER TABLE `t_ds_process_task_relation_log` ADD INDEX `idx_project_code_process_definition_code` (`project_code`, `process_definition_code`) USING BTREE;
ALTER TABLE `t_ds_task_definition_log` ADD INDEX `idx_code_version` (`code`,`version`) USING BTREE;
alter table t_ds_task_definition_log add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
alter table t_ds_task_definition_log add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group priority' AFTER `task_group_id`;
alter table t_ds_task_definition add `task_group_id` int(11) DEFAULT NULL COMMENT 'task group id' AFTER `resource_ids`;
alter table t_ds_task_definition add `task_group_priority` int(11) DEFAULT NULL COMMENT 'task group priority' AFTER `task_group_id`;
\ No newline at end of file