From a561a618af1538e6feb990913d757921344b16f5 Mon Sep 17 00:00:00 2001 From: JinyLeeChina <42576980+JinyLeeChina@users.noreply.github.com> Date: Fri, 11 Jun 2021 09:43:05 +0800 Subject: [PATCH] [Feature][JsonSplit-api] api of processDefinition create/update (#5602) * processDefinition create/update * fix codeStyle * fix codeStyle * fix ut Co-authored-by: JinyLeeChina <297062848@qq.com> --- .../ProcessDefinitionController.java | 64 ++--- .../dolphinscheduler/api/enums/Status.java | 1 + .../api/service/ProcessDefinitionService.java | 45 ++-- .../impl/ProcessDefinitionServiceImpl.java | 220 +++++++++++------- .../api/service/impl/TenantServiceImpl.java | 20 -- .../ProcessDefinitionControllerTest.java | 45 ++-- .../service/ProcessDefinitionServiceTest.java | 10 +- .../mapper/ProcessTaskRelationLogMapper.java | 8 + .../dao/mapper/ProcessTaskRelationMapper.java | 9 + .../dao/mapper/TenantMapper.java | 4 +- .../mapper/ProcessTaskRelationLogMapper.xml | 15 +- .../dao/mapper/ProcessTaskRelationMapper.xml | 10 + .../dao/mapper/TenantMapperTest.java | 4 +- .../service/process/ProcessService.java | 65 ++++++ 14 files changed, 346 insertions(+), 174 deletions(-) diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index 286ae3d78..9ce62107f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -95,16 +95,18 @@ public class ProcessDefinitionController extends BaseController { * @param loginUser login user * @param projectName project name * @param name process definition name - * @param json process definition json * @param description description - * @param locations locations for nodes + * @param globalParams globalParams * @param connects connects for nodes + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @return create result code */ @ApiOperation(value = "save", notes = "CREATE_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"), - @ApiImplicitParam(name = "processDefinitionJson", value = "PROCESS_DEFINITION_JSON", required = true, type = "String"), @ApiImplicitParam(name = "locations", value = "PROCESS_DEFINITION_LOCATIONS", required = true, type = "String"), @ApiImplicitParam(name = "connects", value = "PROCESS_DEFINITION_CONNECTS", required = true, type = "String"), @ApiImplicitParam(name = "description", value = "PROCESS_DEFINITION_DESC", required = false, type = "String"), @@ -116,13 +118,16 @@ public class ProcessDefinitionController extends BaseController { public Result createProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "name", required = true) String name, - @RequestParam(value = "processDefinitionJson", required = true) String json, - @RequestParam(value = "locations", required = true) String locations, - @RequestParam(value = "connects", required = true) String connects, - @RequestParam(value = "description", required = false) String description) throws JsonProcessingException { + @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams, + @RequestParam(value = "connects", required = false) String connects, + @RequestParam(value = "locations", required = false) String locations, + @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, + @RequestParam(value = "tenantCode", required = true) String tenantCode, + @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson) throws JsonProcessingException { - Map result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, json, - description, locations, connects); + Map result = processDefinitionService.createProcessDefinition(loginUser, projectName, name, description, globalParams, + connects, locations, timeout, tenantCode, taskRelationJson); return returnDataList(result); } @@ -207,19 +212,21 @@ public class ProcessDefinitionController extends BaseController { * @param loginUser login user * @param projectName project name * @param name process definition name - * @param id process definition id - * @param processDefinitionJson process definition json + * @param code process definition code * @param description description - * @param locations locations for nodes + * @param globalParams globalParams * @param connects connects for nodes + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @return update result code */ - @ApiOperation(value = "updateProcessDefinition", notes = "UPDATE_PROCESS_DEFINITION_NOTES") + @ApiOperation(value = "update", notes = "UPDATE_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"), - @ApiImplicitParam(name = "id", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), - @ApiImplicitParam(name = "processDefinitionJson", value = "PROCESS_DEFINITION_JSON", required = true, type = "String"), + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789"), @ApiImplicitParam(name = "locations", value = "PROCESS_DEFINITION_LOCATIONS", required = true, type = "String"), @ApiImplicitParam(name = "connects", value = "PROCESS_DEFINITION_CONNECTS", required = true, type = "String"), @ApiImplicitParam(name = "description", value = "PROCESS_DEFINITION_DESC", required = false, type = "String"), @@ -232,15 +239,18 @@ public class ProcessDefinitionController extends BaseController { public Result updateProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, @RequestParam(value = "name", required = true) String name, - @RequestParam(value = "id", required = true) int id, - @RequestParam(value = "processDefinitionJson", required = true) String processDefinitionJson, - @RequestParam(value = "locations", required = false) String locations, - @RequestParam(value = "connects", required = false) String connects, + @RequestParam(value = "code", required = true) long code, @RequestParam(value = "description", required = false) String description, + @RequestParam(value = "globalParams", required = false, defaultValue = "[]") String globalParams, + @RequestParam(value = "connects", required = false) String connects, + @RequestParam(value = "locations", required = false) String locations, + @RequestParam(value = "timeout", required = false, defaultValue = "0") int timeout, + @RequestParam(value = "tenantCode", required = true) String tenantCode, + @RequestParam(value = "taskRelationJson", required = true) String taskRelationJson, @RequestParam(value = "releaseState", required = false, defaultValue = "OFFLINE") ReleaseState releaseState) { - Map result = processDefinitionService.updateProcessDefinition(loginUser, projectName, id, name, - processDefinitionJson, description, locations, connects); + Map result = processDefinitionService.updateProcessDefinition(loginUser, projectName, name, code, description, globalParams, + connects, locations, timeout, tenantCode, taskRelationJson); // If the update fails, the result will be returned directly if (result.get(Constants.STATUS) != Status.SUCCESS) { return returnDataList(result); @@ -248,7 +258,7 @@ public class ProcessDefinitionController extends BaseController { // Judge whether to go online after editing,0 means offline, 1 means online if (releaseState == ReleaseState.ONLINE) { - result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, id, releaseState); + result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, code, releaseState); } return returnDataList(result); } @@ -342,14 +352,14 @@ public class ProcessDefinitionController extends BaseController { * * @param loginUser login user * @param projectName project name - * @param processId process definition id + * @param code process definition code * @param releaseState release state * @return release result code */ @ApiOperation(value = "releaseProcessDefinition", notes = "RELEASE_PROCESS_DEFINITION_NOTES") @ApiImplicitParams({ @ApiImplicitParam(name = "name", value = "PROCESS_DEFINITION_NAME", required = true, type = "String"), - @ApiImplicitParam(name = "processId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100"), + @ApiImplicitParam(name = "code", value = "PROCESS_DEFINITION_CODE", required = true, dataType = "Long", example = "123456789"), @ApiImplicitParam(name = "releaseState", value = "PROCESS_DEFINITION_CONNECTS", required = true, dataType = "ReleaseState"), }) @PostMapping(value = "/release") @@ -358,10 +368,10 @@ public class ProcessDefinitionController extends BaseController { @AccessLogAnnotation(ignoreRequestArgs = "loginUser") public Result releaseProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, - @RequestParam(value = "processId", required = true) int processId, + @RequestParam(value = "code", required = true) long code, @RequestParam(value = "releaseState", required = true) ReleaseState releaseState) { - Map result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, processId, releaseState); + Map result = processDefinitionService.releaseProcessDefinition(loginUser, projectName, code, releaseState); return returnDataList(result); } @@ -390,7 +400,7 @@ public class ProcessDefinitionController extends BaseController { } /** - * query datail of process definition by name + * query detail of process definition by name * * @param loginUser login user * @param projectName project name diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 8372a6935..dd9d4bcf7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -270,6 +270,7 @@ public enum Status { PROCESS_TASK_RELATION_NOT_EXIST(50033, "process task relation {0} does not exist", "工作流任务关系[{0}]不存在"), PROCESS_TASK_RELATION_EXIST(50034, "process task relation is already exist, processCode:[{0}]", "工作流任务关系已存在, processCode:[{0}]"), PROCESS_DAG_IS_EMPTY(50035, "process dag can not be empty", "工作流dag不能为空"), + CHECK_PROCESS_TASK_RELATION_ERROR(50036, "check process task relation error", "工作流任务关系参数错误"), HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"), /** diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index c09b0658d..ef9c88532 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -40,20 +40,26 @@ public interface ProcessDefinitionService { * @param loginUser login user * @param projectName project name * @param name process definition name - * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes + * @param description description + * @param globalParams global params * @param connects connects for nodes + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @return create result code * @throws JsonProcessingException JsonProcessingException */ Map createProcessDefinition(User loginUser, String projectName, String name, - String processDefinitionJson, - String desc, + String description, + String globalParams, + String connects, String locations, - String connects) throws JsonProcessingException; + int timeout, + String tenantCode, + String taskRelationJson) throws JsonProcessingException; /** * query process definition list @@ -141,19 +147,27 @@ public interface ProcessDefinitionService { * @param loginUser login user * @param projectName project name * @param name process definition name - * @param id process definition id - * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes + * @param code process definition code + * @param description description + * @param globalParams global params * @param connects connects for nodes + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @return update result code */ Map updateProcessDefinition(User loginUser, String projectName, - int id, String name, - String processDefinitionJson, String desc, - String locations, String connects); + long code, + String description, + String globalParams, + String connects, + String locations, + int timeout, + String tenantCode, + String taskRelationJson); /** * verify process definition name unique @@ -184,13 +198,13 @@ public interface ProcessDefinitionService { * * @param loginUser login user * @param projectName project name - * @param id process definition id + * @param code process definition code * @param releaseState release state * @return release result code */ Map releaseProcessDefinition(User loginUser, String projectName, - int id, + long code, ReleaseState releaseState); /** @@ -299,6 +313,7 @@ public interface ProcessDefinitionService { */ Map deleteByProcessDefinitionIdAndVersion(User loginUser, String projectName, int processDefinitionId, long version); + /** * check has associated process definition * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 5bccb01a7..3a853f02d 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -56,11 +56,13 @@ import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.Schedule; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.dao.entity.Tenant; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper; @@ -69,6 +71,7 @@ import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; import org.apache.dolphinscheduler.dao.mapper.ScheduleMapper; import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper; import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper; +import org.apache.dolphinscheduler.dao.mapper.TenantMapper; import org.apache.dolphinscheduler.dao.mapper.UserMapper; import org.apache.dolphinscheduler.service.permission.PermissionCheck; import org.apache.dolphinscheduler.service.process.ProcessService; @@ -159,27 +162,36 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro @Autowired private SchedulerService schedulerService; + @Autowired + private TenantMapper tenantMapper; + /** * create process definition * * @param loginUser login user * @param projectName project name - * @param processDefinitionName process definition name - * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes + * @param name process definition name + * @param description description + * @param globalParams global params * @param connects connects for nodes + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @return create result code */ @Override @Transactional(rollbackFor = Exception.class) public Map createProcessDefinition(User loginUser, String projectName, - String processDefinitionName, - String processDefinitionJson, - String desc, + String name, + String description, + String globalParams, + String connects, String locations, - String connects) { + int timeout, + String tenantCode, + String taskRelationJson) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -190,34 +202,74 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return checkResult; } - ProcessDefinition processDefinition = new ProcessDefinition(); - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); - if (checkProcessJson.get(Constants.STATUS) != Status.SUCCESS) { - return checkProcessJson; + List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); + Map checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson); + if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) { + return checkRelationJson; + } + + Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); + if (tenant == null) { + putMsg(result, Status.TENANT_NOT_EXIST); + return result; } + long processDefinitionCode; try { - long processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); - processDefinition.setCode(processDefinitionCode); - processDefinition.setVersion(1); + processDefinitionCode = SnowFlakeUtils.getInstance().nextId(); } catch (SnowFlakeException e) { putMsg(result, Status.CREATE_PROCESS_DEFINITION); return result; } - int saveResult = processService.saveProcessDefinition(loginUser, project, processDefinitionName, desc, - locations, connects, processData, processDefinition, true); + int insertVersion = processService.saveProcessDefine(loginUser, project, name, description, globalParams, + locations, connects, timeout, tenant.getId(), processDefinitionCode, 0, true); - if (saveResult > 0) { - putMsg(result, Status.SUCCESS); - // return processDefinition object with ID - result.put(Constants.DATA_LIST, processDefinition.getId()); + if (insertVersion > 0) { + int insertResult = processService.saveTaskRelation(loginUser, project.getCode(), processDefinitionCode, insertVersion, taskRelationList); + if (insertResult > 0) { + putMsg(result, Status.SUCCESS); + // return processDefinitionCode + result.put(Constants.DATA_LIST, processDefinitionCode); + } else { + putMsg(result, Status.CREATE_PROCESS_DEFINITION); + } } else { putMsg(result, Status.CREATE_PROCESS_DEFINITION); } return result; + } + + private Map checkTaskRelationList(List taskRelationList, String taskRelationJson) { + Map result = new HashMap<>(); + try { + if (taskRelationList == null || taskRelationList.isEmpty()) { + logger.error("task relation list is null"); + putMsg(result, Status.DATA_IS_NOT_VALID, taskRelationJson); + return result; + } + // TODO check has cycle + // if (graphHasCycle(taskRelationList)) { + // logger.error("process DAG has cycle"); + // putMsg(result, Status.PROCESS_NODE_HAS_CYCLE); + // return result; + // } + + // check whether the task relation json is normal + for (ProcessTaskRelationLog processTaskRelationLog : taskRelationList) { + if (processTaskRelationLog.getPostTaskCode() == 0 || processTaskRelationLog.getPostTaskVersion() == 0) { + logger.error("the post_task_code or post_task_version can't be zero"); + putMsg(result, Status.CHECK_PROCESS_TASK_RELATION_ERROR); + return result; + } + } + putMsg(result, Status.SUCCESS); + } catch (Exception e) { + result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR); + result.put(Constants.MSG, e.getMessage()); + } + return result; } /** @@ -364,22 +416,28 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * @param loginUser login user * @param projectName project name * @param name process definition name - * @param id process definition id - * @param processDefinitionJson process definition json - * @param desc description - * @param locations locations for nodes + * @param code process definition code + * @param description description + * @param globalParams global params * @param connects connects for nodes + * @param locations locations for nodes + * @param timeout timeout + * @param tenantCode tenantCode + * @param taskRelationJson relation json for nodes * @return update result code */ @Override public Map updateProcessDefinition(User loginUser, String projectName, - int id, String name, - String processDefinitionJson, - String desc, + long code, + String description, + String globalParams, + String connects, String locations, - String connects) { + int timeout, + String tenantCode, + String taskRelationJson) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -389,16 +447,22 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return checkResult; } - ProcessData processData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - Map checkProcessJson = checkProcessNodeList(processData, processDefinitionJson); - if ((checkProcessJson.get(Constants.STATUS) != Status.SUCCESS)) { - return checkProcessJson; + List taskRelationList = JSONUtils.toList(taskRelationJson, ProcessTaskRelationLog.class); + Map checkRelationJson = checkTaskRelationList(taskRelationList, taskRelationJson); + if (checkRelationJson.get(Constants.STATUS) != Status.SUCCESS) { + return checkRelationJson; + } + + Tenant tenant = tenantMapper.queryByTenantCode(tenantCode); + if (tenant == null) { + putMsg(result, Status.TENANT_NOT_EXIST); + return result; } - // TODO processDefinitionMapper.queryByCode - ProcessDefinition processDefinition = processService.findProcessDefineById(id); + + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); // check process definition exists if (processDefinition == null) { - putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, id); + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code); return result; } if (processDefinition.getReleaseState() == ReleaseState.ONLINE) { @@ -406,21 +470,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro putMsg(result, Status.PROCESS_DEFINE_NOT_ALLOWED_EDIT, processDefinition.getName()); return result; } - if (!name.equals(processDefinition.getName())) { - // check whether the new process define name exist - ProcessDefinition definition = processDefinitionMapper.verifyByDefineName(project.getCode(), name); - if (definition != null) { - putMsg(result, Status.PROCESS_DEFINITION_NAME_EXIST, name); - return result; - } - } - ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); - int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc, - locations, connects, newProcessData, processDefinition, true); - if (saveResult > 0) { - putMsg(result, Status.SUCCESS); - result.put(Constants.DATA_LIST, processDefinition); + int insertVersion = processService.saveProcessDefine(loginUser, project, name, description, globalParams, + locations, connects, timeout, tenant.getId(), code, processDefinition.getId(), true); + if (insertVersion > 0) { + int insertResult = processService.saveTaskRelation(loginUser, project.getCode(), code, insertVersion, taskRelationList); + if (insertResult > 0) { + putMsg(result, Status.SUCCESS); + result.put(Constants.DATA_LIST, processDefinition); + } else { + putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); + } } else { putMsg(result, Status.UPDATE_PROCESS_DEFINITION_ERROR); } @@ -536,13 +596,13 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro * * @param loginUser login user * @param projectName project name - * @param id process definition id + * @param code process definition code * @param releaseState release state * @return release result code */ @Override @Transactional(rollbackFor = RuntimeException.class) - public Map releaseProcessDefinition(User loginUser, String projectName, int id, ReleaseState releaseState) { + public Map releaseProcessDefinition(User loginUser, String projectName, long code, ReleaseState releaseState) { HashMap result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -558,7 +618,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro return result; } - ProcessDefinition processDefinition = processDefinitionMapper.selectById(id); + ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(code); switch (releaseState) { case ONLINE: @@ -587,7 +647,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro ); for (Schedule schedule : scheduleList) { - logger.info("set schedule offline, project id: {}, schedule id: {}, process definition id: {}", project.getId(), schedule.getId(), id); + logger.info("set schedule offline, project id: {}, schedule id: {}, process definition code: {}", project.getId(), schedule.getId(), code); // set status schedule.setReleaseState(ReleaseState.OFFLINE); scheduleMapper.updateById(schedule); @@ -833,7 +893,6 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro processMeta, processDefinitionName, processDefinitionId); - } /** @@ -847,13 +906,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro String importProcessParam) { Map createProcessResult = null; try { - createProcessResult = createProcessDefinition(loginUser - , currentProjectName, - processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp(), - importProcessParam, - processMeta.getProcessDefinitionDescription(), - processMeta.getProcessDefinitionLocations(), - processMeta.getProcessDefinitionConnects()); + // TODO import + // createProcessResult = createProcessDefinition(loginUser + // , currentProjectName, + // processDefinitionName + "_import_" + DateUtils.getCurrentTimeStamp(), + // importProcessParam, + // processMeta.getProcessDefinitionDescription(), + // processMeta.getProcessDefinitionLocations(), + // processMeta.getProcessDefinitionConnects()); putMsg(result, Status.SUCCESS); } catch (Exception e) { logger.error("import process meta json data: {}", e.getMessage(), e); @@ -1049,13 +1109,14 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } try { - createProcessDefinition(loginUser - , targetProject.getName(), - subProcess.getName(), - subProcessJson, - subProcess.getDescription(), - subProcess.getLocations(), - subProcess.getConnects()); + // TODO import subProcess + // createProcessDefinition(loginUser + // , targetProject.getName(), + // subProcess.getName(), + // subProcessJson, + // subProcess.getDescription(), + // subProcess.getLocations(), + // subProcess.getConnects()); logger.info("create sub process, project: {}, process name: {}", targetProject.getName(), subProcess.getName()); } catch (Exception e) { @@ -1424,14 +1485,17 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } processData.setTasks(taskNodeList); String processDefinitionJson = JSONUtils.toJsonString(processData); - return createProcessDefinition( - loginUser, - targetProject.getName(), - processDefinition.getName() + "_copy_" + currentTimeStamp, - processDefinitionJson, - processDefinition.getDescription(), - locationsJN.toString(), - processDefinition.getConnects()); + // TODO copy process + // return createProcessDefinition( + // loginUser, + // targetProject.getName(), + // processDefinition.getName() + "_copy_" + currentTimeStamp, + // processDefinitionJson, + // processDefinition.getDescription(), + // locationsJN.toString(), + // processDefinition.getConnects()); + // TODO remove + return result; } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java index dcdc833dc..1c37c80c1 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TenantServiceImpl.java @@ -278,26 +278,6 @@ public class TenantServiceImpl extends BaseServiceImpl implements TenantService return processInstanceMapper.queryByTenantIdAndStatus(tenant.getId(), Constants.NOT_TERMINATED_STATES); } - /** - * query tenant list - * - * @param tenantCode tenant code - * @return tenant list - */ - public Map queryTenantList(String tenantCode) { - - Map result = new HashMap<>(); - - List resourceList = tenantMapper.queryByTenantCode(tenantCode); - if (CollectionUtils.isNotEmpty(resourceList)) { - result.put(Constants.DATA_LIST, resourceList); - putMsg(result, Status.SUCCESS); - } else { - putMsg(result, Status.TENANT_NOT_EXIST); - } - return result; - } - /** * query tenant list * diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index 683a331d8..41657cb05 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -73,26 +73,27 @@ public class ProcessDefinitionControllerTest { @Test public void testCreateProcessDefinition() throws Exception { - String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\"" - + ":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\" - + "necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\"" - + ",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false}," - + "\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; - String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; + String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," + + "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," + + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]"; String projectName = "test"; String name = "dag_test"; String description = "desc test"; + String globalParams = "[]"; String connects = "[]"; + String locations = "[]"; + int timeout = 0; + String tenantCode = "root"; Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); result.put(Constants.DATA_LIST, 1); - Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, json, - description, locations, connects)).thenReturn(result); + Mockito.when(processDefinitionService.createProcessDefinition(user, projectName, name, description, globalParams, + connects, locations, timeout, tenantCode, json)).thenReturn(result); - Result response = processDefinitionController.createProcessDefinition(user, projectName, name, json, - locations, connects, description); + Result response = processDefinitionController.createProcessDefinition(user, projectName, name, description, globalParams, + connects, locations, timeout, tenantCode, json); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } @@ -121,28 +122,28 @@ public class ProcessDefinitionControllerTest { } @Test - public void updateProcessDefinition() throws Exception { - - String json = "{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\"" - + ",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"}" - + ",\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\"" - + ":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\"" - + ":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"; + public void updateProcessDefinition() { + String json = "[{\"name\":\"\",\"pre_task_code\":0,\"pre_task_version\":0,\"post_task_code\":123456789,\"post_task_version\":1," + + "\"condition_type\":0,\"condition_params\":{}},{\"name\":\"\",\"pre_task_code\":123456789,\"pre_task_version\":1," + + "\"post_task_code\":123451234,\"post_task_version\":1,\"condition_type\":0,\"condition_params\":{}}]"; String locations = "{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"; String projectName = "test"; String name = "dag_test"; String description = "desc test"; String connects = "[]"; - int id = 1; + String globalParams = "[]"; + int timeout = 0; + String tenantCode = "root"; + long code = 123L; Map result = new HashMap<>(); putMsg(result, Status.SUCCESS); result.put("processDefinitionId", 1); - Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, id, name, json, - description, locations, connects)).thenReturn(result); + Mockito.when(processDefinitionService.updateProcessDefinition(user, projectName, name, code, description, globalParams, + connects, locations, timeout, tenantCode, json)).thenReturn(result); - Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, id, json, - locations, connects, description,ReleaseState.OFFLINE); + Result response = processDefinitionController.updateProcessDefinition(user, projectName, name, code, description, globalParams, + connects, locations, timeout, tenantCode, json, ReleaseState.OFFLINE); Assert.assertEquals(Status.SUCCESS.getCode(), response.getCode().intValue()); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 8ee5b9aed..904c34b2b 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -597,7 +597,7 @@ public class ProcessDefinitionServiceTest { // project check auth success, processs definition online putMsg(result, Status.SUCCESS, projectName); - Mockito.when(processDefineMapper.selectById(46)).thenReturn(getProcessDefinition()); + Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(getProcessDefinition()); Map onlineRes = processDefinitionService.releaseProcessDefinition( loginUser, "project_test1", 46, ReleaseState.ONLINE); Assert.assertEquals(Status.SUCCESS, onlineRes.get(Constants.STATUS)); @@ -1035,10 +1035,10 @@ public class ProcessDefinitionServiceTest { + " \"tenantId\": 1,\n" + " \"timeout\": 0\n" + "}"; - Map updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, 1, "test", - sqlDependentJson, "", "", ""); + Map updateResult = processDefinitionService.updateProcessDefinition(loginUser, projectName, "test", 1, + "", "", "", "", 0, "root", sqlDependentJson); - Assert.assertEquals(Status.UPDATE_PROCESS_DEFINITION_ERROR, updateResult.get(Constants.STATUS)); + Assert.assertEquals(Status.DATA_IS_NOT_VALID, updateResult.get(Constants.STATUS)); } @Test @@ -1112,7 +1112,7 @@ public class ProcessDefinitionServiceTest { processDefinition.setProjectId(2); processDefinition.setTenantId(1); processDefinition.setDescription(""); - processDefinition.setCode(9999L); + processDefinition.setCode(46L); return processDefinition; } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java index 9183b0f62..55ab26fb6 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.java @@ -44,4 +44,12 @@ public interface ProcessTaskRelationLogMapper extends BaseMapper taskRelationList); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java index 5334606ed..0d94f9670 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.java @@ -18,6 +18,7 @@ package org.apache.dolphinscheduler.dao.mapper; import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelation; +import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog; import org.apache.ibatis.annotations.Param; @@ -65,4 +66,12 @@ public interface ProcessTaskRelationMapper extends BaseMapper taskRelationList); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java index 45c824ce8..89b62378c 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TenantMapper.java @@ -38,9 +38,9 @@ public interface TenantMapper extends BaseMapper { /** * query tenant by code * @param tenantCode tenantCode - * @return tenant list + * @return tenant */ - List queryByTenantCode(@Param("tenantCode") String tenantCode); + Tenant queryByTenantCode(@Param("tenantCode") String tenantCode); /** * tenant page diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml index e7e4a1245..9769850e1 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationLogMapper.xml @@ -29,8 +29,7 @@ WHERE process_definition_code = #{processCode} and process_definition_version = #{processVersion} - select from t_ds_process_task_relation_log @@ -39,4 +38,16 @@ and post_task_code = #{taskCode} and post_task_version = #{taskVersion} + + insert into t_ds_process_task_relation_log (`name`, process_definition_version, project_code, process_definition_code, + pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, operator, operate_time, + create_time, update_time) + values + + (#{relation.name},#{relation.process_definition_version},#{relation.project_code},#{relation.process_definition_code}, + #{relation.pre_task_code},#{relation.pre_task_version},#{relation.post_task_code},#{relation.post_task_version}, + #{relation.condition_type},#{relation.condition_params},#{relation.operator},#{relation.operate_time}, + #{relation.create_time},#{relation.update_time}) + + diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml index 963f6b494..006944127 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessTaskRelationMapper.xml @@ -59,4 +59,14 @@ WHERE project_code = #{projectCode} and process_definition_code = #{processCode} + + insert into t_ds_process_task_relation (`name`, process_definition_version, project_code, process_definition_code, + pre_task_code, pre_task_version, post_task_code, post_task_version, condition_type, condition_params, create_time, update_time) + values + + (#{relation.name},#{relation.process_definition_version},#{relation.project_code},#{relation.process_definition_code}, + #{relation.pre_task_code},#{relation.pre_task_version},#{relation.post_task_code},#{relation.post_task_version}, + #{relation.condition_type},#{relation.condition_params},#{relation.create_time},#{relation.update_time}) + + diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TenantMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TenantMapperTest.java index b8a97e15c..bfd7f88cf 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TenantMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/TenantMapperTest.java @@ -119,12 +119,10 @@ public class TenantMapperTest { */ @Test public void testQueryByTenantCode() { - Tenant tenant = insertOne(); tenant.setTenantCode("ut code"); tenantMapper.updateById(tenant); - List tenantList = tenantMapper.queryByTenantCode("ut code"); - Assert.assertEquals(1, tenantList.size()); + Assert.assertNotNull(tenantMapper.queryByTenantCode("ut code")); } /** diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 8ee49bddc..135fcaa17 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -2225,6 +2225,69 @@ public class ProcessService { /** * save processDefinition (including create or update processDefinition) */ + public int saveProcessDefine(User operator, Project project, String name, String description, String globalParams, + String locations, String connects, int timeout, int tenantId, long processDefinitionCode, + int processDefinitionId, Boolean isFromProcessDefine) { + ProcessDefinitionLog processDefinitionLog = new ProcessDefinitionLog(); + Integer version = processDefineLogMapper.queryMaxVersionForDefinition(processDefinitionCode); + int insertVersion = version == null || version == 0 ? 1 : version + 1; + processDefinitionLog.setUserId(operator.getId()); + processDefinitionLog.setCode(processDefinitionCode); + processDefinitionLog.setVersion(insertVersion); + processDefinitionLog.setName(name); + processDefinitionLog.setFlag(Flag.YES); + processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE); + processDefinitionLog.setProjectCode(project.getCode()); + processDefinitionLog.setDescription(description); + processDefinitionLog.setGlobalParams(globalParams); + processDefinitionLog.setLocations(locations); + processDefinitionLog.setConnects(connects); + processDefinitionLog.setTimeout(timeout); + processDefinitionLog.setTenantId(tenantId); + processDefinitionLog.setOperator(operator.getId()); + Date now = new Date(); + processDefinitionLog.setOperateTime(now); + processDefinitionLog.setUpdateTime(now); + processDefinitionLog.setCreateTime(now); + int insertLog = processDefineLogMapper.insert(processDefinitionLog); + int result; + if (0 == processDefinitionId) { + result = processDefineMapper.insert(processDefinitionLog); + } else { + processDefinitionLog.setId(processDefinitionId); + result = processDefineMapper.updateById(processDefinitionLog); + } + return (insertLog & result) > 0 ? insertVersion : 0; + } + + /** + * save task relations + */ + public int saveTaskRelation(User operator, long projectCode, long processDefinitionCode, int processDefinitionVersion, + List taskRelationList) { + List processTaskRelationList = processTaskRelationMapper.queryByProcessCode(projectCode, processDefinitionCode); + if (!processTaskRelationList.isEmpty()) { + processTaskRelationMapper.deleteByCode(projectCode, processDefinitionCode); + } + Date now = new Date(); + taskRelationList.forEach(processTaskRelationLog -> { + processTaskRelationLog.setProjectCode(projectCode); + processTaskRelationLog.setProcessDefinitionCode(processDefinitionCode); + processTaskRelationLog.setProcessDefinitionVersion(processDefinitionVersion); + processTaskRelationLog.setCreateTime(now); + processTaskRelationLog.setUpdateTime(now); + processTaskRelationLog.setOperator(operator.getId()); + processTaskRelationLog.setOperateTime(now); + }); + int result = processTaskRelationMapper.batchInsert(taskRelationList); + int resultLog = processTaskRelationLogMapper.batchInsert(taskRelationList); + return result & resultLog; + } + + /** + * save processDefinition (including create or update processDefinition) + */ + @Deprecated public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations, String connects, ProcessData processData, ProcessDefinition processDefinition, Boolean isFromProcessDefine) { @@ -2240,6 +2303,7 @@ public class ProcessService { /** * save processDefinition */ + @Deprecated public ProcessDefinitionLog insertProcessDefinitionLog(User operator, Long processDefinitionCode, String processDefinitionName, ProcessData processData, Project project, String desc, String locations, String connects) { @@ -2280,6 +2344,7 @@ public class ProcessService { /** * handle task definition */ + @Deprecated public Map handleTaskDefinition(User operator, Long projectCode, List taskNodes, Boolean isFromProcessDefine) { if (taskNodes == null) { return null; -- GitLab