diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java index 693a8585d8efeec3840a911192e77cfc38f5a657..5f424e1ad0b60babe2e76b7bdf2b5526df0c2bf2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java @@ -205,7 +205,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } int saveResult = processService.saveProcessDefinition(loginUser, project, processDefinitionName, desc, - locations, connects, processData, processDefinition); + locations, connects, processData, processDefinition, true); if (saveResult > 0) { putMsg(result, Status.SUCCESS); @@ -414,7 +414,7 @@ public class ProcessDefinitionServiceImpl extends BaseServiceImpl implements Pro } ProcessData newProcessData = JSONUtils.parseObject(processDefinitionJson, ProcessData.class); int saveResult = processService.saveProcessDefinition(loginUser, project, name, desc, - locations, connects, newProcessData, processDefinition); + locations, connects, newProcessData, processDefinition, true); if (saveResult > 0) { putMsg(result, Status.SUCCESS); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java index 67d6183571d3ffd538cacf5101ac0115deb3c43e..1a2841126fe6f63c029bbbf74f475468f993e0af 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java @@ -52,7 +52,6 @@ import org.apache.dolphinscheduler.common.utils.StringUtils; import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils; import org.apache.dolphinscheduler.dao.entity.ProcessData; import org.apache.dolphinscheduler.dao.entity.ProcessDefinition; -import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog; import org.apache.dolphinscheduler.dao.entity.ProcessInstance; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog; @@ -72,7 +71,6 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; -import java.text.ParseException; import java.util.ArrayList; import java.util.Collections; import java.util.Date; @@ -425,12 +423,12 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce * @param locations locations * @param connects connects * @return update result code - * @throws ParseException parse exception for json parse */ + @Transactional @Override public Map updateProcessInstance(User loginUser, String projectName, Integer processInstanceId, String processInstanceJson, String scheduleTime, Boolean syncDefine, - Flag flag, String locations, String connects) throws ParseException { + Flag flag, String locations, String connects) { Map result = new HashMap<>(); Project project = projectMapper.queryByName(projectName); //check project permission @@ -461,10 +459,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce } Tenant tenant = processService.getTenantForProcess(processData.getTenantId(), processDefinition.getUserId()); - setProcessInstance(processInstance, tenant, scheduleTime, locations, - connects, processInstanceJson, processData); + setProcessInstance(processInstance, tenant, scheduleTime, processData); int updateDefine = 1; if (Boolean.TRUE.equals(syncDefine)) { + processDefinition.setId(processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode()).getId()); updateDefine = syncDefinition(loginUser, project, locations, connects, processInstance, processDefinition, processData); @@ -495,37 +493,29 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce processDefinition.setTimeout(processInstance.getTimeout()); processDefinition.setUpdateTime(new Date()); - int updateDefine = processService.saveProcessDefinition(loginUser, project, processDefinition.getName(), + return processService.saveProcessDefinition(loginUser, project, processDefinition.getName(), processDefinition.getDescription(), locations, connects, - processData, processDefinition); - return updateDefine; + processData, processDefinition, false); } /** * update process instance attributes - * - * @return false if check failed or */ - private void setProcessInstance(ProcessInstance processInstance, Tenant tenant, - String scheduleTime, String locations, String connects, String processInstanceJson, - ProcessData processData) { + private void setProcessInstance(ProcessInstance processInstance, Tenant tenant, String scheduleTime, ProcessData processData) { Date schedule = processInstance.getScheduleTime(); if (scheduleTime != null) { schedule = DateUtils.getScheduleDate(scheduleTime); } processInstance.setScheduleTime(schedule); - processInstance.setLocations(locations); - processInstance.setConnects(connects); - if (StringUtils.isNotEmpty(processInstanceJson)) { - return; - } List globalParamList = processData.getGlobalParams(); - Map globalParamMap = Optional.ofNullable(globalParamList).orElse(Collections.emptyList()).stream().collect(Collectors.toMap(Property::getProp, Property::getValue)); + Map globalParamMap = Optional.ofNullable(globalParamList) + .orElse(Collections.emptyList()) + .stream() + .collect(Collectors.toMap(Property::getProp, Property::getValue)); String globalParams = ParameterUtils.curingGlobalParams(globalParamMap, globalParamList, processInstance.getCmdTypeIfComplement(), schedule); - int timeout = processData.getTimeout(); - processInstance.setTimeout(timeout); + processInstance.setTimeout(processData.getTimeout()); if (tenant != null) { processInstance.setTenantCode(tenant.getTenantCode()); } @@ -706,13 +696,10 @@ public class ProcessInstanceServiceImpl extends BaseServiceImpl implements Proce throw new RuntimeException("workflow instance is null"); } - ProcessDefinitionLog processDefinitionLog = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( + ProcessDefinition processDefinition = processDefinitionLogMapper.queryByDefinitionCodeAndVersion( processInstance.getProcessDefinitionCode(), processInstance.getProcessDefinitionVersion() ); - ProcessDefinition processDefinition = JSONUtils.parseObject(JSONUtils.toJsonString(processDefinitionLog), - ProcessDefinition.class); - GanttDto ganttDto = new GanttDto(); DAG dag = processService.genDagGraph(processDefinition); //topological sort diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index 2a4443d4929f9c3fe4ade1e939cf0e9af27f0237..247067642d865b6c846181cdc18eda5eeb054aa1 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -456,7 +456,8 @@ public class ProcessDefinitionServiceTest { , Mockito.anyString() , Mockito.anyString() , Mockito.any(ProcessData.class) - , Mockito.any(ProcessDefinition.class))) + , Mockito.any(ProcessDefinition.class) + ,true)) .thenReturn(1); Mockito.when(processService.genProcessData(Mockito.any())).thenReturn(getProcessData()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java index e85449dc77ee2a724cb481a4f1a1355ff6647ee5..3656ab28f37962e79535d647007b0f3a7af62935 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java @@ -409,7 +409,7 @@ public class ProcessInstanceServiceTest { when(processDefineMapper.updateById(processDefinition)).thenReturn(1); when(processService.saveProcessDefinition(Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), - Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(1); + Mockito.anyString(), Mockito.any(), Mockito.any(), true)).thenReturn(1); putMsg(result, Status.SUCCESS, projectName); Map successRes = processInstanceService.updateProcessInstance(loginUser, projectName, 1, diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java index 44fa49484d7e4ca5a83952a494bc26ee9ce0b31a..89a89be4c5e114fa7dbdd95376edf6f6b05f0691 100644 --- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java +++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java @@ -1586,32 +1586,6 @@ public class ProcessService { return processInstanceMapper.updateById(processInstance); } - /** - * update the process instance - * - * @param processInstanceId processInstanceId - * @param processJson processJson - * @param globalParams globalParams - * @param scheduleTime scheduleTime - * @param flag flag - * @param locations locations - * @param connects connects - * @return update process instance result - */ - public int updateProcessInstance(Integer processInstanceId, String processJson, - String globalParams, Date scheduleTime, Flag flag, - String locations, String connects) { - ProcessInstance processInstance = processInstanceMapper.queryDetailById(processInstanceId); - if (processInstance != null) { - processInstance.setGlobalParams(globalParams); - processInstance.setScheduleTime(scheduleTime); - processInstance.setLocations(locations); - processInstance.setConnects(connects); - return processInstanceMapper.updateById(processInstance); - } - return 0; - } - /** * change task state * @@ -2163,13 +2137,13 @@ public class ProcessService { /** * switch process definition version to process definition log version */ - public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { + public int processDefinitionToDB(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog, Boolean isFromProcessDefine) { if (null == processDefinition || null == processDefinitionLog) { return Constants.DEFINITION_FAILURE; } processDefinitionLog.setId(processDefinition.getId()); - processDefinitionLog.setReleaseState(ReleaseState.OFFLINE); + processDefinitionLog.setReleaseState(isFromProcessDefine ? ReleaseState.OFFLINE : ReleaseState.ONLINE); processDefinitionLog.setFlag(Flag.YES); int result; @@ -2185,7 +2159,7 @@ public class ProcessService { * switch process definition version to process definition log version */ public int switchVersion(ProcessDefinition processDefinition, ProcessDefinitionLog processDefinitionLog) { - int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog); + int switchResult = processDefinitionToDB(processDefinition, processDefinitionLog, true); if (switchResult != Constants.DEFINITION_FAILURE) { switchProcessTaskRelationVersion(processDefinition); } @@ -2266,14 +2240,15 @@ public class ProcessService { * save processDefinition (including create or update processDefinition) */ public int saveProcessDefinition(User operator, Project project, String name, String desc, String locations, - String connects, ProcessData processData, ProcessDefinition processDefinition) { + String connects, ProcessData processData, ProcessDefinition processDefinition, + Boolean isFromProcessDefine) { ProcessDefinitionLog processDefinitionLog = insertProcessDefinitionLog(operator, processDefinition.getCode(), name, processData, project, desc, locations, connects); - Map taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks()); + Map taskDefinitionMap = handleTaskDefinition(operator, project.getCode(), processData.getTasks(), isFromProcessDefine); if (Constants.DEFINITION_FAILURE == handleTaskRelation(operator, project.getCode(), processDefinitionLog, processData.getTasks(), taskDefinitionMap)) { return Constants.DEFINITION_FAILURE; } - return processDefinitionToDB(processDefinition, processDefinitionLog); + return processDefinitionToDB(processDefinition, processDefinitionLog, isFromProcessDefine); } /** @@ -2319,7 +2294,7 @@ public class ProcessService { /** * handle task definition */ - public Map handleTaskDefinition(User operator, Long projectCode, List taskNodes) { + public Map handleTaskDefinition(User operator, Long projectCode, List taskNodes, Boolean isFromProcessDefine) { if (taskNodes == null) { return null; } @@ -2336,7 +2311,7 @@ public class ProcessService { } saveTaskDefinition(operator, projectCode, taskNode, taskDefinition); } else { - if (isTaskOnline(taskDefinition.getCode())) { + if (isFromProcessDefine && isTaskOnline(taskDefinition.getCode())) { throw new ServiceException(String.format("The task %s is on line in process", taskNode.getName())); } updateTaskDefinition(operator, projectCode, taskNode, taskDefinition); diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java index 3bee978a3033f8656358fc264dfc5b5c8fbc71df..c8cb176c25e8b1443da90f0b61663f9382b3f389 100644 --- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java +++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/process/ProcessServiceTest.java @@ -361,7 +361,7 @@ public class ProcessServiceTest { Mockito.when(processDefineMapper.updateById(any())).thenReturn(1); Mockito.when(processDefineLogMapper.insert(any())).thenReturn(1); - int i = processService.saveProcessDefinition(user, project, "name", "desc", "locations", "connects", processData, processDefinition); + int i = processService.saveProcessDefinition(user, project, "name", "desc", "locations", "connects", processData, processDefinition, true); Assert.assertEquals(1, i); } diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql index 5997046f41b7d3040f140b6ba8a6fe7bde9a39c5..cc1d70a205e31d01967de9808fdadc57128a3ff3 100644 --- a/sql/dolphinscheduler_mysql.sql +++ b/sql/dolphinscheduler_mysql.sql @@ -455,7 +455,7 @@ CREATE TABLE `t_ds_task_definition` ( `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id', - `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type', + `task_type` varchar(50) NOT NULL COMMENT 'task type', `task_params` text COMMENT 'job custom parameters', `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available', `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority', @@ -484,7 +484,7 @@ CREATE TABLE `t_ds_task_definition_log` ( `description` text COMMENT 'description', `project_code` bigint(20) NOT NULL COMMENT 'project code', `user_id` int(11) DEFAULT NULL COMMENT 'task definition creator id', - `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type', + `task_type` varchar(50) NOT NULL COMMENT 'task type', `task_params` text COMMENT 'job custom parameters', `flag` tinyint(2) DEFAULT NULL COMMENT '0 not available, 1 available', `task_priority` tinyint(4) DEFAULT NULL COMMENT 'job priority', @@ -546,32 +546,6 @@ CREATE TABLE `t_ds_process_task_relation_log` ( PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; --- ---------------------------- --- Table structure for t_ds_process_definition_version --- ---------------------------- -DROP TABLE IF EXISTS `t_ds_process_definition_version`; -CREATE TABLE `t_ds_process_definition_version` ( - `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', - `process_definition_id` int(11) NOT NULL COMMENT 'process definition id', - `version` int(11) DEFAULT NULL COMMENT 'process definition version', - `process_definition_json` longtext COMMENT 'process definition json content', - `description` text, - `global_params` text COMMENT 'global parameters', - `locations` text COMMENT 'Node location information', - `connects` text COMMENT 'Node connection information', - `warning_group_id` int(11) DEFAULT NULL COMMENT 'alert group id', - `create_time` datetime DEFAULT NULL COMMENT 'create time', - `timeout` int(11) DEFAULT '0' COMMENT 'time out', - `resource_ids` varchar(255) DEFAULT NULL COMMENT 'resource ids', - PRIMARY KEY (`id`), - UNIQUE KEY `process_definition_id_and_version` (`process_definition_id`,`version`) USING BTREE, - KEY `process_definition_index` (`id`) USING BTREE -) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; - --- ---------------------------- --- Records of t_ds_process_definition --- ---------------------------- - -- ---------------------------- -- Table structure for t_ds_process_instance -- ---------------------------- @@ -814,7 +788,7 @@ DROP TABLE IF EXISTS `t_ds_task_instance`; CREATE TABLE `t_ds_task_instance` ( `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'key', `name` varchar(255) DEFAULT NULL COMMENT 'task name', - `task_type` varchar(50) DEFAULT NOT NULL COMMENT 'task type', + `task_type` varchar(50) NOT NULL COMMENT 'task type', `task_code` bigint(20) NOT NULL COMMENT 'task definition code', `task_definition_version` int(11) DEFAULT NULL COMMENT 'task definition version', `process_instance_id` int(11) DEFAULT NULL COMMENT 'process instance id', diff --git a/sql/dolphinscheduler_postgre.sql b/sql/dolphinscheduler_postgre.sql index ae12df56cb11d95fb9304d0e37f5f1d8e74e03a6..37b07f3e491329b9954f762a166f40bd25e963f9 100644 --- a/sql/dolphinscheduler_postgre.sql +++ b/sql/dolphinscheduler_postgre.sql @@ -428,29 +428,6 @@ CREATE TABLE t_ds_process_task_relation_log ( PRIMARY KEY (id) ) ; --- --- Table structure for table t_ds_process_definition_version --- - -DROP TABLE IF EXISTS t_ds_process_definition_version; -CREATE TABLE t_ds_process_definition_version ( - id int NOT NULL , - process_definition_id int NOT NULL , - version int DEFAULT NULL , - process_definition_json text , - description text , - global_params text , - locations text , - connects text , - warning_group_id int4 DEFAULT NULL, - create_time timestamp DEFAULT NULL , - timeout int DEFAULT '0' , - resource_ids varchar(64), - PRIMARY KEY (id) -) ; - -create index process_definition_id_and_version on t_ds_process_definition_version (process_definition_id,version); - -- -- Table structure for table t_ds_process_instance -- @@ -834,9 +811,6 @@ ALTER TABLE t_ds_process_task_relation ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds DROP SEQUENCE IF EXISTS t_ds_process_task_relation_log_id_sequence; CREATE SEQUENCE t_ds_process_task_relation_log_id_sequence; ALTER TABLE t_ds_process_task_relation_log ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_task_relation_log_id_sequence'); -DROP SEQUENCE IF EXISTS t_ds_process_definition_version_id_sequence; -CREATE SEQUENCE t_ds_process_definition_version_id_sequence; -ALTER TABLE t_ds_process_definition_version ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_definition_version_id_sequence'); DROP SEQUENCE IF EXISTS t_ds_process_instance_id_sequence; CREATE SEQUENCE t_ds_process_instance_id_sequence; ALTER TABLE t_ds_process_instance ALTER COLUMN id SET DEFAULT NEXTVAL('t_ds_process_instance_id_sequence');