diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java index b3fe1a9eef633820250f0066443227c555a5c8bb..4f3dafdf272ec1f2d79a628e5b1ea099a604ad0f 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java @@ -94,6 +94,30 @@ public class ProcessDefinitionController extends BaseController { return returnDataList(result); } + /** + * copy process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return copy result code + */ + @ApiOperation(value = "copyProcessDefinition", notes= "COPY_PROCESS_DEFINITION_NOTES") + @ApiImplicitParams({ + @ApiImplicitParam(name = "processId", value = "PROCESS_DEFINITION_ID", required = true, dataType = "Int", example = "100") + }) + @PostMapping(value = "/copy") + @ResponseStatus(HttpStatus.OK) + @ApiException(COPY_PROCESS_DEFINITION_ERROR) + public Result copyProcessDefinition(@ApiIgnore @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @ApiParam(name = "projectName", value = "PROJECT_NAME", required = true) @PathVariable String projectName, + @RequestParam(value = "processId", required = true) int processId) throws JsonProcessingException { + logger.info("copy process definition, login user:{}, project name:{}, process definition id:{}", + loginUser.getUserName(), projectName, processId); + Map result = processDefinitionService.copyProcessDefinition(loginUser, projectName, processId); + return returnDataList(result); + } + /** * verify process definition name unique * diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java index 00665aae715bbfca49ddea10ea742b4c46286e45..8c52dd4d50fc6c4484231b2ffb1f625fd2c09656 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java @@ -168,15 +168,13 @@ public enum Status { PREVIEW_SCHEDULE_ERROR(10139,"preview schedule error", "预览调度配置错误"), PARSE_TO_CRON_EXPRESSION_ERROR(10140,"parse cron to cron expression error", "解析调度表达式错误"), SCHEDULE_START_TIME_END_TIME_SAME(10141,"The start time must not be the same as the end", "开始时间不能和结束时间一样"), - DELETE_TENANT_BY_ID_FAIL(100142,"delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"), - DELETE_TENANT_BY_ID_FAIL_DEFINES(100143,"delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"), - DELETE_TENANT_BY_ID_FAIL_USERS(100144,"delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"), - - DELETE_WORKER_GROUP_BY_ID_FAIL(100145,"delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"), - - QUERY_WORKER_GROUP_FAIL(100146,"query worker group fail ", "查询worker分组失败"), - DELETE_WORKER_GROUP_FAIL(100147,"delete worker group fail ", "删除worker分组失败"), - + DELETE_TENANT_BY_ID_FAIL(10142,"delete tenant by id fail, for there are {0} process instances in executing using it", "删除租户失败,有[{0}]个运行中的工作流实例正在使用"), + DELETE_TENANT_BY_ID_FAIL_DEFINES(10143,"delete tenant by id fail, for there are {0} process definitions using it", "删除租户失败,有[{0}]个工作流定义正在使用"), + DELETE_TENANT_BY_ID_FAIL_USERS(10144,"delete tenant by id fail, for there are {0} users using it", "删除租户失败,有[{0}]个用户正在使用"), + DELETE_WORKER_GROUP_BY_ID_FAIL(10145,"delete worker group by id fail, for there are {0} process instances in executing using it", "删除Worker分组失败,有[{0}]个运行中的工作流实例正在使用"), + QUERY_WORKER_GROUP_FAIL(10146,"query worker group fail ", "查询worker分组失败"), + DELETE_WORKER_GROUP_FAIL(10147,"delete worker group fail ", "删除worker分组失败"), + COPY_PROCESS_DEFINITION_ERROR(10148,"copy process definition error", "复制工作流错误"), UDF_FUNCTION_NOT_EXIST(20001, "UDF function not found", "UDF函数不存在"), UDF_FUNCTION_EXISTS(20002, "UDF function already exists", "UDF函数已存在"), diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java index 4081cab7323547205ecf6777737735dd688e2ea2..14cadbf18980b726ea68ae98e2aa56f3c8c83354 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java @@ -112,8 +112,13 @@ public class ProcessDefinitionService extends BaseDAGService { * @return create result code * @throws JsonProcessingException JsonProcessingException */ - public Map createProcessDefinition(User loginUser, String projectName, String name, - String processDefinitionJson, String desc, String locations, String connects) throws JsonProcessingException { + public Map createProcessDefinition(User loginUser, + String projectName, + String name, + String processDefinitionJson, + String desc, + String locations, + String connects) throws JsonProcessingException { Map result = new HashMap<>(5); Project project = projectMapper.queryByName(projectName); @@ -281,6 +286,41 @@ public class ProcessDefinitionService extends BaseDAGService { return result; } + /** + * copy process definition + * + * @param loginUser login user + * @param projectName project name + * @param processId process definition id + * @return copy result code + */ + public Map copyProcessDefinition(User loginUser, String projectName, Integer processId) throws JsonProcessingException{ + + Map result = new HashMap<>(5); + Project project = projectMapper.queryByName(projectName); + + Map checkResult = projectService.checkProjectAndAuth(loginUser, project, projectName); + Status resultStatus = (Status) checkResult.get(Constants.STATUS); + if (resultStatus != Status.SUCCESS) { + return checkResult; + } + + ProcessDefinition processDefinition = processDefineMapper.selectById(processId); + if (processDefinition == null) { + putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId); + return result; + } else { + return createProcessDefinition( + loginUser, + projectName, + processDefinition.getName()+"_copy_"+System.currentTimeMillis(), + processDefinition.getProcessDefinitionJson(), + processDefinition.getDescription(), + processDefinition.getLocations(), + processDefinition.getConnects()); + } + } + /** * update process definition * diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages.properties b/dolphinscheduler-api/src/main/resources/i18n/messages.properties index c4ca13168dd90f233a37d41afaf78071cd3bd97c..369e5e3c725dcc41c88884f53b464d178bd61d4b 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages.properties @@ -173,6 +173,7 @@ PROCESS_DEFINITION_ID=process definition id PROCESS_DEFINITION_IDS=process definition ids RELEASE_PROCESS_DEFINITION_NOTES=release process definition QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id +COPY_PROCESS_DEFINITION_NOTES=copy process definition QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=query process definition list paging QUERY_ALL_DEFINITION_LIST_NOTES=query all definition list diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties index e0c1c286d13c6a3d245c97800a7118b5f9db08e8..92df7426136b91ff0c65f21bdc60363bae54f7b5 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_en_US.properties @@ -173,6 +173,7 @@ PROCESS_DEFINITION_ID=process definition id PROCESS_DEFINITION_IDS=process definition ids RELEASE_PROCESS_DEFINITION_NOTES=release process definition QUERY_PROCESS_DEFINITION_BY_ID_NOTES=query process definition by id +COPY_PROCESS_DEFINITION_NOTES=copy process definition QUERY_PROCESS_DEFINITION_LIST_NOTES=query process definition list QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=query process definition list paging QUERY_ALL_DEFINITION_LIST_NOTES=query all definition list diff --git a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties index af7fde50686346beb160637065efe6064d0a9177..3b427912b54914837d1c88945cd88b115ee0e11d 100644 --- a/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties +++ b/dolphinscheduler-api/src/main/resources/i18n/messages_zh_CN.properties @@ -171,6 +171,7 @@ UPDATE_PROCESS_DEFINITION_NOTES=更新流程定义 PROCESS_DEFINITION_ID=流程定义ID RELEASE_PROCESS_DEFINITION_NOTES=发布流程定义 QUERY_PROCESS_DEFINITION_BY_ID_NOTES=查询流程定义通过流程定义ID +COPY_PROCESS_DEFINITION_NOTES=复制流程定义 QUERY_PROCESS_DEFINITION_LIST_NOTES=查询流程定义列表 QUERY_PROCESS_DEFINITION_LIST_PAGING_NOTES=分页查询流程定义列表 QUERY_ALL_DEFINITION_LIST_NOTES=查询所有流程定义 diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java index c028dd41671ec9c1e2214111b2d56c38b2475cac..a69df9744e0652e799d0ba50460c57aa7e72aac7 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java @@ -174,6 +174,21 @@ public class ProcessDefinitionControllerTest{ Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); } + @Test + public void testCopyProcessDefinition() throws Exception { + + String projectName = "test"; + int id = 1; + + Map result = new HashMap<>(5); + putMsg(result, Status.SUCCESS); + + Mockito.when(processDefinitionService.copyProcessDefinition(user, projectName,id)).thenReturn(result); + Result response = processDefinitionController.copyProcessDefinition(user, projectName,id); + + Assert.assertEquals(Status.SUCCESS.getCode(),response.getCode().intValue()); + } + @Test public void testQueryProcessDefinitionList() throws Exception { diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java index a0da2289dc46e9dfa875a0e8fbd5e965f6059c64..5a03cdb2687fa4a70df2689a2a891496c7c1517a 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java @@ -198,6 +198,47 @@ public class ProcessDefinitionServiceTest { Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); } + @Test + public void testCopyProcessDefinition() throws Exception{ + String projectName = "project_test1"; + Mockito.when(projectMapper.queryByName(projectName)).thenReturn(getProject(projectName)); + + Project project = getProject(projectName); + + User loginUser = new User(); + loginUser.setId(-1); + loginUser.setUserType(UserType.GENERAL_USER); + + Map result = new HashMap<>(5); + //project check auth success, instance not exist + putMsg(result, Status.SUCCESS, projectName); + Mockito.when(projectService.checkProjectAndAuth(loginUser,project,projectName)).thenReturn(result); + + ProcessDefinition definition = getProcessDefinition(); + definition.setLocations("{\"tasks-36196\":{\"name\":\"ssh_test1\",\"targetarr\":\"\",\"x\":141,\"y\":70}}"); + definition.setProcessDefinitionJson("{\"globalParams\":[],\"tasks\":[{\"type\":\"SHELL\",\"id\":\"tasks-36196\",\"name\":\"ssh_test1\",\"params\":{\"resourceList\":[],\"localParams\":[],\"rawScript\":\"aa=\\\"1234\\\"\\necho ${aa}\"},\"desc\":\"\",\"runFlag\":\"NORMAL\",\"dependence\":{},\"maxRetryTimes\":\"0\",\"retryInterval\":\"1\",\"timeout\":{\"strategy\":\"\",\"interval\":null,\"enable\":false},\"taskInstancePriority\":\"MEDIUM\",\"workerGroupId\":-1,\"preTasks\":[]}],\"tenantId\":-1,\"timeout\":0}"); + definition.setConnects("[]"); + //instance exit + Mockito.when(processDefineMapper.selectById(46)).thenReturn(definition); + + Map createProcessResult = new HashMap<>(5); + putMsg(result, Status.SUCCESS); + + Mockito.when(processDefinitionService.createProcessDefinition( + loginUser, + definition.getProjectName(), + definition.getName(), + definition.getProcessDefinitionJson(), + definition.getDescription(), + definition.getLocations(), + definition.getConnects())).thenReturn(createProcessResult); + + Map successRes = processDefinitionService.copyProcessDefinition(loginUser, + "project_test1", 46); + + Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS)); + } + @Test public void deleteProcessDefinitionByIdTest() throws Exception { String projectName = "project_test1"; @@ -770,12 +811,14 @@ public class ProcessDefinitionServiceTest { * @return ProcessDefinition */ private ProcessDefinition getProcessDefinition(){ + ProcessDefinition processDefinition = new ProcessDefinition(); processDefinition.setId(46); processDefinition.setName("test_pdf"); processDefinition.setProjectId(2); processDefinition.setTenantId(1); processDefinition.setDescription(""); + return processDefinition; } diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java index a0fee7c36e84d7f75b6b317ff7a9e1f0595a74d0..78ba3a6b44138734cdd8471a6a4d334dda1d8562 100644 --- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java +++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackServiceTest.java @@ -133,8 +133,6 @@ public class TaskCallbackServiceTest { nettyRemotingClient.close(); } - - @Test(expected = IllegalArgumentException.class) public void testSendAckWithIllegalArgumentException(){ TaskExecuteAckCommand ackCommand = Mockito.mock(TaskExecuteAckCommand.class); @@ -178,39 +176,40 @@ public class TaskCallbackServiceTest { } } - @Test(expected = IllegalStateException.class) - public void testSendAckWithIllegalStateException2(){ - masterRegistry.registry(); - final NettyServerConfig serverConfig = new NettyServerConfig(); - serverConfig.setListenPort(30000); - NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); - nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); - nettyRemotingServer.start(); - - final NettyClientConfig clientConfig = new NettyClientConfig(); - NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); - Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); - taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); - channel.close(); - TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); - ackCommand.setTaskInstanceId(1); - ackCommand.setStartTime(new Date()); +// @Test(expected = IllegalStateException.class) +// public void testSendAckWithIllegalStateException2(){ +// masterRegistry.registry(); +// final NettyServerConfig serverConfig = new NettyServerConfig(); +// serverConfig.setListenPort(30000); +// NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(serverConfig); +// nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_ACK, taskAckProcessor); +// nettyRemotingServer.start(); +// +// final NettyClientConfig clientConfig = new NettyClientConfig(); +// NettyRemotingClient nettyRemotingClient = new NettyRemotingClient(clientConfig); +// Channel channel = nettyRemotingClient.getChannel(Host.of("localhost:30000")); +// taskCallbackService.addRemoteChannel(1, new NettyRemoteChannel(channel, 1)); +// channel.close(); +// TaskExecuteAckCommand ackCommand = new TaskExecuteAckCommand(); +// ackCommand.setTaskInstanceId(1); +// ackCommand.setStartTime(new Date()); +// +// nettyRemotingServer.close(); +// +// taskCallbackService.sendAck(1, ackCommand.convert2Command()); +// try { +// Thread.sleep(5000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// +// Stopper.stop(); +// +// try { +// Thread.sleep(5000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } - nettyRemotingServer.close(); - - taskCallbackService.sendAck(1, ackCommand.convert2Command()); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - Stopper.stop(); - - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } } diff --git a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue index 53939f3f7bdbee217bf4f7507bae7d0f9805c330..95bdc2930cff3dfe6ca81ec88d9157720d82d37f 100644 --- a/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue +++ b/dolphinscheduler-ui/src/js/conf/home/pages/projects/pages/definition/pages/list/_source/list.vue @@ -46,7 +46,7 @@ {{$t('Timing state')}} - + {{$t('Operation')}} @@ -90,6 +90,7 @@ + v.code === code)[0].desc }, @@ -306,6 +307,21 @@ releaseState: 1 }) }, + /** + * copy + */ + _copyProcess (item) { + this.copyProcess({ + processId: item.id + }).then(res => { + this.$message.success(res.msg) + $('body').find('.tooltip.fade.top.in').remove() + this._onUpdate() + }).catch(e => { + this.$message.error(e.msg || '') + }) + }, + _export (item) { this.exportDefinition({ processDefinitionId: item.id, diff --git a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js index a63c9edb8c5a6614dbcd973a0f22b1e027d0873e..f282c8e30a15ed976ab076a15fd4d7e3ac9a3d16 100644 --- a/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js +++ b/dolphinscheduler-ui/src/js/conf/home/store/dag/actions.js @@ -90,6 +90,7 @@ export default { }) }) }, + /** * Get process definition DAG diagram details */ @@ -127,6 +128,22 @@ export default { }) }) }, + +/** + * Get process definition DAG diagram details + */ + copyProcess ({ state }, payload) { + return new Promise((resolve, reject) => { + io.post(`projects/${state.projectName}/process/copy`, { + processId: payload.processId + }, res => { + resolve(res) + }).catch(e => { + reject(e) + }) + }) + }, + /** * Get the process instance DAG diagram details */ diff --git a/pom.xml b/pom.xml index 8f81e2aea98e8aa9f81892e35bcf730d75a40b12..0647724ed06093311d144d817036c6b48967c083 100644 --- a/pom.xml +++ b/pom.xml @@ -764,6 +764,7 @@ **/common/utils/HadoopUtilsTest.java **/common/utils/HttpUtilsTest.java **/common/ConstantsTest.java + **/common/utils/HadoopUtils.java **/dao/mapper/AccessTokenMapperTest.java **/dao/mapper/AlertGroupMapperTest.java **/dao/mapper/CommandMapperTest.java