diff --git a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java index b6d3d4475f6f005c138c725f5802b82756d841f8..a3c1e40cfd2726b3aab7b53363a79c55331bfb6b 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/controller/ExecutorController.java @@ -149,10 +149,11 @@ public class ExecutorController extends BaseController { @GetMapping(value = "/get-receiver-cc") @ResponseStatus(HttpStatus.OK) public Result getReceiverCc(@RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "processDefinitionId") int processDefinitionId){ + @RequestParam(value = "processDefinitionId",required = false) Integer processDefinitionId, + @RequestParam(value = "processInstanceId",required = false) Integer processInstanceId) { logger.info("login user {}, get process definition receiver and cc", loginUser.getUserName()); try { - Map result = execService.getReceiverCc(processDefinitionId); + Map result = execService.getReceiverCc(processDefinitionId,processInstanceId); return returnDataList(result); } catch (Exception e) { logger.error(QUERY_RECIPIENTS_AND_COPYERS_BY_PROCESS_DEFINITION_ERROR.getMsg(),e); diff --git a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java index ee57ff9c9111c0985465863b03bef0272460d3a0..640ffeb5f83fcdd624ef158c70ca9976f74d5ad7 100644 --- a/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java +++ b/escheduler-api/src/main/java/cn/escheduler/api/service/ExecutorService.java @@ -361,18 +361,29 @@ public class ExecutorService extends BaseService{ } /** - * query recipients and copyers by process definition id + * query recipients and copyers by process definition id or processInstanceId * * @param processDefineId * @return */ - public Map getReceiverCc(int processDefineId) { + public Map getReceiverCc(Integer processDefineId,Integer processInstanceId) { Map result = new HashMap<>(); - + logger.info("processInstanceId {}",processInstanceId); + if(processDefineId == null && processInstanceId == null){ + throw new RuntimeException("You must set values for parameters processDefineId or processInstanceId"); + } + if(processDefineId == null && processInstanceId != null) { + ProcessInstance processInstance = processInstanceMapper.queryById(processInstanceId); + if (processInstance == null) { + throw new RuntimeException("processInstanceId is not exists"); + } + processDefineId = processInstance.getProcessDefinitionId(); + } ProcessDefinition processDefinition = processDefinitionMapper.queryByDefineId(processDefineId); if (processDefinition == null){ - throw new RuntimeException("processDefineId is not exists"); + throw new RuntimeException(String.format("processDefineId %d is not exists",processDefineId)); } + String receivers = processDefinition.getReceivers(); String receiversCc = processDefinition.getReceiversCc(); Map dataMap = new HashMap<>(); diff --git a/escheduler-api/src/test/java/cn/escheduler/api/controller/ExecutorControllerTest.java b/escheduler-api/src/test/java/cn/escheduler/api/controller/ExecutorControllerTest.java index e5e14dc95314b6b95e01313a9dfee10904bbe9a0..a7d4cf9e203f738f9f427c6b0f9f7890575718cc 100644 --- a/escheduler-api/src/test/java/cn/escheduler/api/controller/ExecutorControllerTest.java +++ b/escheduler-api/src/test/java/cn/escheduler/api/controller/ExecutorControllerTest.java @@ -32,8 +32,11 @@ import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.MvcResult; import org.springframework.test.web.servlet.setup.MockMvcBuilders; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; import org.springframework.web.context.WebApplicationContext; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.get; import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; @@ -66,4 +69,21 @@ public class ExecutorControllerTest { Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); logger.info(mvcResult.getResponse().getContentAsString()); } + + @Test + public void getReceiverCc() throws Exception { + MultiValueMap paramsMap = new LinkedMultiValueMap<>(); + //paramsMap.add("processDefinitionId","4"); + paramsMap.add("processInstanceId","13"); + //paramsMap.add("processInstanceId","13"); + MvcResult mvcResult = mockMvc.perform(get("/projects/{projectName}/executors/get-receiver-cc","li_sql_test") + .header("sessionId", "e79b3353-e227-4680-88c0-544194e64025") + .params(paramsMap)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8)) + .andReturn(); + Result result = JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(), Result.class); + Assert.assertEquals(Status.SUCCESS.getCode(),result.getCode().intValue()); + logger.info(mvcResult.getResponse().getContentAsString()); + } } \ No newline at end of file diff --git a/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java b/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java index 4feb7037f09e3f72022248e753c1d89f1b857186..a30fdd11704a6ea10e05668bb66ae56ec1d20c02 100644 --- a/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java +++ b/escheduler-common/src/main/java/cn/escheduler/common/task/sql/SqlParameters.java @@ -65,6 +65,16 @@ public class SqlParameters extends AbstractParameters { */ private String connParams; + /** + * receivers + */ + private String receivers; + + /** + * receivers cc + */ + private String receiversCc; + public String getType() { return type; } @@ -121,6 +131,21 @@ public class SqlParameters extends AbstractParameters { this.connParams = connParams; } + public String getReceivers() { + return receivers; + } + + public void setReceivers(String receivers) { + this.receivers = receivers; + } + + public String getReceiversCc() { + return receiversCc; + } + + public void setReceiversCc(String receiversCc) { + this.receiversCc = receiversCc; + } @Override public boolean checkParameters() { @@ -142,6 +167,8 @@ public class SqlParameters extends AbstractParameters { ", udfs='" + udfs + '\'' + ", showType='" + showType + '\'' + ", connParams='" + connParams + '\'' + + ", receivers='" + receivers + '\'' + + ", receiversCc='" + receiversCc + '\'' + '}'; } } diff --git a/escheduler-dao/readme.txt b/escheduler-dao/readme.txt deleted file mode 100644 index 6e7acb635a6a15161ce1e9be0bec19f5d9fd9104..0000000000000000000000000000000000000000 --- a/escheduler-dao/readme.txt +++ /dev/null @@ -1,54 +0,0 @@ --- 用户指定队列 -alter table t_escheduler_user add queue varchar(64); - --- 访问token -CREATE TABLE `t_escheduler_access_token` ( - `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', - `user_id` int(11) DEFAULT NULL COMMENT '用户id', - `token` varchar(64) DEFAULT NULL COMMENT 'token令牌', - `expire_time` datetime DEFAULT NULL COMMENT 'token有效结束时间', - `create_time` datetime DEFAULT NULL COMMENT '创建时间', - `update_time` datetime DEFAULT NULL COMMENT '更新时间', - PRIMARY KEY (`id`) -) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8; - -CREATE TABLE `t_escheduler_error_command` ( - `id` int(11) NOT NULL COMMENT '主键', - `command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程', - `executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者', - `process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id', - `command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)', - `task_depend_type` tinyint(4) NULL DEFAULT NULL COMMENT '节点依赖类型', - `failure_strategy` tinyint(4) NULL DEFAULT 0 COMMENT '失败策略:0结束,1继续', - `warning_type` tinyint(4) NULL DEFAULT 0 COMMENT '告警类型', - `warning_group_id` int(11) NULL DEFAULT NULL COMMENT '告警组', - `schedule_time` datetime(0) NULL DEFAULT NULL COMMENT '预期运行时间', - `start_time` datetime(0) NULL DEFAULT NULL COMMENT '开始时间', - `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间', - `dependence` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '依赖字段', - `process_instance_priority` int(11) NULL DEFAULT NULL COMMENT '流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest', - `message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息', - PRIMARY KEY (`id`) USING BTREE -) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; - - -CREATE TABLE `t_escheduler_worker_group` ( - `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id', - `name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称', - `ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表', - `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', - `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间', - PRIMARY KEY (`id`) USING BTREE -) ENGINE = InnoDB AUTO_INCREMENT = 1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; - -ALTER TABLE `t_escheduler_task_instance` -ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`; - -ALTER TABLE `t_escheduler_command` -ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; - -ALTER TABLE `t_escheduler_error_command` -ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; - -ALTER TABLE `t_escheduler_schedules` -ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' NULL AFTER `process_instance_priority`; \ No newline at end of file diff --git a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java index bab755ba67d898890ec0deb94cf5b11808580fc1..68b404015682eb864f258e262ce6fef55aa5ebee 100644 --- a/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java +++ b/escheduler-server/src/main/java/cn/escheduler/server/worker/task/sql/SqlTask.java @@ -305,7 +305,7 @@ public class SqlTask extends AbstractTask { receviersList.add(user.getEmail()); } // custom receiver - String receivers = processDefine.getReceivers(); + String receivers = sqlParameters.getReceivers(); if (StringUtils.isNotEmpty(receivers)){ String[] splits = receivers.split(Constants.COMMA); for (String receiver : splits){ @@ -315,11 +315,8 @@ public class SqlTask extends AbstractTask { // copy list List receviersCcList = new ArrayList(); - - // Custom Copier - String receiversCc = processDefine.getReceiversCc(); - + String receiversCc = sqlParameters.getReceiversCc(); if (StringUtils.isNotEmpty(receiversCc)){ String[] splits = receiversCc.split(Constants.COMMA); for (String receiverCc : splits){ diff --git a/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql b/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql index b89a10480e8980ae7024c2c35e54354f02a6de7a..2e3394fd41353ca9b373af3a84b30b99d1744367 100644 --- a/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql +++ b/sql/upgrade/1.0.2_schema/mysql/escheduler_ddl.sql @@ -18,4 +18,166 @@ d// delimiter ; CALL ac_escheduler_T_t_escheduler_version; -DROP PROCEDURE ac_escheduler_T_t_escheduler_version; \ No newline at end of file +DROP PROCEDURE ac_escheduler_T_t_escheduler_version; + +-- ac_escheduler_T_t_escheduler_user_C_queue +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_user_C_queue; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_user_C_queue() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_escheduler_user' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='queue') + THEN + ALTER TABLE t_escheduler_user ADD COLUMN queue varchar(64) COMMENT '队列' AFTER update_time; + END IF; + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_user_C_queue; +DROP PROCEDURE ac_escheduler_T_t_escheduler_user_C_queue; + +-- ac_escheduler_T_t_escheduler_access_token +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_access_token; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_access_token() + BEGIN + drop table if exists t_escheduler_access_token; + CREATE TABLE IF NOT EXISTS `t_escheduler_access_token` ( + `id` int(11) NOT NULL AUTO_INCREMENT COMMENT '主键', + `user_id` int(11) DEFAULT NULL COMMENT '用户id', + `token` varchar(64) DEFAULT NULL COMMENT 'token令牌', + `expire_time` datetime DEFAULT NULL COMMENT 'token有效结束时间', + `create_time` datetime DEFAULT NULL COMMENT '创建时间', + `update_time` datetime DEFAULT NULL COMMENT '更新时间', + PRIMARY KEY (`id`) + ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8; + + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_access_token; +DROP PROCEDURE ac_escheduler_T_t_escheduler_access_token; + +-- ac_escheduler_T_t_escheduler_error_command +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_error_command; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_error_command() + BEGIN + drop table if exists t_escheduler_error_command; + CREATE TABLE IF NOT EXISTS `t_escheduler_error_command` ( + `id` int(11) NOT NULL COMMENT '主键', + `command_type` tinyint(4) NULL DEFAULT NULL COMMENT '命令类型:0 启动工作流,1 从当前节点开始执行,2 恢复被容错的工作流,3 恢复暂停流程,4 从失败节点开始执行,5 补数,6 调度,7 重跑,8 暂停,9 停止,10 恢复等待线程', + `executor_id` int(11) NULL DEFAULT NULL COMMENT '命令执行者', + `process_definition_id` int(11) NULL DEFAULT NULL COMMENT '流程定义id', + `command_param` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '命令的参数(json格式)', + `task_depend_type` tinyint(4) NULL DEFAULT NULL COMMENT '节点依赖类型', + `failure_strategy` tinyint(4) NULL DEFAULT 0 COMMENT '失败策略:0结束,1继续', + `warning_type` tinyint(4) NULL DEFAULT 0 COMMENT '告警类型', + `warning_group_id` int(11) NULL DEFAULT NULL COMMENT '告警组', + `schedule_time` datetime(0) NULL DEFAULT NULL COMMENT '预期运行时间', + `start_time` datetime(0) NULL DEFAULT NULL COMMENT '开始时间', + `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间', + `dependence` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '依赖字段', + `process_instance_priority` int(11) NULL DEFAULT NULL COMMENT '流程实例优先级:0 Highest,1 High,2 Medium,3 Low,4 Lowest', + `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组', + `message` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '执行信息', + PRIMARY KEY (`id`) USING BTREE + ) ENGINE = InnoDB AUTO_INCREMENT=1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; + + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_error_command; +DROP PROCEDURE ac_escheduler_T_t_escheduler_error_command; + +-- ac_escheduler_T_t_escheduler_worker_group +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_worker_group; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_worker_group() + BEGIN + drop table if exists t_escheduler_worker_group; + CREATE TABLE IF NOT EXISTS `t_escheduler_worker_group` ( + `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id', + `name` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT '组名称', + `ip_list` varchar(256) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'worker地址列表', + `create_time` datetime(0) NULL DEFAULT NULL COMMENT '创建时间', + `update_time` datetime(0) NULL DEFAULT NULL COMMENT '更新时间', + PRIMARY KEY (`id`) USING BTREE + ) ENGINE = InnoDB AUTO_INCREMENT=1 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic; + + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_worker_group; +DROP PROCEDURE ac_escheduler_T_t_escheduler_worker_group; + +-- ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_escheduler_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='worker_group_id') + THEN + ALTER TABLE t_escheduler_task_instance ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `task_instance_priority`; + END IF; + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id; +DROP PROCEDURE ac_escheduler_T_t_escheduler_task_instance_C_worker_group_id; + + +-- ac_escheduler_T_t_escheduler_command_C_worker_group_id +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_command_C_worker_group_id; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_command_C_worker_group_id() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_escheduler_command' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='worker_group_id') + THEN + ALTER TABLE t_escheduler_command ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `process_instance_priority`; + END IF; + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_command_C_worker_group_id; +DROP PROCEDURE ac_escheduler_T_t_escheduler_command_C_worker_group_id; + +-- ac_escheduler_T_t_escheduler_schedules_C_worker_group_id +drop PROCEDURE if EXISTS ac_escheduler_T_t_escheduler_schedules_C_worker_group_id; +delimiter d// +CREATE PROCEDURE ac_escheduler_T_t_escheduler_schedules_C_worker_group_id() + BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS + WHERE TABLE_NAME='t_escheduler_schedules' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='worker_group_id') + THEN + ALTER TABLE t_escheduler_schedules ADD COLUMN `worker_group_id` int(11) NULL DEFAULT -1 COMMENT '任务指定运行的worker分组' AFTER `process_instance_priority`; + END IF; + END; + +d// + +delimiter ; +CALL ac_escheduler_T_t_escheduler_schedules_C_worker_group_id; +DROP PROCEDURE ac_escheduler_T_t_escheduler_schedules_C_worker_group_id; \ No newline at end of file