diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java index c1cd15c8074068208fa5e4bb95df2ae2896c8d7f..66e13eace72371116826197884e3a723a1d65be7 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java @@ -189,7 +189,7 @@ public class PythonGateway { ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); - // In the case project exists, but current process definition still not created, we should also return the init + // In the case project exists, but current workflow still not created, we should also return the init // version of it if (processDefinition == null) { result.put("code", CodeGenerateUtils.getInstance().genCode()); @@ -210,20 +210,20 @@ public class PythonGateway { } /** - * create or update process definition. - * If process definition do not exists in Project=`projectCode` would create a new one - * If process definition already exists in Project=`projectCode` would update it + * create or update workflow. + * If workflow do not exists in Project=`projectCode` would create a new one + * If workflow already exists in Project=`projectCode` would update it * - * @param userName user name who create or update process definition - * @param projectName project name which process definition belongs to - * @param name process definition name + * @param userName user name who create or update workflow + * @param projectName project name which workflow belongs to + * @param name workflow name * @param description description * @param globalParams global params - * @param schedule schedule for process definition, will not set schedule if null, + * @param schedule schedule for workflow, will not set schedule if null, * and if would always fresh exists schedule if not null * @param warningType warning type * @param warningGroupId warning group id - * @param timeout timeout for process definition working, if running time longer than timeout, + * @param timeout timeout for workflow working, if running time longer than timeout, * task will mark as fail * @param workerGroup run task in which worker group * @param tenantCode tenantCode @@ -232,33 +232,33 @@ public class PythonGateway { * @param otherParamsJson otherParamsJson handle other params * @return create result code */ - public Long createOrUpdateProcessDefinition(String userName, - String projectName, - String name, - String description, - String globalParams, - String schedule, - String warningType, - int warningGroupId, - int timeout, - String workerGroup, - String tenantCode, - int releaseState, - String taskRelationJson, - String taskDefinitionJson, - String otherParamsJson, - String executionType) { + public Long createOrUpdateWorkflow(String userName, + String projectName, + String name, + String description, + String globalParams, + String schedule, + String warningType, + int warningGroupId, + int timeout, + String workerGroup, + String tenantCode, + int releaseState, + String taskRelationJson, + String taskDefinitionJson, + String otherParamsJson, + String executionType) { User user = usersService.queryUser(userName); Project project = projectMapper.queryByName(projectName); long projectCode = project.getCode(); - ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, name); + ProcessDefinition processDefinition = getWorkflow(user, projectCode, name); ProcessExecutionTypeEnum executionTypeEnum = ProcessExecutionTypeEnum.valueOf(executionType); long processDefinitionCode; - // create or update process definition + // create or update workflow if (processDefinition != null) { processDefinitionCode = processDefinition.getCode(); - // make sure process definition offline which could edit + // make sure workflow offline which could edit processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, ReleaseState.OFFLINE); processDefinitionService.updateProcessDefinition(user, projectCode, name, @@ -274,7 +274,7 @@ public class PythonGateway { processDefinitionCode = processDefinition.getCode(); } - // Fresh process definition schedule + // Fresh workflow schedule if (schedule != null) { createOrUpdateSchedule(user, projectCode, processDefinitionCode, schedule, workerGroup, warningType, warningGroupId); @@ -285,23 +285,23 @@ public class PythonGateway { } /** - * get process definition + * get workflow * * @param user user who create or update schedule - * @param projectCode project which process definition belongs to - * @param processDefinitionName process definition name + * @param projectCode project which workflow belongs to + * @param workflowName workflow name */ - private ProcessDefinition getProcessDefinition(User user, long projectCode, String processDefinitionName) { + private ProcessDefinition getWorkflow(User user, long projectCode, String workflowName) { Map verifyProcessDefinitionExists = - processDefinitionService.verifyProcessDefinitionName(user, projectCode, processDefinitionName, 0); + processDefinitionService.verifyProcessDefinitionName(user, projectCode, workflowName, 0); Status verifyStatus = (Status) verifyProcessDefinitionExists.get(Constants.STATUS); ProcessDefinition processDefinition = null; if (verifyStatus == Status.PROCESS_DEFINITION_NAME_EXIST) { - processDefinition = processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + processDefinition = processDefinitionMapper.queryByDefineName(projectCode, workflowName); } else if (verifyStatus != Status.SUCCESS) { String msg = - "Verify process definition exists status is invalid, neither SUCCESS or PROCESS_DEFINITION_NAME_EXIST."; + "Verify workflow exists status is invalid, neither SUCCESS or WORKFLOW_NAME_EXIST."; logger.error(msg); throw new RuntimeException(msg); } @@ -310,13 +310,13 @@ public class PythonGateway { } /** - * create or update process definition schedule. + * create or update workflow schedule. * It would always use latest schedule define in workflow-as-code, and set schedule online when * it's not null * * @param user user who create or update schedule - * @param projectCode project which process definition belongs to - * @param processDefinitionCode process definition code + * @param projectCode project which workflow belongs to + * @param workflowCode workflow code * @param schedule schedule expression * @param workerGroup work group * @param warningType warning type @@ -324,24 +324,24 @@ public class PythonGateway { */ private void createOrUpdateSchedule(User user, long projectCode, - long processDefinitionCode, + long workflowCode, String schedule, String workerGroup, String warningType, int warningGroupId) { - Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(processDefinitionCode); + Schedule scheduleObj = scheduleMapper.queryByProcessDefinitionCode(workflowCode); // create or update schedule int scheduleId; if (scheduleObj == null) { - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, + processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.ONLINE); - Map result = schedulerService.insertSchedule(user, projectCode, processDefinitionCode, + Map result = schedulerService.insertSchedule(user, projectCode, workflowCode, schedule, WarningType.valueOf(warningType), warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); scheduleId = (int) result.get("scheduleId"); } else { scheduleId = scheduleObj.getId(); - processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinitionCode, + processDefinitionService.releaseProcessDefinition(user, projectCode, workflowCode, ReleaseState.OFFLINE); schedulerService.updateSchedule(user, projectCode, scheduleId, schedule, WarningType.valueOf(warningType), warningGroupId, DEFAULT_FAILURE_STRATEGY, DEFAULT_PRIORITY, workerGroup, DEFAULT_ENVIRONMENT_CODE); @@ -349,20 +349,20 @@ public class PythonGateway { schedulerService.setScheduleState(user, projectCode, scheduleId, ReleaseState.ONLINE); } - public void execProcessInstance(String userName, - String projectName, - String processDefinitionName, - String cronTime, - String workerGroup, - String warningType, - Integer warningGroupId, - Integer timeout) { + public void execWorkflowInstance(String userName, + String projectName, + String workflowName, + String cronTime, + String workerGroup, + String warningType, + Integer warningGroupId, + Integer timeout) { User user = usersService.queryUser(userName); Project project = projectMapper.queryByName(projectName); ProcessDefinition processDefinition = - processDefinitionMapper.queryByDefineName(project.getCode(), processDefinitionName); + processDefinitionMapper.queryByDefineName(project.getCode(), workflowName); - // make sure process definition online + // make sure workflow online processDefinitionService.releaseProcessDefinition(user, project.getCode(), processDefinition.getCode(), ReleaseState.ONLINE); @@ -391,7 +391,7 @@ public class PythonGateway { // side object /* * Grant project's permission to user. Use when project's created user not current but Python API use it to change - * process definition. + * workflow. */ private Integer grantProjectToUser(Project project, User user) { Date now = new Date(); @@ -512,31 +512,31 @@ public class PythonGateway { } /** - * Get processDefinition by given processDefinitionName name. It return map contain processDefinition id, name, code. - * Useful in Python API create subProcess task which need processDefinition information. + * Get workflow object by given workflow name. It returns map contain workflow id, name, code. + * Useful in Python API create subProcess task which need workflow information. * * @param userName user who create or update schedule - * @param projectName project name which process definition belongs to - * @param processDefinitionName process definition name + * @param projectName project name which workflow belongs to + * @param workflowName workflow name */ - public Map getProcessDefinitionInfo(String userName, String projectName, - String processDefinitionName) { + public Map getWorkflowInfo(String userName, String projectName, + String workflowName) { Map result = new HashMap<>(); User user = usersService.queryUser(userName); Project project = (Project) projectService.queryByName(user, projectName).get(Constants.DATA_LIST); long projectCode = project.getCode(); - ProcessDefinition processDefinition = getProcessDefinition(user, projectCode, processDefinitionName); - // get process definition info + ProcessDefinition processDefinition = getWorkflow(user, projectCode, workflowName); + // get workflow info if (processDefinition != null) { - // make sure process definition online + // make sure workflow online processDefinitionService.releaseProcessDefinition(user, projectCode, processDefinition.getCode(), ReleaseState.ONLINE); result.put("id", processDefinition.getId()); result.put("name", processDefinition.getName()); result.put("code", processDefinition.getCode()); } else { - String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + String msg = String.format("Can not find valid workflow by name %s", workflowName); logger.error(msg); throw new IllegalArgumentException(msg); } @@ -545,14 +545,14 @@ public class PythonGateway { } /** - * Get project, process definition, task code. - * Useful in Python API create dependent task which need processDefinition information. + * Get project, workflow, task code. + * Useful in Python API create dependent task which need workflow information. * - * @param projectName project name which process definition belongs to - * @param processDefinitionName process definition name + * @param projectName project name which workflow belongs to + * @param workflowName workflow name * @param taskName task name */ - public Map getDependentInfo(String projectName, String processDefinitionName, String taskName) { + public Map getDependentInfo(String projectName, String workflowName, String taskName) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); @@ -565,9 +565,9 @@ public class PythonGateway { result.put("projectCode", projectCode); ProcessDefinition processDefinition = - processDefinitionMapper.queryByDefineName(projectCode, processDefinitionName); + processDefinitionMapper.queryByDefineName(projectCode, workflowName); if (processDefinition == null) { - String msg = String.format("Can not find valid process definition by name %s", processDefinitionName); + String msg = String.format("Can not find valid workflow by name %s", workflowName); logger.error(msg); throw new IllegalArgumentException(msg); } @@ -582,8 +582,8 @@ public class PythonGateway { } /** - * Get resource by given program type and full name. It return map contain resource id, name. - * Useful in Python API create flink or spark task which need processDefinition information. + * Get resource by given program type and full name. It returns map contain resource id, name. + * Useful in Python API create flink or spark task which need workflow information. * * @param programType program type one of SCALA, JAVA and PYTHON * @param fullName full name of the resource @@ -628,7 +628,7 @@ public class PythonGateway { /** * Get resource by given resource type and full name. It return map contain resource id, name. - * Useful in Python API create task which need processDefinition information. + * Useful in Python API create task which need workflow information. * * @param userName user who query resource * @param fullName full name of the resource